You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/21 01:22:30 UTC

kafka git commit: KAFKA-1867 liveBroker list not updated on a cluster with no topics; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 38346fb10 -> 8c1b9325b


KAFKA-1867 liveBroker list not updated on a cluster with no topics; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c1b9325
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c1b9325
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c1b9325

Branch: refs/heads/trunk
Commit: 8c1b9325be4adc5065c6dbe3dcdbdccb1887d604
Parents: 38346fb
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Fri Feb 20 16:22:15 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 20 16:22:23 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/controller/ControllerChannelManager.scala   | 8 +++++++-
 .../integration/kafka/api/ProducerFailureHandlingTest.scala | 7 -------
 .../src/test/scala/unit/kafka/server/OffsetCommitTest.scala | 9 +--------
 3 files changed, 8 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c1b9325/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 14b22ab..c582191 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -269,7 +269,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       else
         givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
     }
-    filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
+    if(filteredPartitions.isEmpty)
+      brokerIds.filter(b => b >= 0).foreach { brokerId =>
+        updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+      }
+    else
+      filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
+
     controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c1b9325/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 11d6a97..ba48a63 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -46,13 +46,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
       override val zkConnect = TestZKUtils.zookeeperConnect
       override val autoCreateTopicsEnable = false
       override val messageMaxBytes = serverMessageMaxBytes
-      // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
-      // the broker. As a result, the live broker list in metadataCache is empty. If the number of live brokers is 0, we
-      // try to create the offset topic with the default offsets.topic.replication.factor of 3. The creation will fail
-      // since there is not enough live brokers. This causes testCannotSendToInternalTopic() to fail. Temporarily fixing
-      // the issue by overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we need to
-      // remove the following config override.
-      override val offsetsTopicReplicationFactor = 1.asInstanceOf[Short]
       // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
       // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
       override val offsetsTopicPartitions = 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c1b9325/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 5b93239..a2bb885 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -46,13 +46,6 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
-    // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
-    // the broker. As a result, the live broker list in metadataCache is empty. This causes the ConsumerMetadataRequest
-    // to fail since if the number of live brokers is 0, we try to create the offset topic with the default
-    // offsets.topic.replication.factor of 3. The creation will fail since there is not enough live brokers. In order
-    // for the unit test to pass, overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we
-    // need to remove the following config override.
-    config.put("offsets.topic.replication.factor", "1")
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()