You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/04/22 00:09:37 UTC

git commit: KAFKA-1356; follow-up - return unknown topic partition on non-existent topic if auto.create is off; reviewed by Timothy Chen, Neha Narkhede and Jun Rao.

Repository: kafka
Updated Branches:
  refs/heads/0.8.1 69fbdf9cb -> 1e9e107ee


KAFKA-1356; follow-up - return the UnknownTopicOrPartition error code for
a non-existent topic if auto topic creation (autoCreateTopicsEnable) is
off; reviewed by Timothy Chen, Neha Narkhede and Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1e9e107e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1e9e107e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1e9e107e

Branch: refs/heads/0.8.1
Commit: 1e9e107ee92b9b8b37c22eb9ecf0db67a445000a
Parents: 69fbdf9
Author: Joel Koshy <jj...@gmail.com>
Authored: Mon Apr 21 14:34:21 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Mon Apr 21 15:09:05 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 25 ++++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1e9e107e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e569eb..aabe62d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -651,17 +651,22 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
     val topicResponses = metadataCache.getTopicMetadata(topics)
-    if (topics.size > 0 && topicResponses.size != topics.size && config.autoCreateTopicsEnable) {
-      val topicsToBeCreated = topics -- topicResponses.map(_.topic).toSet
-      topicResponses.appendAll(topicsToBeCreated.map { topic =>
-        try {
-          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-          info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
-        } catch {
-          case e: TopicExistsException => // let it go, possibly another broker created this topic
+    if (topics.size > 0 && topicResponses.size != topics.size) {
+      val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+      val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+        if (config.autoCreateTopicsEnable) {
+          try {
+            AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+          } catch {
+            case e: TopicExistsException => // let it go, possibly another broker created this topic
+          }
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+        } else {
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
-        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-      })
+      }
+      topicResponses.appendAll(responsesForNonExistentTopics)
     }
 
     topicResponses