Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/05 00:11:44 UTC

kafka git commit: kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 1a0179f21 -> a56a79055


kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a56a7905
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a56a7905
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a56a7905

Branch: refs/heads/trunk
Commit: a56a79055dfba4687f476b0a4d20aeec1c4ebff7
Parents: 1a0179f
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Aug 4 15:11:27 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Aug 4 15:11:27 2015 -0700

----------------------------------------------------------------------
 bin/kafka-configs.sh                            |  17 ++
 .../src/main/scala/kafka/admin/AdminUtils.scala |  77 +++++---
 .../main/scala/kafka/admin/ConfigCommand.scala  | 174 ++++++++++++++++++
 .../main/scala/kafka/admin/TopicCommand.scala   |  39 +---
 .../main/scala/kafka/cluster/Partition.scala    |   5 +-
 .../kafka/controller/KafkaController.scala      |   7 +-
 .../controller/PartitionLeaderSelector.scala    |   6 +-
 .../kafka/controller/TopicDeletionManager.scala |   5 +-
 .../main/scala/kafka/server/ConfigHandler.scala |  69 +++++++
 .../kafka/server/DynamicConfigManager.scala     | 183 +++++++++++++++++++
 .../main/scala/kafka/server/KafkaServer.scala   |  15 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/TopicConfigManager.scala | 152 ---------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  33 +++-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   8 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  73 ++++++++
 .../unit/kafka/admin/TopicCommandTest.scala     |  13 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  83 ++++++++-
 18 files changed, 718 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/bin/kafka-configs.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-configs.sh b/bin/kafka-configs.sh
new file mode 100755
index 0000000..417eaf5
--- /dev/null
+++ b/bin/kafka-configs.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand $@
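
For illustration, a typical invocation of this new script, adding a topic-level override (the ZK address, topic name, and config value are placeholders), can be expressed directly against the ConfigCommand class it execs:

    import kafka.admin.ConfigCommand

    // Equivalent to: bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics \
    //   --entity-name my-topic --alter --added-config retention.ms=86400000
    ConfigCommand.main(Array(
      "--zookeeper", "localhost:2181",
      "--entity-type", "topics",
      "--entity-name", "my-topic",
      "--alter",
      "--added-config", "retention.ms=86400000"))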

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 4cc2376..9966660 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -21,6 +21,7 @@ import kafka.common._
 import kafka.cluster.{BrokerEndPoint, Broker}
 
 import kafka.log.LogConfig
+import kafka.server.ConfigType
 import kafka.utils._
 import kafka.api.{TopicMetadata, PartitionMetadata}
 
@@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
 object AdminUtils extends Logging {
   val rand = new Random
-
   val AdminClientId = "__admin_client"
-
-  val TopicConfigChangeZnodePrefix = "config_change_"
+  val EntityConfigChangeZnodePrefix = "config_change_"
 
   /**
    * There are 2 goals of replica assignment:
@@ -103,14 +102,12 @@ object AdminUtils extends Logging {
   * @param numPartitions Number of partitions to be set
   * @param replicaAssignmentStr Manual replica assignment
   * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
-  * @param config Pre-existing properties that should be preserved
   */
   def addPartitions(zkClient: ZkClient,
                     topic: String,
                     numPartitions: Int = 1,
                     replicaAssignmentStr: String = "",
-                    checkBrokerAvailable: Boolean = true,
-                    config: Properties = new Properties) {
+                    checkBrokerAvailable: Boolean = true) {
     val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
     if (existingPartitionsReplicaList.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -137,7 +134,7 @@ object AdminUtils extends Logging {
     val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
     // add the new list
     partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
   }
 
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
@@ -238,7 +235,7 @@ object AdminUtils extends Logging {
     val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
   }
-                  
+
   def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
                                                      topic: String,
                                                      partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -246,7 +243,6 @@ object AdminUtils extends Logging {
                                                      update: Boolean = false) {
     // validate arguments
     Topic.validate(topic)
-    LogConfig.validate(config)
     require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
 
     val topicPath = ZkUtils.getTopicPath(topic)
@@ -264,10 +260,14 @@ object AdminUtils extends Logging {
     }
 
     partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: "  + partitionReplicaAssignment))
-    
-    // write out the config if there is any, this isn't transactional with the partition assignments
-    writeTopicConfig(zkClient, topic, config)
-    
+
+    // Configs are only written when a topic is created; changes to an existing topic's configs go through AdminUtils.changeTopicConfig
+    if (!update) {
+      // write out the config if there is any, this isn't transactional with the partition assignments
+      LogConfig.validate(config)
+      writeEntityConfig(zkClient, ConfigType.Topic, topic, config)
+    }
+
     // create the partition assignment
     writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
   }
@@ -290,7 +290,19 @@ object AdminUtils extends Logging {
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
-  
+
+  /**
+   * Update the config for a client and create a change notification so the change will propagate to other brokers
+   * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+   * @param clientId: The clientId for which configs are being changed
+   * @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties) {
+    changeEntityConfig(zkClient, ConfigType.Client, clientId, configs)
+  }
+
   /**
    * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
    * @param zkClient: The ZkClient handle used to write the new config to zookeeper
@@ -302,34 +314,42 @@ object AdminUtils extends Logging {
   def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
     if(!topicExists(zkClient, topic))
       throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
-
     // remove the topic overrides
     LogConfig.validate(configs)
+    changeEntityConfig(zkClient, ConfigType.Topic, topic, configs)
+  }
 
+  private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) {
     // write the new config--may not exist if there were previously no overrides
-    writeTopicConfig(zkClient, topic, configs)
-    
+    writeEntityConfig(zkClient, entityType, entityName, configs)
+
     // create the change notification
-    zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
+    val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
+    val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
+    zkClient.createPersistentSequential(seqNode, content)
   }
-  
+
+  def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
+    Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName)
+  }
+
   /**
    * Write out the topic config to zk, if there is any
    */
-  private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
+  private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, config: Properties) {
     val configMap: mutable.Map[String, String] = {
       import JavaConversions._
       config
     }
     val map = Map("version" -> 1, "config" -> configMap)
-    ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
+    ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map))
   }
   
   /**
-   * Read the topic config (if any) from zk
+   * Read the entity (topic or client) config (if any) from zk
    */
-  def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
-    val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
+  def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = {
+    val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true)
     val props = new Properties()
     if(str != null) {
       Json.parseFull(str) match {
@@ -343,19 +363,20 @@ object AdminUtils extends Logging {
                 configTup match {
                   case (k: String, v: String) =>
                     props.setProperty(k, v)
-                  case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+                  case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
                 }
-            case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+            case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
           }
 
-        case o => throw new IllegalArgumentException("Unexpected value in config: "  + str)
+        case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)"
+                                                             .format(str, entityType, entity))
       }
     }
     props
   }
 
   def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
-    ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
+    ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topic, topic))).toMap
 
   def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
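
A minimal sketch of how these helpers compose for a topic override, assuming a reachable ZooKeeper (the address, topic name, and config value are placeholders):

    import kafka.admin.AdminUtils
    import kafka.server.ConfigType
    import kafka.utils.ZkUtils

    // Read the existing overrides, modify them, and write them back; the
    // session timeouts mirror those used by ConfigCommand.
    val zkClient = ZkUtils.createZkClient("localhost:2181", 30000, 30000)
    val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, "my-topic")
    configs.setProperty("retention.ms", "86400000")
    // Writes the topic's config znode and creates a config_change_* notification.
    AdminUtils.changeTopicConfig(zkClient, "my-topic", configs)
    zkClient.close()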

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
new file mode 100644
index 0000000..2759476
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple._
+import java.util.Properties
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+import kafka.utils.{ZkUtils, CommandLineUtils}
+import org.I0Itec.zkclient.ZkClient
+import scala.collection._
+import scala.collection.JavaConversions._
+import org.apache.kafka.common.utils.Utils
+
+
+/**
+ * This script can be used to change configs for topics/clients dynamically
+ */
+object ConfigCommand {
+
+  def main(args: Array[String]): Unit = {
+
+    val opts = new ConfigCommandOptions(args)
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topic/client) configs")
+
+    opts.checkArgs()
+
+    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+
+    try {
+      if (opts.options.has(opts.alterOpt))
+        alterConfig(zkClient, opts)
+      else if (opts.options.has(opts.describeOpt))
+        describeConfig(zkClient, opts)
+    } catch {
+      case e: Throwable =>
+        println("Error while executing topic command " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  private def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
+    val configsToBeAdded = parseConfigsToBeAdded(opts)
+    val configsToBeDeleted = parseConfigsToBeDeleted(opts)
+    val entityType = opts.options.valueOf(opts.entityType)
+    val entityName = opts.options.valueOf(opts.entityName)
+
+    // compile the final set of configs
+    val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+    configs.putAll(configsToBeAdded)
+    configsToBeDeleted.foreach(config => configs.remove(config))
+
+    if (entityType.equals(ConfigType.Topic)) {
+      AdminUtils.changeTopicConfig(zkClient, entityName, configs)
+      println("Updated config for topic: \"%s\".".format(entityName))
+    } else {
+      AdminUtils.changeClientIdConfig(zkClient, entityName, configs)
+      println("Updated config for clientId: \"%s\".".format(entityName))
+    }
+  }
+
+  private def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
+    val entityType = opts.options.valueOf(opts.entityType)
+    val entityNames: Seq[String] =
+      if (opts.options.has(opts.entityName))
+        Seq(opts.options.valueOf(opts.entityName))
+      else
+        ZkUtils.getAllEntitiesWithConfig(zkClient, entityType)
+
+    for (entityName <- entityNames) {
+      val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+      println("Configs for %s:%s are %s"
+                      .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+    }
+  }
+
+  private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
+    val configsToBeAdded = opts.options.valuesOf(opts.addedConfig).map(_.split("""\s*=\s*"""))
+    require(configsToBeAdded.forall(config => config.length == 2),
+            "Invalid entity config: all configs to be added must be in the format \"key=val\".")
+    val props = new Properties
+    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    props
+  }
+
+  private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
+    if (opts.options.has(opts.deletedConfig)) {
+      val configsToBeDeleted = opts.options.valuesOf(opts.deletedConfig).map(_.trim())
+      configsToBeDeleted
+    }
+    else
+      Seq.empty
+  }
+
+  class ConfigCommandOptions(args: Array[String]) {
+    val parser = new OptionParser
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+            "Multiple URLS can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
+    val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
+    val describeOpt = parser.accepts("describe", "List configs for the given entity.")
+    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients)")
+            .withRequiredArg
+            .ofType(classOf[String])
+    val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)")
+            .withRequiredArg
+            .ofType(classOf[String])
+
+    val nl = System.getProperty("line.separator")
+    val addedConfig = parser.accepts("added-config", "Key-value pairs of configs to add, in the form 'k1=v1,k2=v2'. The following is a list of valid configurations: " +
+            "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
+            "For entity_type '" + ConfigType.Client + "' currently no configs are processed by the brokers")
+            .withRequiredArg
+            .ofType(classOf[String])
+            .withValuesSeparatedBy(',')
+    val deletedConfig = parser.accepts("deleted-config", "Config keys to remove, in the form 'k1,k2'")
+            .withRequiredArg
+            .ofType(classOf[String])
+            .withValuesSeparatedBy(',')
+    val helpOpt = parser.accepts("help", "Print usage information.")
+    val options = parser.parse(args : _*)
+
+    val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addedConfig, deletedConfig, helpOpt)
+
+    def checkArgs() {
+      // should have exactly one action
+      val actions = Seq(alterOpt, describeOpt).count(options.has _)
+      if(actions != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
+
+      // check required args
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
+      CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addedConfig, deletedConfig))
+      if(options.has(alterOpt)) {
+        if(! options.has(entityName))
+          throw new IllegalArgumentException("--entity-name must be specified with --alter")
+
+        val isAddedPresent: Boolean = options.has(addedConfig)
+        val isDeletedPresent: Boolean = options.has(deletedConfig)
+        if(! isAddedPresent && ! isDeletedPresent)
+          throw new IllegalArgumentException("At least one of --added-config or --deleted-config must be specified with --alter")
+      }
+      val entityTypeVal = options.valueOf(entityType)
+      if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client)) {
+        throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client))
+      }
+    }
+  }
+
+}
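
For illustration, --describe may also be run without --entity-name, in which case every entity of the given type that has a config znode is listed (the ZK address is a placeholder):

    import kafka.admin.ConfigCommand

    // Prints the overrides for each topic that has one or more configs set.
    ConfigCommand.main(Array(
      "--zookeeper", "localhost:2181",
      "--entity-type", "topics",
      "--describe"))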

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 4e28bf1..f1405a5 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,6 +20,7 @@ package kafka.admin
 import joptsimple._
 import java.util.Properties
 import kafka.common.{Topic, AdminCommandFailedException}
+import kafka.utils.CommandLineUtils
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -27,6 +28,7 @@ import scala.collection._
 import scala.collection.JavaConversions._
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
+import kafka.server.{ConfigType, OffsetManager}
 import org.apache.kafka.common.utils.Utils
 import kafka.coordinator.ConsumerCoordinator
 
@@ -106,16 +108,6 @@ object TopicCommand extends Logging {
           opts.options.valueOf(opts.zkConnectOpt)))
     }
     topics.foreach { topic =>
-      val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
-      if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
-        val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
-        val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
-        // compile the final set of configs
-        configs.putAll(configsToBeAdded)
-        configsToBeDeleted.foreach(config => configs.remove(config))
-        AdminUtils.changeTopicConfig(zkClient, topic, configs)
-        println("Updated config for topic \"%s\".".format(topic))
-      }
       if(opts.options.has(opts.partitionsOpt)) {
         if (topic == ConsumerCoordinator.OffsetsTopicName) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
@@ -124,7 +116,7 @@ object TopicCommand extends Logging {
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)
+        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
         println("Adding partitions succeeded!")
       }
     }
@@ -180,7 +172,7 @@ object TopicCommand extends Logging {
           val describePartitions: Boolean = !reportOverriddenConfigs
           val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
           if (describeConfigs) {
-            val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+            val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
             if (!reportOverriddenConfigs || configs.size() != 0) {
               val numPartitions = topicPartitionAssignment.size
               val replicationFactor = topicPartitionAssignment.head._2.size
@@ -219,18 +211,6 @@ object TopicCommand extends Logging {
     props
   }
 
-  def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
-    if (opts.options.has(opts.deleteConfigOpt)) {
-      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
-      val propsToBeDeleted = new Properties
-      configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
-      LogConfig.validateNames(propsToBeDeleted)
-      configsToBeDeleted
-    }
-    else
-      Seq.empty
-  }
-
   def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
     val partitionList = replicaAssignmentList.split(",")
     val ret = new mutable.HashMap[Int, List[Int]]()
@@ -256,7 +236,7 @@ object TopicCommand extends Logging {
     val listOpt = parser.accepts("list", "List all available topics.")
     val createOpt = parser.accepts("create", "Create a new topic.")
     val deleteOpt = parser.accepts("delete", "Delete a topic")
-    val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
+    val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
     val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
@@ -265,16 +245,12 @@ object TopicCommand extends Logging {
                          .describedAs("topic")
                          .ofType(classOf[String])
     val nl = System.getProperty("line.separator")
-    val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered."  + 
+    val configOpt = parser.accepts("config", "A configuration override for the topic being created. "  +
                                                          "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
                                                          "See the Kafka documentation for full details on the topic configs.")
                           .withRequiredArg
                           .describedAs("name=value")
                           .ofType(classOf[String])
-    val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).")
-                          .withRequiredArg
-                          .describedAs("name")
-                          .ofType(classOf[String])
     val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
       "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
                            .withRequiredArg
@@ -308,10 +284,11 @@ object TopicCommand extends Logging {
 
       // check invalid args
       CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
+      // Topic configs can no longer be changed via TopicCommand --alter; use kafka-configs.sh (ConfigCommand) instead
+      CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt))
       if(options.has(createOpt))
           CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2649090..511d3c9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager}
+import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -86,7 +86,8 @@ class Partition(val topic: String,
       case Some(replica) => replica
       case None =>
         if (isReplicaLocal(replicaId)) {
-          val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic))
+          val config = LogConfig.fromProps(logManager.defaultConfig.originals,
+                                           AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic))
           val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b4fc755..6844602 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1037,8 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
             // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
             // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient,
-              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient,
+              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
               newIsr = leaderAndIsr.isr
             }
@@ -1322,7 +1322,8 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
       processUpdateNotifications(topicAndPartitions)
 
       // delete processed children
-      childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x))
+      childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
+                                                  ZkUtils.IsrChangeNotificationPath + "/" + x))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index bb6b5c8..4ebeb5a 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
 import kafka.utils.Logging
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.server.KafkaConfig
+import kafka.server.{ConfigType, KafkaConfig}
 
 trait PartitionLeaderSelector {
 
@@ -61,8 +61,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
           case true =>
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
             // for unclean leader election.
-            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
-              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient,
+              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                 "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                 " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 64ecb49..64b11df 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -16,6 +16,9 @@
  */
 package kafka.controller
 
+
+import kafka.server.ConfigType
+
 import collection.mutable
 import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
 import kafka.utils.CoreUtils._
@@ -284,7 +287,7 @@ class TopicDeletionManager(controller: KafkaController,
     topicsToBeDeleted -= topic
     partitionsToBeDeleted.retain(_.topic != topic)
     controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
-    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+    controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic))
     controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
     controllerContext.removeTopic(topic)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
new file mode 100644
index 0000000..8347a69
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.common.TopicAndPartition
+import kafka.log.{Log, LogConfig, LogManager}
+import kafka.utils.Pool
+
+import scala.collection.mutable
+
+/**
+ * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
+ */
+trait ConfigHandler {
+  def processConfigChanges(entityName : String, value : Properties)
+}
+
+/**
+ * The TopicConfigHandler will process topic config changes in ZK.
+ * The callback provides the topic name and the full properties set read from ZK
+ */
+class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {
+
+  def processConfigChanges(topic : String, topicConfig : Properties) {
+    val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
+    val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic }
+            .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) }
+
+    if (logsByTopic.contains(topic)) {
+      /* combine the default properties with the overrides in zk to create the new LogConfig */
+      val props = new Properties()
+      props.putAll(logManager.defaultConfig.originals)
+      props.putAll(topicConfig)
+      val logConfig = LogConfig(props)
+      for (log <- logsByTopic(topic))
+        log.config = logConfig
+    }
+  }
+}
+
+/**
+ * The ClientIdConfigHandler will process clientId config changes in ZK.
+ * The callback provides the clientId and the full properties set read from ZK.
+ * This implementation currently only caches the latest configs per clientId. In the future, it will apply quotas per client.
+ */
+class ClientIdConfigHandler extends ConfigHandler {
+  val configPool = new Pool[String, Properties]()
+
+  def processConfigChanges(clientId : String, clientConfig : Properties): Unit = {
+    configPool.put(clientId, clientConfig)
+  }
+}
\ No newline at end of file
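
A minimal sketch of the callback contract: a handler registered with the DynamicConfigManager (next file) receives the entity name and the full override properties read from ZK. The class name here is hypothetical:

    import java.util.Properties
    import kafka.server.ConfigHandler

    // Hypothetical handler that only logs the changes it is notified of.
    class LoggingConfigHandler extends ConfigHandler {
      def processConfigChanges(entityName: String, config: Properties) {
        println("Overrides for %s are now %s".format(entityName, config))
      }
    }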

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
new file mode 100644
index 0000000..a66fb75
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Json
+import kafka.utils.Logging
+import kafka.utils.SystemTime
+import kafka.utils.Time
+import kafka.utils.ZkUtils
+
+import scala.collection._
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+
+/**
+ * Represents all the entities that can be configured via ZK
+ */
+object ConfigType {
+  val Topic = "topic"
+  val Client = "client"
+}
+
+/**
+ * This class initiates and carries out config changes for all entities defined in ConfigType.
+ *
+ * It works as follows.
+ *
+ * Config is stored under the path: /config/entityType/entityName
+ *   E.g. /config/topics/<topic_name> and /config/clients/<clientId>
+ * This znode stores the overrides for this entity (but no defaults) in properties format.
+ *
+ * To avoid watching all topics for changes instead we have a notification path
+ *   /config/changes
+ * The DynamicConfigManager has a child watch on this path.
+ *
+ * To update a config we first update the config properties. Then we create a new sequential
+ * znode under the change path which records the entityType and entityName that were updated, say
+ *   /config/changes/config_change_13321
+ * The sequential znode contains data in this format: {"version" : 1, "entity_type" : "topics/clients", "entity_name" : "topic_name/client_id"}
+ * This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path.
+ *
+ * This will fire a watcher on all brokers. The watcher works as follows: it reads all the config change notifications
+ * and keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds,
+ * it checks whether the notification is older than a static expiration time (say, 10 minutes) and, if so, deletes it.
+ * For any new change it reads the new configuration, combines it with the defaults, and updates the existing config.
+ *
+ * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
+ * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
+ * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the
+ * broker reads the config, both changes may have been made). In this case the broker would needlessly refresh the config twice,
+ * but that is harmless.
+ *
+ * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
+ * on startup where a change might be missed between the initial config load and registering for change notifications.
+ *
+ */
+class DynamicConfigManager(private val zkClient: ZkClient,
+                           private val configHandler : Map[String, ConfigHandler],
+                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val time: Time = SystemTime) extends Logging {
+  private var lastExecutedChange = -1L
+
+  /**
+   * Begin watching for config changes
+   */
+  def startup() {
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath)
+    zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
+    processAllConfigChanges()
+  }
+
+  /**
+   * Process all config changes
+   */
+  private def processAllConfigChanges() {
+    val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
+    import JavaConversions._
+    processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
+  }
+
+  /**
+   * Process the given list of config changes
+   */
+  private def processConfigChanges(notifications: Seq[String]) {
+    if (notifications.size > 0) {
+      info("Processing config change notification(s)...")
+      val now = time.milliseconds
+      for (notification <- notifications) {
+        val changeId = changeNumber(notification)
+
+        if (changeId > lastExecutedChange) {
+          val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
+
+          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+          processNotification(jsonOpt)
+        }
+        lastExecutedChange = changeId
+      }
+      purgeObsoleteNotifications(now, notifications)
+    }
+  }
+
+  def processNotification(jsonOpt: Option[String]) = {
+    if(jsonOpt.isDefined) {
+      val json = jsonOpt.get
+      Json.parseFull(json) match {
+        case None =>
+          // Ignore non-JSON notifications; they may be left over from the deprecated TopicConfigManager
+        case Some(mapAnon: Map[_, _]) =>
+          val map = mapAnon collect
+                  { case (k: String, v: Any) => k -> v }
+          require(map("version") == 1)
+
+          val entityType = map.get("entity_type") match {
+            case Some(ConfigType.Topic) => ConfigType.Topic
+            case Some(ConfigType.Client) => ConfigType.Client
+            case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." +
+                    " Received: " + json)
+          }
+
+          val entity = map.get("entity_name") match {
+            case Some(value: String) => value
+            case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
+          }
+          configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity))
+
+        case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+                                                             "{\"version\" : 1," +
+                                                             " \"entity_type\":\"topic/client\"," +
+                                                             " \"entity_name\" : \"topic_name/client_id\"}." +
+                                                             " Received: " + json)
+      }
+    }
+  }
+
+  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
+    for(notification <- notifications.sorted) {
+      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification)
+      if(jsonOpt.isDefined) {
+        val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
+        if (now - stat.getCtime > changeExpirationMs) {
+          debug("Purging config change notification " + notification)
+          ZkUtils.deletePath(zkClient, changeZnode)
+        } else {
+          return
+        }
+      }
+    }
+  }
+
+  /* get the change number from a change notification znode */
+  private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong
+
+  /**
+   * A listener that applies config changes to logs
+   */
+  object ConfigChangeListener extends IZkChildListener {
+    override def handleChildChange(path: String, children: java.util.List[String]) {
+      try {
+        import JavaConversions._
+        processConfigChanges(children: mutable.Buffer[String])
+      } catch {
+        case e: Exception => error("Error processing config change:", e)
+      }
+    }
+  }
+}
\ No newline at end of file
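
For illustration, the notification payload written under /config/changes is produced by getConfigChangeZnodeData in AdminUtils above (the topic name is a placeholder):

    import kafka.admin.AdminUtils
    import kafka.server.ConfigType
    import kafka.utils.Json

    val data = AdminUtils.getConfigChangeZnodeData(ConfigType.Topic, "my-topic")
    // Encodes to something like:
    //   {"version":1,"entity_type":"topics","entity_name":"my-topic"}
    println(Json.encode(data))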

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 18917bc..84d4730 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -31,8 +31,9 @@ import java.io.File
 import kafka.utils._
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.NetworkReceive
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 
-import scala.collection.{JavaConversions, mutable}
+import scala.collection.mutable
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
@@ -77,7 +78,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   var replicaManager: ReplicaManager = null
 
-  var topicConfigManager: TopicConfigManager = null
+  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
+  var dynamicConfigManager: DynamicConfigManager = null
+  val metrics: Metrics = new Metrics()
 
   var consumerCoordinator: ConsumerCoordinator = null
 
@@ -171,9 +174,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
           Mx4jLoader.maybeLoad()
 
-          /* start topic config manager */
-          topicConfigManager = new TopicConfigManager(zkClient, logManager)
-          topicConfigManager.startup()
+          /* start dynamic config manager */
+          dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
+                                                             ConfigType.Client -> new ClientIdConfigHandler)
+          dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
+          dynamicConfigManager.startup()
 
           /* tell everyone we are alive */
           val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c89d00b..fae22d2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -90,8 +90,8 @@ class ReplicaFetcherThread(name:String,
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
-        topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient,
+        ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
         fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
deleted file mode 100644
index 01b1b0a..0000000
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Properties
-import scala.collection._
-import kafka.log._
-import kafka.utils._
-import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
-
-/**
- * This class initiates and carries out topic config changes.
- * 
- * It works as follows.
- * 
- * Config is stored under the path
- *   /config/topics/<topic_name>
- * This znode stores the topic-overrides for this topic (but no defaults) in properties format.
- * 
- * To avoid watching all topics for changes instead we have a notification path
- *   /config/changes
- * The TopicConfigManager has a child watch on this path.
- * 
- * To update a topic config we first update the topic config properties. Then we create a new sequential
- * znode under the change path which contains the name of the topic that was updated, say
- *   /config/changes/config_change_13321
- * This is just a notification--the actual config change is stored only once under the /config/topics/<topic_name> path.
- *   
- * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
- * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
- * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification. 
- * For any new changes it reads the new configuration, combines it with the defaults, and updates the log config 
- * for all logs for that topic (if any) that it has.
- * 
- * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
- * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
- * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the 
- * broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice,
- * but that is harmless.
- * 
- * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
- * on startup where a change might be missed between the initial config load and registering for change notifications.
- * 
- */
-class TopicConfigManager(private val zkClient: ZkClient,
-                         private val logManager: LogManager,
-                         private val changeExpirationMs: Long = 15*60*1000,
-                         private val time: Time = SystemTime) extends Logging {
-  private var lastExecutedChange = -1L
-  
-  /**
-   * Begin watching for config changes
-   */
-  def startup() {
-    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
-    zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
-    processAllConfigChanges()
-  }
-  
-  /**
-   * Process all config changes
-   */
-  private def processAllConfigChanges() {
-    val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
-    import JavaConversions._
-    processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
-  }
-
-  /**
-   * Process the given list of config changes
-   */
-  private def processConfigChanges(notifications: Seq[String]) {
-    if (notifications.size > 0) {
-      info("Processing config change notification(s)...")
-      val now = time.milliseconds
-      val logs = logManager.logsByTopicPartition.toBuffer
-      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
-      for (notification <- notifications) {
-        val changeId = changeNumber(notification)
-        if (changeId > lastExecutedChange) {
-          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
-          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
-          if(jsonOpt.isDefined) {
-            val json = jsonOpt.get
-            val topic = json.substring(1, json.length - 1) // hacky way to dequote
-            if (logsByTopic.contains(topic)) {
-              /* combine the default properties with the overrides in zk to create the new LogConfig */
-              val props = new Properties()
-              props.putAll(logManager.defaultConfig.originals)
-              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
-              val logConfig = LogConfig(props)
-              for (log <- logsByTopic(topic))
-                log.config = logConfig
-              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
-              purgeObsoleteNotifications(now, notifications)
-            }
-          }
-          lastExecutedChange = changeId
-        }
-      }
-    }
-  }
-  
-  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
-    for(notification <- notifications.sorted) {
-      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification)
-      if(jsonOpt.isDefined) {
-        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
-        if (now - stat.getCtime > changeExpirationMs) {
-          debug("Purging config change notification " + notification)
-          ZkUtils.deletePath(zkClient, changeZnode)
-        } else {
-          return
-        }
-      }
-    }
-  }
-    
-  /* get the change number from a change notification znode */
-  private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
-  
-  /**
-   * A listener that applies config changes to logs
-   */
-  object ConfigChangeListener extends IZkChildListener {
-    override def handleChildChange(path: String, children: java.util.List[String]) {
-      try {
-        import JavaConversions._
-        processConfigChanges(children: mutable.Buffer[String])
-      } catch {
-        case e: Exception => error("Error processing config change:", e)
-      }
-    }
-  }
-
-}
\ No newline at end of file
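
The removed header comment above describes a trigger-based protocol: a sequence-numbered znode only signals *that* something changed, and the authoritative config is always re-read from zk. As a minimal, self-contained Scala sketch of that idea (ConfigChangeSketch, overridesInZk, and the znode naming are illustrative, not the committed API):

    import java.util.Properties
    import scala.collection.mutable

    object ConfigChangeSketch {
      // Stand-in for the per-entity overrides stored under the config path in ZK.
      val overridesInZk = mutable.Map[String, Properties]()
      var lastExecutedChange = -1L

      // Hypothetical znode naming, mirroring the suffix-number scheme.
      def changeNumber(znode: String): Long =
        znode.substring(znode.lastIndexOf('_') + 1).toLong

      // Notifications are (znode, entity) pairs; the notification carries no
      // config, so a missed or coalesced notification is harmless.
      def processNotifications(notifications: Seq[(String, String)]): Unit =
        for ((znode, entity) <- notifications.sortBy(n => changeNumber(n._1))) {
          val id = changeNumber(znode)
          if (id > lastExecutedChange) {
            val fresh = overridesInZk.getOrElse(entity, new Properties())
            println(s"change $id: refreshed config for $entity to $fresh")
            lastExecutedChange = id
          }
        }
    }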

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 166814c..4ae310e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -19,6 +19,7 @@ package kafka.utils
 
 import kafka.cluster._
 import kafka.consumer.{ConsumerThreadId, TopicCount}
+import kafka.server.ConfigType
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
@@ -39,8 +40,6 @@ object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
-  val TopicConfigPath = "/config/topics"
-  val TopicConfigChangesPath = "/config/changes"
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
@@ -48,6 +47,8 @@ object ZkUtils extends Logging {
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
   val BrokerSequenceIdPath = "/brokers/seqid"
   val IsrChangeNotificationPath = "/isr_change_notification"
+  val EntityConfigPath = "/config"
+  val EntityConfigChangesPath = "/config/changes"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -57,8 +58,11 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
-  def getTopicConfigPath(topic: String): String =
-    TopicConfigPath + "/" + topic
+  def getEntityConfigRootPath(entityType: String): String =
+    EntityConfigPath + "/" + entityType
+
+  def getEntityConfigPath(entityType: String, entity: String): String =
+    getEntityConfigRootPath(entityType) + "/" + entity
 
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
@@ -93,8 +97,14 @@ object ZkUtils extends Logging {
   }
 
   def setupCommonPaths(zkClient: ZkClient) {
-    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
-      DeleteTopicsPath, BrokerSequenceIdPath))
+    for(path <- Seq(ConsumersPath,
+                    BrokerIdsPath,
+                    BrokerTopicsPath,
+                    EntityConfigChangesPath,
+                    ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
+                    ZkUtils.getEntityConfigRootPath(ConfigType.Client),
+                    DeleteTopicsPath,
+                    BrokerSequenceIdPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
@@ -753,6 +763,17 @@ object ZkUtils extends Logging {
       topics
   }
 
+  /**
+   * Returns all the entities whose configs have been overridden.
+   */
+  def getAllEntitiesWithConfig(zkClient: ZkClient, entityType: String): Seq[String] = {
+    val entities = ZkUtils.getChildrenParentMayNotExist(zkClient, getEntityConfigRootPath(entityType))
+    if(entities == null)
+      Seq.empty[String]
+    else
+      entities
+  }
+
   def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
     if(topics == null) Set.empty[TopicAndPartition]
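
The old fixed /config/topics layout generalizes above to one subtree per entity type, with all entity types sharing the single /config/changes notification path. A quick standalone sketch of the resulting paths, assuming ConfigType.Topic and ConfigType.Client resolve to the strings "topics" and "clients":

    val EntityConfigPath = "/config"
    def getEntityConfigRootPath(entityType: String): String =
      EntityConfigPath + "/" + entityType
    def getEntityConfigPath(entityType: String, entity: String): String =
      getEntityConfigRootPath(entityType) + "/" + entity

    getEntityConfigPath("topics", "my-topic")   // "/config/topics/my-topic"
    getEntityConfigPath("clients", "my-client") // "/config/clients/my-client"

This is why setupCommonPaths now pre-creates the single changes path plus one config root per entity type.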

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 93f200e..86dcc4c 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -25,7 +25,7 @@ import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Logging, ZkUtils, TestUtils}
 import kafka.common.{InvalidTopicException, TopicExistsException, TopicAndPartition}
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
 import java.io.File
 import TestUtils._
 
@@ -407,12 +407,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       checkConfig(maxMessageSize, retentionMs)
 
       // now double the config values for the topic and check that it is applied
-      AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
+      val newConfig: Properties = makeConfig(2 * maxMessageSize, 2 * retentionMs)
+      AdminUtils.changeTopicConfig(server.zkClient, topic, newConfig)
       checkConfig(2*maxMessageSize, 2 * retentionMs)
+
+      // Verify that the same config can be read from ZK
+      val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic)
+      assertEquals(newConfig, configInZk)
     } finally {
       server.shutdown()
       server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
-
 }
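
For reference, the round trip the updated test asserts, as a hedged sketch (the override keys and values here are illustrative):

    import java.util.Properties
    // Write a topic override, then read it back through the generalized API.
    val newConfig = new Properties()
    newConfig.put("max.message.bytes", "64000") // illustrative override
    newConfig.put("retention.ms", "600000")     // illustrative override
    AdminUtils.changeTopicConfig(zkClient, "my-topic", newConfig)
    val configInZk = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, "my-topic")
    assert(configInZk == newConfig) // the same Properties round-trip through zk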

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
new file mode 100644
index 0000000..cfe0ec3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import junit.framework.Assert._
+import kafka.admin.ConfigCommand.ConfigCommandOptions
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.Logging
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+
+class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+  @Test
+  def testArgumentParse() {
+    // Should parse correctly
+    var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+                                                     "--entity-name", "x",
+                                                     "--entity-type", "client",
+                                                     "--describe"))
+    createOpts.checkArgs()
+
+    // For --alter and added config
+    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+                                                "--entity-name", "x",
+                                                "--entity-type", "client",
+                                                "--alter",
+                                                "--added-config", "a=b,c=d"))
+    createOpts.checkArgs()
+
+    // For alter and deleted config
+    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+                                                "--entity-name", "x",
+                                                "--entity-type", "client",
+                                                "--alter",
+                                                "--deleted-config", "a,b,c"))
+    createOpts.checkArgs()
+
+    // For alter and both added, deleted config
+    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+                                                "--entity-name", "x",
+                                                "--entity-type", "client",
+                                                "--alter",
+                                                "--added-config", "a=b,c=d",
+                                                "--deleted-config", "a"))
+    createOpts.checkArgs()
+    val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts)
+    assertEquals(2, addedProps.size())
+    assertEquals("b", addedProps.getProperty("a"))
+    assertEquals("d", addedProps.getProperty("c"))
+
+    val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts)
+    assertEquals(1, deletedProps.size)
+    assertEquals("a", deletedProps(0))
+  }
+}
\ No newline at end of file
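
The new test pins down the ConfigCommand argument surface. As a usage sketch, this is the kind of argument vector bin/kafka-configs.sh (added earlier in this commit) hands to kafka.admin.ConfigCommand; the ZooKeeper address is a placeholder:

    val alterArgs = Array("--zookeeper", "localhost:2181", // placeholder address
                          "--entity-type", "client",
                          "--entity-name", "x",
                          "--alter",
                          "--added-config", "a=b,c=d",
                          "--deleted-config", "a")
    val opts = new ConfigCommand.ConfigCommandOptions(alterArgs)
    opts.checkArgs() // validates the flag combination, as the test exercises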

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index dcd6988..58adef6 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -22,6 +22,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
+import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils
 import kafka.coordinator.ConsumerCoordinator
@@ -43,20 +44,18 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
       "--config", cleanupKey + "=" + cleanupVal,
       "--topic", topic))
     TopicCommand.createTopic(zkClient, createOpts)
-    val props = AdminUtils.fetchTopicConfig(zkClient, topic)
+    val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
     assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
     assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
 
     // pre-create the topic config changes path to avoid a NoNodeException
-    ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath)
+    ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath)
 
     // modify the topic to add new partitions
     val numPartitionsModified = 3
-    val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString,
-      "--config", cleanupKey + "=" + cleanupVal,
-      "--topic", topic))
+    val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
     TopicCommand.alterTopic(zkClient, alterOpts)
-    val newProps = AdminUtils.fetchTopicConfig(zkClient, topic)
+    val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
     assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
     assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
   }
@@ -99,4 +98,4 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
     }
     assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 8a871cf..7c45393 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.util.Properties
 
 import junit.framework.Assert._
+import org.easymock.{Capture, EasyMock}
 import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
@@ -32,8 +33,10 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
 
   @Test
   def testConfigChange() {
-    val oldVal: java.lang.Long = 100000
-    val newVal: java.lang.Long = 200000
+    assertTrue("Should contain a ConfigHandler for topics",
+               this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topic))
+    val oldVal: java.lang.Long = 100000L
+    val newVal: java.lang.Long = 200000L
     val tp = TopicAndPartition("test", 0)
     val logProps = new Properties()
     logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
@@ -50,6 +53,25 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
     }
   }
 
+  // For now, client config changes have no effect on the broker; simply verify that the handler call was made.
+  @Test
+  def testClientConfigChange() {
+    assertTrue("Should contain a ConfigHandler for topics",
+               this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
+    val clientId = "testClient"
+    val props = new Properties()
+    props.put("a.b", "c")
+    props.put("x.y", "z")
+    AdminUtils.changeClientIdConfig(zkClient, clientId, props)
+    TestUtils.retry(10000) {
+      val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
+      assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId))
+      assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size)
+      assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b"))
+      assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y"))
+    }
+  }
+
   @Test
   def testConfigChangeOnNonExistingTopic() {
     val topic = TestUtils.tempTopic
@@ -63,4 +85,59 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
     }
   }
 
-}
\ No newline at end of file
+  @Test
+  def testProcessNotification() {
+    val props = new Properties()
+    props.put("a.b", "10")
+
+    // Create a mock ConfigHandler to record config changes it is asked to process
+    val entityArgument = new Capture[String]()
+    val propertiesArgument = new Capture[Properties]()
+    val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
+    handler.processConfigChanges(
+      EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
+      EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
+    EasyMock.expectLastCall().once()
+    EasyMock.replay(handler)
+
+    val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
+    // Notifications created using the old TopicConfigManager are ignored.
+    configManager.processNotification(Some("not json"))
+
+    // Incorrect Map. No version
+    try {
+      val jsonMap = Map("v" -> 1, "x" -> 2)
+      configManager.processNotification(Some(Json.encode(jsonMap)))
+      fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
+    } catch {
+      case t: Throwable => // expected: the notification carries no version field
+    }
+    // Version is provided. EntityType is incorrect
+    try {
+      val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
+      configManager.processNotification(Some(Json.encode(jsonMap)))
+      fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
+    } catch {
+      case t: Throwable => // expected: unknown entity type
+    }
+
+    // EntityName isn't provided
+    try {
+      val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
+      configManager.processNotification(Some(Json.encode(jsonMap)))
+      fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
+    } catch {
+      case t: Throwable => // expected: no entity name provided
+    }
+
+    // Everything is provided
+    val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
+    configManager.processNotification(Some(Json.encode(jsonMap)))
+
+    // Verify that processConfigChanges was only called once
+    EasyMock.verify(handler)
+  }
+}
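
Finally, testProcessNotification fixes the wire format of a change notification. A sketch of the accepted and rejected payloads, with a small validator mirroring the three failure cases (the set of known entity types is an assumption here):

    // Accepted: a version, a known entity type, and an entity name.
    val valid     = Map("version" -> 1, "entity_type" -> "topics", "entity_name" -> "x")
    // Rejected: no "version" key at all.
    val noVersion = Map("v" -> 1, "x" -> 2)
    // Rejected: an entity type the manager does not know.
    val badType   = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
    // Rejected: entity name missing.
    val noName    = Map("version" -> 1, "entity_type" -> "topics")

    def validate(m: Map[String, Any]): Unit = {
      require(m.contains("version"), "notification has no version field")
      require(Set("topics", "clients")(m.getOrElse("entity_type", "").toString),
              "unknown entity type")
      require(m.contains("entity_name"), "notification names no entity")
    }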