You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/11/08 02:45:43 UTC

git commit: KAFKA-1119 Kafka 0.8.1 overwrites previous per topic config changes; reviewed by Joel Koshy, Guozhang Wang, Swapnil Ghike, Jay Kreps

Updated Branches:
  refs/heads/trunk a700c99ef -> eedbea652


KAFKA-1119 Kafka 0.8.1 overwrites previous per topic config changes; reviewed by Joel Koshy, Guozhang Wang, Swapnil Ghike, Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eedbea65
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eedbea65
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eedbea65

Branch: refs/heads/trunk
Commit: eedbea6526986783257ad0e025c451a8ee3d9095
Parents: a700c99
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Nov 7 17:45:39 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Nov 7 17:45:39 2013 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala | 27 +++++++-----
 .../main/scala/kafka/admin/TopicCommand.scala   | 43 +++++++++++++++-----
 core/src/main/scala/kafka/log/LogConfig.scala   |  2 +-
 .../scala/kafka/server/TopicConfigManager.scala |  1 -
 4 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8107a64..8ff4bd5 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -200,14 +200,21 @@ object AdminUtils extends Logging {
   
   /**
    * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+   * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+   * @param topic: The topic for which configs are being changed
+   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
    */
-  def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
-    LogConfig.validate(config)
+  def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
     if(!topicExists(zkClient, topic))
       throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
-    
+
+    // remove the topic overrides
+    LogConfig.validate(configs)
+
     // write the new config--may not exist if there were previously no overrides
-    writeTopicConfig(zkClient, topic, config)
+    writeTopicConfig(zkClient, topic, configs)
     
     // create the change notification
     zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
@@ -217,14 +224,12 @@ object AdminUtils extends Logging {
    * Write out the topic config to zk, if there is any
    */
   private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
-    if(config.size > 0) {
-      val configMap: mutable.Map[String, String] = {
-        import JavaConversions._
-        config
-      }
-      val map = Map("version" -> 1, "config" -> configMap)
-      ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
+    val configMap: mutable.Map[String, String] = {
+      import JavaConversions._
+      config
     }
+    val map = Map("version" -> 1, "config" -> configMap)
+    ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 56f3177..3c08dee 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -23,8 +23,8 @@ import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import scala.collection._
 import scala.collection.JavaConversions._
-import kafka.common.Topic
 import kafka.cluster.Broker
+import kafka.log.LogConfig
 
 object TopicCommand {
 
@@ -61,7 +61,7 @@ object TopicCommand {
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
     val topics = opts.options.valuesOf(opts.topicOpt)
-    val configs = parseTopicConfigs(opts)
+    val configs = parseTopicConfigsToBeAdded(opts)
     for (topic <- topics) {
       if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
@@ -79,8 +79,13 @@ object TopicCommand {
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
     val topic = opts.options.valueOf(opts.topicOpt)
-    if(opts.options.has(opts.configOpt)) {
-      val configs = parseTopicConfigs(opts)
+    if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+      val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+      val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+      // compile the final set of configs
+      val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+      configs.putAll(configsToBeAdded)
+      configsToBeDeleted.foreach(config => configs.remove(config))
       AdminUtils.changeTopicConfig(zkClient, topic, configs)
       println("Updated config for topic \"%s\".".format(topic))
     }
@@ -147,14 +152,28 @@ object TopicCommand {
   
   def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
   
-  def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
-    val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
-    require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
+  def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
+    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
+    require(configsToBeAdded.forall(config => config.length == 2),
+      "Invalid topic config: all configs to be added must be in the format \"key=val\".")
     val props = new Properties
-    configs.foreach(pair => props.setProperty(pair(0), pair(1)))
+    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    LogConfig.validate(props)
     props
   }
-  
+
+  def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
+    val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("""\s*=\s*"""))
+    if(opts.options.has(opts.createOpt))
+      require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".")
+    require(configsToBeDeleted.forall(config => config.length == 1),
+      "Invalid topic config: all configs to be deleted must be in the format \"key\".")
+    val propsToBeDeleted = new Properties
+    configsToBeDeleted.foreach(pair => propsToBeDeleted.setProperty(pair(0).trim, ""))
+    LogConfig.validateNames(propsToBeDeleted)
+    configsToBeDeleted.map(pair => pair(0))
+  }
+
   def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
     val partitionList = replicaAssignmentList.split(",")
     val ret = new mutable.HashMap[Int, List[Int]]()
@@ -184,10 +203,14 @@ object TopicCommand {
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
-    val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
+    val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered.")
                           .withRequiredArg
                           .describedAs("name=value")
                           .ofType(classOf[String])
+    val deleteConfigOpt = parser.accepts("deleteConfig", "A topic configuration override to be removed for an existing topic")
+                          .withRequiredArg
+                          .describedAs("name")
+                          .ofType(classOf[String])
     val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
       "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
                            .withRequiredArg

http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 51ec796..0b32aee 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -132,7 +132,7 @@ object LogConfig {
   /**
    * Check that property names are valid
    */
-  private def validateNames(props: Properties) {
+  def validateNames(props: Properties) {
     import JavaConversions._
     for(name <- props.keys)
       require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))

http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index 56cae58..42e98dd 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -90,7 +90,6 @@ class TopicConfigManager(private val zkClient: ZkClient,
       val now = time.milliseconds
       val logs = logManager.logsByTopicPartition.toBuffer
       val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
-      val lastChangeId = notifications.map(changeNumber).max
       for (notification <- notifications) {
         val changeId = changeNumber(notification)
         if (changeId > lastExecutedChange) {