You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/19 21:54:51 UTC
git commit: KAFKA-1139 Topic data change handling callback should not call syncedRebalance directly; reviewed by Guozhang Wang and Jun Rao
Updated Branches:
refs/heads/trunk dd58d753c -> b5d16871c
KAFKA-1139 Topic data change handling callback should not call syncedRebalance directly; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5d16871
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5d16871
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5d16871
Branch: refs/heads/trunk
Commit: b5d16871c02a585643aec3229546af04721bb42c
Parents: dd58d75
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Dec 19 12:54:10 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Dec 19 12:54:10 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/TopicCommand.scala | 112 ++++++++++---------
.../consumer/ZookeeperConsumerConnector.scala | 19 ++--
2 files changed, 72 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b5d16871/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index d25aae3..083fd63 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,6 +25,7 @@ import scala.collection._
import scala.collection.JavaConversions._
import kafka.cluster.Broker
import kafka.log.LogConfig
+import kafka.consumer.Whitelist
object TopicCommand {
@@ -43,67 +44,79 @@ object TopicCommand {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
-
- if(opts.options.has(opts.createOpt))
- createTopic(zkClient, opts)
- else if(opts.options.has(opts.alterOpt))
- alterTopic(zkClient, opts)
- else if(opts.options.has(opts.deleteOpt))
- deleteTopic(zkClient, opts)
- else if(opts.options.has(opts.listOpt))
- listTopics(zkClient, opts)
- else if(opts.options.has(opts.describeOpt))
- describeTopic(zkClient, opts)
- zkClient.close()
+ try {
+ if(opts.options.has(opts.createOpt))
+ createTopic(zkClient, opts)
+ else if(opts.options.has(opts.alterOpt))
+ alterTopic(zkClient, opts)
+ else if(opts.options.has(opts.deleteOpt))
+ deleteTopic(zkClient, opts)
+ else if(opts.options.has(opts.listOpt))
+ listTopics(zkClient, opts)
+ else if(opts.options.has(opts.describeOpt))
+ describeTopic(zkClient, opts)
+ } catch {
+ case e => println("Error while executing topic command", e)
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
+ val topicsSpec = opts.options.valueOf(opts.topicOpt)
+ val topicsFilter = new Whitelist(topicsSpec)
+ val allTopics = ZkUtils.getAllTopics(zkClient)
+ allTopics.filter(topicsFilter.isTopicAllowed).sorted
}
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
- val topics = opts.options.valuesOf(opts.topicOpt)
+ val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
- for (topic <- topics) {
- if (opts.options.has(opts.replicaAssignmentOpt)) {
- val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
- } else {
- CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
- val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
- val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
- AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
- }
- println("Created topic \"%s\".".format(topic))
+ if (opts.options.has(opts.replicaAssignmentOpt)) {
+ val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
+ } else {
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
+ val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+ val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+ AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
}
+ println("Created topic \"%s\".".format(topic))
}
-
+
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
- val topic = opts.options.valueOf(opts.topicOpt)
- if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
- val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
- val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
- // compile the final set of configs
- val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
- configs.putAll(configsToBeAdded)
- configsToBeDeleted.foreach(config => configs.remove(config))
- AdminUtils.changeTopicConfig(zkClient, topic, configs)
- println("Updated config for topic \"%s\".".format(topic))
- }
- if(opts.options.has(opts.partitionsOpt)) {
- println("WARNING: If partitions are increased for a topic that has a key, the partition " +
- "logic or ordering of the messages will be affected")
- val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
- val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
- AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
- println("adding partitions succeeded!")
+ val topics = getTopics(zkClient, opts)
+ topics.foreach { topic =>
+ if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+ val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+ val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+ // compile the final set of configs
+ val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+ configs.putAll(configsToBeAdded)
+ configsToBeDeleted.foreach(config => configs.remove(config))
+ AdminUtils.changeTopicConfig(zkClient, topic, configs)
+ println("Updated config for topic \"%s\".".format(topic))
+ }
+ if(opts.options.has(opts.partitionsOpt)) {
+ println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+ "logic or ordering of the messages will be affected")
+ val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+ val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+ AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+ println("adding partitions succeeded!")
+ }
+ if(opts.options.has(opts.replicationFactorOpt))
+ Utils.croak("Changing the replication factor is not supported.")
}
- if(opts.options.has(opts.replicationFactorOpt))
- Utils.croak("Changing the replication factor is not supported.")
}
def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
- for(topic <- opts.options.valuesOf(opts.topicOpt)) {
+ val topics = getTopics(zkClient, opts)
+ topics.foreach { topic =>
AdminUtils.deleteTopic(zkClient, topic)
println("Topic \"%s\" deleted.".format(topic))
}
@@ -128,9 +141,7 @@ object TopicCommand {
}
def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
- var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
- if (topics.size <= 0)
- topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
+ var topics = getTopics(zkClient, opts)
val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
@@ -212,7 +223,8 @@ object TopicCommand {
val deleteOpt = parser.accepts("delete", "Delete the topic.")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
- val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
+ val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+ "expression except for --create option")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
http://git-wip-us.apache.org/repos/asf/kafka/blob/b5d16871/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0cc236a..703b2e2 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
- private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null
+ private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = null
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -302,7 +302,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
registerConsumerInZK(dirs, consumerIdString, topicCount)
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
-
// There is no need to resubscribe to child and state changes.
// The child change watchers will be set inside rebalance when we read the children list.
}
@@ -315,9 +314,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def handleDataChange(dataPath : String, data: Object) {
try {
info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
- // explicitly trigger load balancing for this consumer
- loadBalancerListener.syncedRebalance()
-
+ // queue up the rebalance event
+ loadBalancerListener.rebalanceEventTriggered()
// There is no need to re-subscribe the watcher since it will be automatically
// re-registered upon firing of this event by zkClient
} catch {
@@ -335,7 +333,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
- private val correlationId = new AtomicInteger(0)
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@@ -367,6 +364,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+ rebalanceEventTriggered()
+ }
+
+ def rebalanceEventTriggered() {
inLock(lock) {
isWatcherTriggered = true
cond.signalAll()
@@ -655,8 +656,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
dirs, consumerIdString, topicCount, loadBalancerListener)
// create listener for topic partition change event if not exist yet
- if (topicPartitionChangeListenner == null)
- topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener)
+ if (topicPartitionChangeListener == null)
+ topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
@@ -714,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
- zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner)
+ zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
}
// explicitly trigger load balancing for this consumer