You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/04/22 00:09:37 UTC
git commit: KAFKA-1356; follow-up - return the UnknownTopicOrPartition error
code for a non-existent topic if auto topic creation (auto.create.topics.enable)
is off; reviewed by Timothy Chen, Neha Narkhede and Jun Rao.
Repository: kafka
Updated Branches:
refs/heads/0.8.1 69fbdf9cb -> 1e9e107ee
KAFKA-1356; follow-up - return the UnknownTopicOrPartition error code for a
non-existent topic if auto topic creation (auto.create.topics.enable) is off;
reviewed by Timothy Chen, Neha Narkhede and Jun Rao.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1e9e107e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1e9e107e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1e9e107e
Branch: refs/heads/0.8.1
Commit: 1e9e107ee92b9b8b37c22eb9ecf0db67a445000a
Parents: 69fbdf9
Author: Joel Koshy <jj...@gmail.com>
Authored: Mon Apr 21 14:34:21 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Mon Apr 21 15:09:05 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 25 ++++++++++++--------
1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1e9e107e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e569eb..aabe62d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -651,17 +651,22 @@ class KafkaApis(val requestChannel: RequestChannel,
private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
val topicResponses = metadataCache.getTopicMetadata(topics)
- if (topics.size > 0 && topicResponses.size != topics.size && config.autoCreateTopicsEnable) {
- val topicsToBeCreated = topics -- topicResponses.map(_.topic).toSet
- topicResponses.appendAll(topicsToBeCreated.map { topic =>
- try {
- AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
- info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
- } catch {
- case e: TopicExistsException => // let it go, possibly another broker created this topic
+ if (topics.size > 0 && topicResponses.size != topics.size) {
+ val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+ val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+ if (config.autoCreateTopicsEnable) {
+ try {
+ AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+ info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+ } catch {
+ case e: TopicExistsException => // let it go, possibly another broker created this topic
+ }
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+ } else {
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
}
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
- })
+ }
+ topicResponses.appendAll(responsesForNonExistentTopics)
}
topicResponses