You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/19 21:54:51 UTC

git commit: KAFKA-1139 Topic data change handling callback should not call syncedRebalance directly; reviewed by Guozhang Wang and Jun Rao

Updated Branches:
  refs/heads/trunk dd58d753c -> b5d16871c


KAFKA-1139 Topic data change handling callback should not call syncedRebalance directly; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5d16871
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5d16871
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5d16871

Branch: refs/heads/trunk
Commit: b5d16871c02a585643aec3229546af04721bb42c
Parents: dd58d75
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Dec 19 12:54:10 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Dec 19 12:54:10 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 112 ++++++++++---------
 .../consumer/ZookeeperConsumerConnector.scala   |  19 ++--
 2 files changed, 72 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5d16871/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index d25aae3..083fd63 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,6 +25,7 @@ import scala.collection._
 import scala.collection.JavaConversions._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
+import kafka.consumer.Whitelist
 
 object TopicCommand {
 
@@ -43,67 +44,79 @@ object TopicCommand {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
     
     val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
-    
-    if(opts.options.has(opts.createOpt))
-      createTopic(zkClient, opts)
-    else if(opts.options.has(opts.alterOpt))
-      alterTopic(zkClient, opts)
-    else if(opts.options.has(opts.deleteOpt))
-      deleteTopic(zkClient, opts)
-    else if(opts.options.has(opts.listOpt))
-      listTopics(zkClient, opts)
-    else if(opts.options.has(opts.describeOpt))
-      describeTopic(zkClient, opts)
 
-    zkClient.close()
+    try {
+      if(opts.options.has(opts.createOpt))
+        createTopic(zkClient, opts)
+      else if(opts.options.has(opts.alterOpt))
+        alterTopic(zkClient, opts)
+      else if(opts.options.has(opts.deleteOpt))
+        deleteTopic(zkClient, opts)
+      else if(opts.options.has(opts.listOpt))
+        listTopics(zkClient, opts)
+      else if(opts.options.has(opts.describeOpt))
+        describeTopic(zkClient, opts)
+    } catch {
+      case e => println("Error while executing topic command", e)
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
+    val topicsSpec = opts.options.valueOf(opts.topicOpt)
+    val topicsFilter = new Whitelist(topicsSpec)
+    val allTopics = ZkUtils.getAllTopics(zkClient)
+    allTopics.filter(topicsFilter.isTopicAllowed).sorted
   }
 
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topics = opts.options.valuesOf(opts.topicOpt)
+    val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
-    for (topic <- topics) {
-      if (opts.options.has(opts.replicaAssignmentOpt)) {
-        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
-      } else {
-        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
-        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
-        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-        AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
-      }
-      println("Created topic \"%s\".".format(topic))
+    if (opts.options.has(opts.replicaAssignmentOpt)) {
+      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
+    } else {
+      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
+      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+      AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
     }
+    println("Created topic \"%s\".".format(topic))
   }
-  
+
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topic = opts.options.valueOf(opts.topicOpt)
-    if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
-      val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
-      val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
-      // compile the final set of configs
-      val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
-      configs.putAll(configsToBeAdded)
-      configsToBeDeleted.foreach(config => configs.remove(config))
-      AdminUtils.changeTopicConfig(zkClient, topic, configs)
-      println("Updated config for topic \"%s\".".format(topic))
-    }
-    if(opts.options.has(opts.partitionsOpt)) {
-      println("WARNING: If partitions are increased for a topic that has a key, the partition " +
-        "logic or ordering of the messages will be affected")
-      val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
-      val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-      AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
-      println("adding partitions succeeded!")
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
+      if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+        val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+        val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+        // compile the final set of configs
+        val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+        configs.putAll(configsToBeAdded)
+        configsToBeDeleted.foreach(config => configs.remove(config))
+        AdminUtils.changeTopicConfig(zkClient, topic, configs)
+        println("Updated config for topic \"%s\".".format(topic))
+      }
+      if(opts.options.has(opts.partitionsOpt)) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+        val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+        println("adding partitions succeeded!")
+      }
+      if(opts.options.has(opts.replicationFactorOpt))
+        Utils.croak("Changing the replication factor is not supported.")
     }
-    if(opts.options.has(opts.replicationFactorOpt))
-      Utils.croak("Changing the replication factor is not supported.")
   }
   
   def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    for(topic <- opts.options.valuesOf(opts.topicOpt)) {
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
       AdminUtils.deleteTopic(zkClient, topic)
       println("Topic \"%s\" deleted.".format(topic))
     }
@@ -128,9 +141,7 @@ object TopicCommand {
   }
   
   def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
-    if (topics.size <= 0)
-      topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
+    var topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
     val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
     val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
@@ -212,7 +223,8 @@ object TopicCommand {
     val deleteOpt = parser.accepts("delete", "Delete the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+                                           "expression except for --create option")
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/b5d16871/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0cc236a..703b2e2 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
-  private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null
+  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -302,7 +302,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
       loadBalancerListener.syncedRebalance()
-
       // There is no need to resubscribe to child and state changes.
       // The child change watchers will be set inside rebalance when we read the children list.
     }
@@ -315,9 +314,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def handleDataChange(dataPath : String, data: Object) {
       try {
         info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
-        // explicitly trigger load balancing for this consumer
-        loadBalancerListener.syncedRebalance()
-
+        // queue up the rebalance event
+        loadBalancerListener.rebalanceEventTriggered()
         // There is no need to re-subscribe the watcher since it will be automatically
         // re-registered upon firing of this event by zkClient
       } catch {
@@ -335,7 +333,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
-    private val correlationId = new AtomicInteger(0)
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -367,6 +364,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      rebalanceEventTriggered()
+    }
+
+    def rebalanceEventTriggered() {
       inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
@@ -655,8 +656,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         dirs, consumerIdString, topicCount, loadBalancerListener)
 
     // create listener for topic partition change event if not exist yet
-    if (topicPartitionChangeListenner == null)
-      topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener)
+    if (topicPartitionChangeListener == null)
+      topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
 
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
 
@@ -714,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
       val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner)
+      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
     }
 
     // explicitly trigger load balancing for this consumer