You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/09 17:49:14 UTC

git commit: KAFKA-690 TopicMetadataRequest throws exception when no topics are specified, reviewed by Jay Kreps and Neha Narkhede

Updated Branches:
  refs/heads/0.8 85c9e91c8 -> e0b3b6316


KAFKA-690 TopicMetadataRequest throws exception when no topics are specified, reviewed by Jay Kreps and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0b3b631
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0b3b631
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0b3b631

Branch: refs/heads/0.8
Commit: e0b3b6316342271d798af10494c0ceab13e1eb02
Parents: 85c9e91
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Jan 9 08:49:11 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 9 08:49:11 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/api/TopicMetadataRequest.scala     |    1 -
 core/src/main/scala/kafka/server/KafkaApis.scala   |    9 +++-
 .../unit/kafka/integration/TopicMetadataTest.scala |   31 ++++++++++++---
 3 files changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e0b3b631/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 5bdb2c1..e659532 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -40,7 +40,6 @@ object TopicMetadataRequest extends Logging {
     for(i <- 0 until numTopics)
       topics += readShortString(buffer)
     val topicsList = topics.toList
-    debug("topic = %s".format(topicsList.head))
     new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0b3b631/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5a85b04..5089a75 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,6 @@ import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import kafka.network.RequestChannel.Response
@@ -30,6 +29,7 @@ import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
 import kafka.common._
+import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 
 
 /**
@@ -425,7 +425,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
-    val uniqueTopics = metadataRequest.topics.toSet
+    val uniqueTopics = {
+      if(metadataRequest.topics.size > 0)
+        metadataRequest.topics.toSet
+      else
+        ZkUtils.getAllTopics(zkClient).toSet
+    }
     val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient)
     topicMetadataList.foreach(
       topicAndMetadata => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0b3b631/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 54a5a06..f9bbfa9 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -70,7 +70,27 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
       0 -> configs.head.brokerId
     )
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadata = mockLogManagerAndTestTopic(topic)
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertEquals(1, partitionMetadata.head.replicas.size)
+  }
+
+  def testGetAllTopicMetadata {
+    // create topic
+    val topic = "test"
+    CreateTopicCommand.createTopic(zkClient, topic, 1)
+    // set up leader for topic partition 0
+    val leaderForPartitionMap = Map(
+      0 -> configs.head.brokerId
+    )
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
+    val topicMetadataRequest = new TopicMetadataRequest(List())
+    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
     val partitionMetadata = topicMetadata.head.partitionsMetadata
@@ -83,7 +103,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // auto create topic
     val topic = "test"
 
-    val topicMetadata = mockLogManagerAndTestTopic(topic)
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
     val partitionMetadata = topicMetadata.head.partitionsMetadata
@@ -94,16 +115,14 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
   }
 
-  private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
+  private def mockLogManagerAndTestTopic(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     // topic metadata request only requires 1 call from the replica manager
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.replay(replicaManager)
 
-    // create a topic metadata request
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
 
-    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest)
+    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request)
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)