You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/05 00:11:44 UTC
kafka git commit: kafka-2205;
Generalize TopicConfigManager to handle multiple entity configs;
patched by Aditya Auradkar; reviewed Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 1a0179f21 -> a56a79055
kafka-2205; Generalize TopicConfigManager to handle multiple entity configs; patched by Aditya Auradkar; reviewed Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a56a7905
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a56a7905
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a56a7905
Branch: refs/heads/trunk
Commit: a56a79055dfba4687f476b0a4d20aeec1c4ebff7
Parents: 1a0179f
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Aug 4 15:11:27 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Aug 4 15:11:27 2015 -0700
----------------------------------------------------------------------
bin/kafka-configs.sh | 17 ++
.../src/main/scala/kafka/admin/AdminUtils.scala | 77 +++++---
.../main/scala/kafka/admin/ConfigCommand.scala | 174 ++++++++++++++++++
.../main/scala/kafka/admin/TopicCommand.scala | 39 +---
.../main/scala/kafka/cluster/Partition.scala | 5 +-
.../kafka/controller/KafkaController.scala | 7 +-
.../controller/PartitionLeaderSelector.scala | 6 +-
.../kafka/controller/TopicDeletionManager.scala | 5 +-
.../main/scala/kafka/server/ConfigHandler.scala | 69 +++++++
.../kafka/server/DynamicConfigManager.scala | 183 +++++++++++++++++++
.../main/scala/kafka/server/KafkaServer.scala | 15 +-
.../kafka/server/ReplicaFetcherThread.scala | 4 +-
.../scala/kafka/server/TopicConfigManager.scala | 152 ---------------
core/src/main/scala/kafka/utils/ZkUtils.scala | 33 +++-
.../test/scala/unit/kafka/admin/AdminTest.scala | 8 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 73 ++++++++
.../unit/kafka/admin/TopicCommandTest.scala | 13 +-
.../kafka/server/DynamicConfigChangeTest.scala | 83 ++++++++-
18 files changed, 718 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/bin/kafka-configs.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-configs.sh b/bin/kafka-configs.sh
new file mode 100755
index 0000000..417eaf5
--- /dev/null
+++ b/bin/kafka-configs.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Quote $0 and "$@" so that an install path or an argument containing whitespace
+# is passed through intact instead of being re-split by the shell.
+exec "$(dirname "$0")"/kafka-run-class.sh kafka.admin.ConfigCommand "$@"
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 4cc2376..9966660 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -21,6 +21,7 @@ import kafka.common._
import kafka.cluster.{BrokerEndPoint, Broker}
import kafka.log.LogConfig
+import kafka.server.ConfigType
import kafka.utils._
import kafka.api.{TopicMetadata, PartitionMetadata}
@@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
val rand = new Random
-
val AdminClientId = "__admin_client"
-
- val TopicConfigChangeZnodePrefix = "config_change_"
+ val EntityConfigChangeZnodePrefix = "config_change_"
/**
* There are 2 goals of replica assignment:
@@ -103,14 +102,12 @@ object AdminUtils extends Logging {
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
- * @param config Pre-existing properties that should be preserved
*/
def addPartitions(zkClient: ZkClient,
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
- checkBrokerAvailable: Boolean = true,
- config: Properties = new Properties) {
+ checkBrokerAvailable: Boolean = true) {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -137,7 +134,7 @@ object AdminUtils extends Logging {
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
@@ -238,7 +235,7 @@ object AdminUtils extends Logging {
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
}
-
+
def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -246,7 +243,6 @@ object AdminUtils extends Logging {
update: Boolean = false) {
// validate arguments
Topic.validate(topic)
- LogConfig.validate(config)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
val topicPath = ZkUtils.getTopicPath(topic)
@@ -264,10 +260,14 @@ object AdminUtils extends Logging {
}
partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
-
- // write out the config if there is any, this isn't transactional with the partition assignments
- writeTopicConfig(zkClient, topic, config)
-
+
+ // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+ if (!update) {
+ // write out the config if there is any, this isn't transactional with the partition assignments
+ LogConfig.validate(config)
+ writeEntityConfig(zkClient, ConfigType.Topic, topic, config)
+ }
+
// create the partition assignment
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
}
@@ -290,7 +290,19 @@ object AdminUtils extends Logging {
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
-
+
+  /**
+   * Replace the config of a client id and create a change notification so the change will
+   * propagate to other brokers.
+   * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+   * @param clientId: The clientId for which configs are being changed
+   * @param configs: The final set of configs that will be applied to the client. Any additions to
+   *                 or deletions from the existing configs must already be reflected in this set
+   *                 before this API is invoked
+   */
+  def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties): Unit =
+    changeEntityConfig(zkClient, ConfigType.Client, clientId, configs)
+
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
@@ -302,34 +314,42 @@ object AdminUtils extends Logging {
def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
if(!topicExists(zkClient, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
-
// remove the topic overrides
LogConfig.validate(configs)
+ changeEntityConfig(zkClient, ConfigType.Topic, topic, configs)
+ }
+ private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) {
// write the new config--may not exist if there were previously no overrides
- writeTopicConfig(zkClient, topic, configs)
-
+ writeEntityConfig(zkClient, entityType, entityName, configs)
+
// create the change notification
- zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
+ val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
+ val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
+ zkClient.createPersistentSequential(seqNode, content)
}
-
+
+  /**
+   * Build the (not yet JSON-encoded) payload of a config change notification znode.
+   * @param entityType: The type of the entity whose config changed
+   * @param entityName: The name of the entity whose config changed
+   * @return a map holding the payload version (1), the entity type and the entity name
+   */
+  def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
+    val version = 1
+    Map("version" -> version, "entity_type" -> entityType, "entity_name" -> entityName)
+  }
+
/**
* Write out the topic config to zk, if there is any
*/
- private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
+ private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, config: Properties) {
val configMap: mutable.Map[String, String] = {
import JavaConversions._
config
}
val map = Map("version" -> 1, "config" -> configMap)
- ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
+ ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map))
}
/**
- * Read the topic config (if any) from zk
+ * Read the entity (topic or client) config (if any) from zk
*/
- def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
- val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
+ def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = {
+ val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
@@ -343,19 +363,20 @@ object AdminUtils extends Logging {
configTup match {
case (k: String, v: String) =>
props.setProperty(k, v)
- case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+ case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
- case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+ case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
- case o => throw new IllegalArgumentException("Unexpected value in config: " + str)
+ case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)"
+ .format(str, entityType, entity))
}
}
props
}
def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
- ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
+ ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topic, topic))).toMap
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
new file mode 100644
index 0000000..2759476
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple._
+import java.util.Properties
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+import kafka.utils.{ZkUtils, CommandLineUtils}
+import org.I0Itec.zkclient.ZkClient
+import scala.collection._
+import scala.collection.JavaConversions._
+import org.apache.kafka.common.utils.Utils
+
+
+/**
+ * Command line tool that describes or alters the configs kept in zookeeper for an entity
+ * (a topic or a client id).
+ */
+object ConfigCommand {
+
+  /**
+   * Entry point. Parses and validates the arguments, then runs the single requested action
+   * (--alter or --describe). A non-fatal failure of the action is printed to stdout together with
+   * its stack trace; the zookeeper client is closed in every case.
+   */
+  def main(args: Array[String]): Unit = {
+
+    val opts = new ConfigCommandOptions(args)
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topic/client) configs")
+
+    opts.checkArgs()
+
+    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+
+    try {
+      if (opts.options.has(opts.alterOpt))
+        alterConfig(zkClient, opts)
+      else if (opts.options.has(opts.describeOpt))
+        describeConfig(zkClient, opts)
+    } catch {
+      // Only non-fatal errors are reported here; fatal VM errors propagate after the finally block.
+      case scala.util.control.NonFatal(e) =>
+        println("Error while executing config command " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  /**
+   * Merge --added-config and --deleted-config into the entity's current config and write the
+   * result back through AdminUtils.
+   */
+  private def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions): Unit = {
+    val configsToBeAdded = parseConfigsToBeAdded(opts)
+    val configsToBeDeleted = parseConfigsToBeDeleted(opts)
+    val entityType = opts.options.valueOf(opts.entityType)
+    val entityName = opts.options.valueOf(opts.entityName)
+
+    // compile the final set of configs: additions are applied first, then deletions, so a key
+    // that appears in both lists ends up removed
+    val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+    configs.putAll(configsToBeAdded)
+    configsToBeDeleted.foreach(config => configs.remove(config))
+
+    if (entityType == ConfigType.Topic) {
+      AdminUtils.changeTopicConfig(zkClient, entityName, configs)
+      println("Updated config for topic: \"%s\".".format(entityName))
+    } else {
+      AdminUtils.changeClientIdConfig(zkClient, entityName, configs)
+      println("Updated config for clientId: \"%s\".".format(entityName))
+    }
+  }
+
+  /**
+   * Print the configs of the entity given by --entity-name or, when no name is given, of every
+   * entity returned by ZkUtils.getAllEntitiesWithConfig for the requested entity type.
+   */
+  private def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions): Unit = {
+    val entityType = opts.options.valueOf(opts.entityType)
+    val entityNames: Seq[String] =
+      if (opts.options.has(opts.entityName))
+        Seq(opts.options.valueOf(opts.entityName))
+      else
+        ZkUtils.getAllEntitiesWithConfig(zkClient, entityType)
+
+    for (entityName <- entityNames) {
+      val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+      println("Configs for %s:%s are %s"
+        .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+    }
+  }
+
+  /**
+   * Turn the --added-config values into Properties.
+   * Every value is split on '=' (surrounding whitespace is dropped) and must yield exactly a key
+   * and a value, so an entry whose value itself contains '=' is rejected.
+   * @throws IllegalArgumentException if any entry is not of the form "key=val"
+   */
+  private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
+    val configsToBeAdded = opts.options.valuesOf(opts.addedConfig).map(_.split("""\s*=\s*"""))
+    require(configsToBeAdded.forall(config => config.length == 2),
+      "Invalid entity config: all configs to be added must be in the format \"key=val\".")
+    val props = new Properties
+    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    props
+  }
+
+  /**
+   * Return the trimmed --deleted-config keys, or an empty sequence when the option is absent.
+   * The keys are not validated against any list of known config names.
+   */
+  private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
+    if (opts.options.has(opts.deletedConfig))
+      opts.options.valuesOf(opts.deletedConfig).map(_.trim())
+    else
+      Seq.empty
+  }
+
+  /**
+   * Declares the command line options of the tool and parses `args` on construction.
+   * NOTE(review): --help is accepted by the parser but nothing in this object acts on it.
+   */
+  class ConfigCommandOptions(args: Array[String]) {
+    val parser = new OptionParser
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+            "Multiple URLS can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
+    val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
+    val describeOpt = parser.accepts("describe", "List configs for the given entity.")
+    val entityType = parser.accepts("entity-type", "Type of entity (topic/client)")
+            .withRequiredArg
+            .ofType(classOf[String])
+    val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)")
+            .withRequiredArg
+            .ofType(classOf[String])
+
+    val nl = System.getProperty("line.separator")
+    val addedConfig = parser.accepts("added-config", "Key Value pairs configs to add 'k1=v1,k2=v2'. The following is a list of valid configurations: " +
+            "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
+            "For entity_type '" + ConfigType.Client + "' currently no configs are processed by the brokers")
+            .withRequiredArg
+            .ofType(classOf[String])
+            .withValuesSeparatedBy(',')
+    val deletedConfig = parser.accepts("deleted-config", "config keys to remove 'k1,k2'")
+            .withRequiredArg
+            .ofType(classOf[String])
+            .withValuesSeparatedBy(',')
+    val helpOpt = parser.accepts("help", "Print usage information.")
+    val options = parser.parse(args : _*)
+
+    val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addedConfig, deletedConfig, helpOpt)
+
+    /**
+     * Validate the parsed options: exactly one action, the required options, no option that
+     * conflicts with the chosen action, and a supported --entity-type.
+     * @throws IllegalArgumentException if --alter lacks --entity-name or lacks both
+     *         --added-config and --deleted-config, or if --entity-type is not supported
+     */
+    def checkArgs(): Unit = {
+      // should have exactly one action
+      val actions = Seq(alterOpt, describeOpt).count(options.has _)
+      if(actions != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
+
+      // check required args
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
+      CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addedConfig, deletedConfig))
+      if(options.has(alterOpt)) {
+        if(! options.has(entityName))
+          throw new IllegalArgumentException("--entity-name must be specified with --alter")
+
+        val isAddedPresent: Boolean = options.has(addedConfig)
+        val isDeletedPresent: Boolean = options.has(deletedConfig)
+        if(! isAddedPresent && ! isDeletedPresent)
+          throw new IllegalArgumentException("At least one of --added-config or --deleted-config must be specified with --alter")
+      }
+      val entityTypeVal = options.valueOf(entityType)
+      if(entityTypeVal != ConfigType.Topic && entityTypeVal != ConfigType.Client) {
+        throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client))
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 4e28bf1..f1405a5 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,6 +20,7 @@ package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.common.{Topic, AdminCommandFailedException}
+import kafka.utils.CommandLineUtils
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -27,6 +28,7 @@ import scala.collection._
import scala.collection.JavaConversions._
import kafka.log.LogConfig
import kafka.consumer.Whitelist
+import kafka.server.{ConfigType, OffsetManager}
import org.apache.kafka.common.utils.Utils
import kafka.coordinator.ConsumerCoordinator
@@ -106,16 +108,6 @@ object TopicCommand extends Logging {
opts.options.valueOf(opts.zkConnectOpt)))
}
topics.foreach { topic =>
- val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
- if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
- val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
- val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
- // compile the final set of configs
- configs.putAll(configsToBeAdded)
- configsToBeDeleted.foreach(config => configs.remove(config))
- AdminUtils.changeTopicConfig(zkClient, topic, configs)
- println("Updated config for topic \"%s\".".format(topic))
- }
if(opts.options.has(opts.partitionsOpt)) {
if (topic == ConsumerCoordinator.OffsetsTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
@@ -124,7 +116,7 @@ object TopicCommand extends Logging {
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
- AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)
+ AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("Adding partitions succeeded!")
}
}
@@ -180,7 +172,7 @@ object TopicCommand extends Logging {
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (describeConfigs) {
- val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+ val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
if (!reportOverriddenConfigs || configs.size() != 0) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
@@ -219,18 +211,6 @@ object TopicCommand extends Logging {
props
}
- def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
- if (opts.options.has(opts.deleteConfigOpt)) {
- val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
- val propsToBeDeleted = new Properties
- configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
- LogConfig.validateNames(propsToBeDeleted)
- configsToBeDeleted
- }
- else
- Seq.empty
- }
-
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
@@ -256,7 +236,7 @@ object TopicCommand extends Logging {
val listOpt = parser.accepts("list", "List all available topics.")
val createOpt = parser.accepts("create", "Create a new topic.")
val deleteOpt = parser.accepts("delete", "Delete a topic")
- val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
+ val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
@@ -265,16 +245,12 @@ object TopicCommand extends Logging {
.describedAs("topic")
.ofType(classOf[String])
val nl = System.getProperty("line.separator")
- val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
+ val configOpt = parser.accepts("config", "A configuration override for the topic being created." +
"The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
"See the Kafka documentation for full details on the topic configs.")
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
- val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).")
- .withRequiredArg
- .describedAs("name")
- .ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
.withRequiredArg
@@ -308,10 +284,11 @@ object TopicCommand extends Logging {
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
+ // Topic configs cannot be changed with alterTopic
+ CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt))
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2649090..511d3c9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
import kafka.admin.AdminUtils
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
-import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager}
+import kafka.server._
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
@@ -86,7 +86,8 @@ class Partition(val topic: String,
case Some(replica) => replica
case None =>
if (isReplicaLocal(replicaId)) {
- val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic))
+ val config = LogConfig.fromProps(logManager.defaultConfig.originals,
+ AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic))
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b4fc755..6844602 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1037,8 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
- if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(zkClient,
- topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+ if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient,
+ ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr
}
@@ -1322,7 +1322,8 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
processUpdateNotifications(topicAndPartitions)
// delete processed children
- childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x))
+ childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
+ ZkUtils.getEntityConfigPath(ConfigType.Topic, x)))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index bb6b5c8..4ebeb5a 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
import kafka.utils.Logging
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.server.KafkaConfig
+import kafka.server.{ConfigType, KafkaConfig}
trait PartitionLeaderSelector {
@@ -61,8 +61,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
- if (!LogConfig.fromProps(config.originals, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
- topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+ if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient,
+ ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 64ecb49..64b11df 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -16,6 +16,9 @@
*/
package kafka.controller
+
+import kafka.server.ConfigType
+
import collection.mutable
import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
import kafka.utils.CoreUtils._
@@ -284,7 +287,7 @@ class TopicDeletionManager(controller: KafkaController,
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
- controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+ controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
new file mode 100644
index 0000000..8347a69
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.common.TopicAndPartition
+import kafka.log.{Log, LogConfig, LogManager}
+import kafka.utils.Pool
+
+import scala.collection.mutable
+
+/**
+ * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager.
+ * The DynamicConfigManager looks a handler up by entity type and invokes it once per notification.
+ */
+trait ConfigHandler {
+  /**
+   * Apply the configuration of a single entity.
+   *
+   * @param entityName name of the entity whose config changed
+   * @param value      the properties fetched for that entity
+   */
+  def processConfigChanges(entityName : String, value : Properties): Unit
+}
+
+/**
+ * The TopicConfigHandler will process topic config changes in ZK.
+ * The callback provides the topic name and the full properties set read from ZK
+ */
+class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {
+
+  /**
+   * Rebuild the LogConfig of `topic` and install it on every log of that topic known to the LogManager.
+   * Nothing happens when the LogManager has no log for the topic.
+   *
+   * @param topic       the topic whose config changed
+   * @param topicConfig the properties read for the topic; applied on top of the LogManager defaults
+   */
+  def processConfigChanges(topic : String, topicConfig : Properties): Unit = {
+    val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
+    // Only the logs of this one topic are needed, so select them directly instead of grouping every log by topic.
+    val topicLogs = logs.collect { case (topicAndPartition, log) if topicAndPartition.topic == topic => log }
+
+    if (topicLogs.nonEmpty) {
+      /* combine the default properties with the overrides in zk to create the new LogConfig */
+      val props = new Properties()
+      props.putAll(logManager.defaultConfig.originals)
+      props.putAll(topicConfig)
+      val logConfig = LogConfig(props)
+      for (log <- topicLogs)
+        log.config = logConfig
+    }
+  }
+}
+
+/**
+ * Handles clientId config changes read from ZK.
+ * The callback receives the clientId together with the properties read for it and, for now, simply
+ * stores them in configPool. Per the original note, per-client quotas are expected to be driven from
+ * here in the future.
+ */
+class ClientIdConfigHandler extends ConfigHandler {
+  /* properties received so far, keyed by clientId */
+  val configPool: Pool[String, Properties] = new Pool[String, Properties]()
+
+  /**
+   * Store `clientConfig` under `clientId` in configPool.
+   */
+  def processConfigChanges(clientId : String, clientConfig : Properties): Unit = {
+    configPool.put(clientId, clientConfig)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
new file mode 100644
index 0000000..a66fb75
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Json
+import kafka.utils.Logging
+import kafka.utils.SystemTime
+import kafka.utils.Time
+import kafka.utils.ZkUtils
+
+import scala.collection._
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+
+/**
+ * Represents all the entities that can be configured via ZK
+ *
+ * The string values are more than labels: ZkUtils.getEntityConfigRootPath appends them to "/config" to form the
+ * znode path of an entity type, and DynamicConfigManager.processNotification compares them with the 'entity_type'
+ * field of a change notification.
+ *
+ * NOTE(review): the TopicConfigPath constant removed from ZkUtils by this change was "/config/topics", whereas
+ * "/config" + "/" + Topic yields "/config/topic" -- confirm that topic overrides written before this change
+ * are still found.
+ */
+object ConfigType {
+ val Topic = "topic"
+ val Client = "client"
+}
+
+/**
+ * This class initiates and carries out config changes for all entities defined in ConfigType.
+ *
+ * It works as follows.
+ *
+ * Config is stored under the path: /config/<entityType>/<entityName>, where <entityType> is one of the ConfigType values.
+ * This znode stores the overrides for this entity (but no defaults) in properties format.
+ *
+ * To avoid watching every entity for changes we instead have a notification path
+ *   /config/changes
+ * The DynamicConfigManager has a child watch on this path.
+ *
+ * To update a config we first update the config properties. Then we create a new sequential
+ * znode under the change path which contains the entity type and entity name that was updated, say
+ *   /config/changes/config_change_13321
+ * The sequential znode contains data in this format:
+ *   {"version" : 1, "entity_type":"topic/client", "entity_name" : "topic_name/client_id"}
+ * This is just a notification--the actual config change is stored only once under the /config/<entityType>/<entityName> path.
+ *
+ * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
+ * It keeps track of the highest config change suffix number it has applied previously. For any newer change it reads the
+ * entity's configuration from zk and passes it to the ConfigHandler registered for the entity type. Afterwards it deletes,
+ * oldest first, the notifications that are older than changeExpirationMs (15 minutes by default).
+ *
+ * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
+ * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
+ * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the
+ * broker reads the config both changes may have been made). In this case the broker would needlessly refresh the config twice,
+ * but that is harmless.
+ *
+ * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
+ * on startup where a change might be missed between the initial config load and registering for change notifications.
+ *
+ * @param zkClient           client used to read notifications and entity configs
+ * @param configHandler      the handler to invoke for each entity type (keyed by the ConfigType value)
+ * @param changeExpirationMs age in milliseconds after which a notification znode is purged
+ * @param time               clock used to compute the age of a notification
+ */
+class DynamicConfigManager(private val zkClient: ZkClient,
+                           private val configHandler : Map[String, ConfigHandler],
+                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val time: Time = SystemTime) extends Logging {
+  /* highest change number that has been applied so far; -1 until the first notification is processed */
+  private var lastExecutedChange = -1L
+
+  /**
+   * Begin watching for config changes
+   */
+  def startup(): Unit = {
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath)
+    // subscribe before the initial scan so that a change made in between is not missed
+    zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
+    processAllConfigChanges()
+  }
+
+  /**
+   * Process all config changes
+   */
+  private def processAllConfigChanges(): Unit = {
+    val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
+    import JavaConversions._
+    processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
+  }
+
+  /**
+   * Process the given list of config changes.
+   *
+   * The notifications are applied in ascending change-number order regardless of the order in which they are
+   * passed in (the child listener hands over the children exactly as it received them), and
+   * lastExecutedChange only ever moves forward. Without this a lower-numbered notification listed after a
+   * higher-numbered one would be skipped, and the high-water mark could move backwards.
+   */
+  private def processConfigChanges(notifications: Seq[String]): Unit = {
+    if (notifications.nonEmpty) {
+      info("Processing config change notification(s)...")
+      val now = time.milliseconds
+      for (notification <- notifications.sortBy(changeNumber)) {
+        val changeId = changeNumber(notification)
+
+        if (changeId > lastExecutedChange) {
+          val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
+
+          val (jsonOpt, _) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+          processNotification(jsonOpt)
+          lastExecutedChange = changeId
+        }
+      }
+      purgeObsoleteNotifications(now, notifications)
+    }
+  }
+
+  /**
+   * Apply a single change notification.
+   *
+   * @param jsonOpt the content of the notification znode; None is ignored
+   * @throws IllegalArgumentException if the content is json but is not a version 1 notification carrying
+   *                                  a known 'entity_type' and an 'entity_name'
+   */
+  def processNotification(jsonOpt: Option[String]): Unit = {
+    jsonOpt.foreach { json =>
+      Json.parseFull(json) match {
+        case None => // There are no config overrides.
+        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+        case Some(mapAnon: Map[_, _]) =>
+          val map = mapAnon collect
+            { case (k: String, v: Any) => k -> v }
+          // report a missing version as a malformed notification rather than as a bare NoSuchElementException
+          val version = map.getOrElse("version",
+            throw new IllegalArgumentException("Config change notification does not specify 'version'. Received: " + json))
+          require(version == 1, "Config change notification must have 'version' set to 1. Received: " + json)
+
+          val entityType = map.get("entity_type") match {
+            case Some(ConfigType.Topic) => ConfigType.Topic
+            case Some(ConfigType.Client) => ConfigType.Client
+            case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." +
+              " Received: " + json)
+          }
+
+          val entity = map.get("entity_name") match {
+            case Some(value: String) => value
+            case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
+          }
+          // the notification is only a trigger: the config itself is always re-read from zk
+          configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity))
+
+        case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+          "{\"version\" : 1," +
+          " \"entity_type\":\"topic/client\"," +
+          " \"entity_name\" : \"topic_name/client_id\"}." +
+          " Received: " + json)
+      }
+    }
+  }
+
+  /**
+   * Delete the notification znodes created more than changeExpirationMs before `now`.
+   * The notifications are visited in sorted order and the scan stops at the first one that has not expired yet;
+   * notifications whose znode can no longer be read are skipped.
+   */
+  private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {
+    val remaining = notifications.sorted.iterator
+    var reachedUnexpired = false
+    while (!reachedUnexpired && remaining.hasNext) {
+      val notification = remaining.next()
+      val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
+      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+      if (jsonOpt.isDefined) {
+        if (now - stat.getCtime > changeExpirationMs) {
+          debug("Purging config change notification " + notification)
+          ZkUtils.deletePath(zkClient, changeZnode)
+        } else {
+          reachedUnexpired = true
+        }
+      }
+    }
+  }
+
+  /* get the change number from a change notification znode */
+  private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong
+
+  /**
+   * A listener that processes the notifications found under the config change path whenever its children change
+   */
+  object ConfigChangeListener extends IZkChildListener {
+    override def handleChildChange(path: String, chillins: java.util.List[String]): Unit = {
+      try {
+        // zkclient passes null instead of a list when the watched parent path has been deleted
+        if (chillins == null) {
+          warn("Config change path " + path + " no longer exists, ignoring child change")
+        } else {
+          import JavaConversions._
+          processConfigChanges(chillins: mutable.Buffer[String])
+        }
+      } catch {
+        case e: Exception => error("Error processing config change:", e)
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 18917bc..84d4730 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -31,8 +31,9 @@ import java.io.File
import kafka.utils._
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.network.NetworkReceive
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
-import scala.collection.{JavaConversions, mutable}
+import scala.collection.mutable
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.{EndPoint, Broker}
@@ -77,7 +78,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var replicaManager: ReplicaManager = null
- var topicConfigManager: TopicConfigManager = null
+ var dynamicConfigHandlers: Map[String, ConfigHandler] = null
+ var dynamicConfigManager: DynamicConfigManager = null
+ val metrics: Metrics = new Metrics()
var consumerCoordinator: ConsumerCoordinator = null
@@ -171,9 +174,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Mx4jLoader.maybeLoad()
- /* start topic config manager */
- topicConfigManager = new TopicConfigManager(zkClient, logManager)
- topicConfigManager.startup()
+ /* start dynamic config manager */
+ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
+ ConfigType.Client -> new ClientIdConfigHandler)
+ dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
+ dynamicConfigManager.startup()
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c89d00b..fae22d2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -90,8 +90,8 @@ class ReplicaFetcherThread(name:String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
- if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
- topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+ if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient,
+ ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
" Current leader %d's latest offset %d is less than replica %d's latest offset %d"
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
deleted file mode 100644
index 01b1b0a..0000000
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Properties
-import scala.collection._
-import kafka.log._
-import kafka.utils._
-import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
-
-/**
- * This class initiates and carries out topic config changes.
- *
- * It works as follows.
- *
- * Config is stored under the path
- * /config/topics/<topic_name>
- * This znode stores the topic-overrides for this topic (but no defaults) in properties format.
- *
- * To avoid watching all topics for changes instead we have a notification path
- * /config/changes
- * The TopicConfigManager has a child watch on this path.
- *
- * To update a topic config we first update the topic config properties. Then we create a new sequential
- * znode under the change path which contains the name of the topic that was updated, say
- * /config/changes/config_change_13321
- * This is just a notification--the actual config change is stored only once under the /config/topics/<topic_name> path.
- *
- * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
- * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
- * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification.
- * For any new changes it reads the new configuration, combines it with the defaults, and updates the log config
- * for all logs for that topic (if any) that it has.
- *
- * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is
- * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that
- * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the
- * broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice,
- * but that is harmless.
- *
- * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions
- * on startup where a change might be missed between the initial config load and registering for change notifications.
- *
- */
-class TopicConfigManager(private val zkClient: ZkClient,
- private val logManager: LogManager,
- private val changeExpirationMs: Long = 15*60*1000,
- private val time: Time = SystemTime) extends Logging {
- private var lastExecutedChange = -1L
-
- /**
- * Begin watching for config changes
- */
- def startup() {
- ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
- zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
- processAllConfigChanges()
- }
-
- /**
- * Process all config changes
- */
- private def processAllConfigChanges() {
- val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
- import JavaConversions._
- processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
- }
-
- /**
- * Process the given list of config changes
- */
- private def processConfigChanges(notifications: Seq[String]) {
- if (notifications.size > 0) {
- info("Processing config change notification(s)...")
- val now = time.milliseconds
- val logs = logManager.logsByTopicPartition.toBuffer
- val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
- for (notification <- notifications) {
- val changeId = changeNumber(notification)
- if (changeId > lastExecutedChange) {
- val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
- val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
- if(jsonOpt.isDefined) {
- val json = jsonOpt.get
- val topic = json.substring(1, json.length - 1) // hacky way to dequote
- if (logsByTopic.contains(topic)) {
- /* combine the default properties with the overrides in zk to create the new LogConfig */
- val props = new Properties()
- props.putAll(logManager.defaultConfig.originals)
- props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
- val logConfig = LogConfig(props)
- for (log <- logsByTopic(topic))
- log.config = logConfig
- info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
- purgeObsoleteNotifications(now, notifications)
- }
- }
- lastExecutedChange = changeId
- }
- }
- }
- }
-
- private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
- for(notification <- notifications.sorted) {
- val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification)
- if(jsonOpt.isDefined) {
- val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
- if (now - stat.getCtime > changeExpirationMs) {
- debug("Purging config change notification " + notification)
- ZkUtils.deletePath(zkClient, changeZnode)
- } else {
- return
- }
- }
- }
- }
-
- /* get the change number from a change notification znode */
- private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong
-
- /**
- * A listener that applies config changes to logs
- */
- object ConfigChangeListener extends IZkChildListener {
- override def handleChildChange(path: String, chillins: java.util.List[String]) {
- try {
- import JavaConversions._
- processConfigChanges(chillins: mutable.Buffer[String])
- } catch {
- case e: Exception => error("Error processing config change:", e)
- }
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 166814c..4ae310e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -19,6 +19,7 @@ package kafka.utils
import kafka.cluster._
import kafka.consumer.{ConsumerThreadId, TopicCount}
+import kafka.server.ConfigType
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
ZkMarshallingError, ZkBadVersionException}
@@ -39,8 +40,6 @@ object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
- val TopicConfigPath = "/config/topics"
- val TopicConfigChangesPath = "/config/changes"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
@@ -48,6 +47,8 @@ object ZkUtils extends Logging {
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
val IsrChangeNotificationPath = "/isr_change_notification"
+ val EntityConfigPath = "/config"
+ val EntityConfigChangesPath = "/config/changes"
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
@@ -57,8 +58,11 @@ object ZkUtils extends Logging {
getTopicPath(topic) + "/partitions"
}
- def getTopicConfigPath(topic: String): String =
- TopicConfigPath + "/" + topic
+ /**
+  * Path of the znode under which the configs of all entities of the given type are kept:
+  * EntityConfigPath followed by "/" and the entity type.
+  */
+ def getEntityConfigRootPath(entityType: String): String = {
+   EntityConfigPath + "/" + entityType
+ }
+
+ /**
+  * Path of the znode holding the config of a single entity: the root path of its entity type
+  * followed by "/" and the entity name.
+  */
+ def getEntityConfigPath(entityType: String, entity: String): String = {
+   val entityTypeRoot = getEntityConfigRootPath(entityType)
+   entityTypeRoot + "/" + entity
+ }
def getDeleteTopicPath(topic: String): String =
DeleteTopicsPath + "/" + topic
@@ -93,8 +97,14 @@ object ZkUtils extends Logging {
}
def setupCommonPaths(zkClient: ZkClient) {
- for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
- DeleteTopicsPath, BrokerSequenceIdPath))
+ for(path <- Seq(ConsumersPath,
+ BrokerIdsPath,
+ BrokerTopicsPath,
+ EntityConfigChangesPath,
+ ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
+ ZkUtils.getEntityConfigRootPath(ConfigType.Client),
+ DeleteTopicsPath,
+ BrokerSequenceIdPath))
makeSurePersistentPathExists(zkClient, path)
}
@@ -753,6 +763,17 @@ object ZkUtils extends Logging {
topics
}
+ /**
+  * Lists the children of the config root path of the given entity type, i.e. the entities that have
+  * a config znode. An empty sequence is returned when the lookup yields null.
+  */
+ def getAllEntitiesWithConfig(zkClient: ZkClient, entityType: String): Seq[String] = {
+   val entityRoot = getEntityConfigRootPath(entityType)
+   Option(ZkUtils.getChildrenParentMayNotExist(zkClient, entityRoot)).getOrElse(Seq.empty[String])
+ }
+
def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
if(topics == null) Set.empty[TopicAndPartition]
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 93f200e..86dcc4c 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -25,7 +25,7 @@ import kafka.log._
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{Logging, ZkUtils, TestUtils}
import kafka.common.{InvalidTopicException, TopicExistsException, TopicAndPartition}
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
import java.io.File
import TestUtils._
@@ -407,12 +407,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
checkConfig(maxMessageSize, retentionMs)
// now double the config values for the topic and check that it is applied
+ val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs)
AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
checkConfig(2*maxMessageSize, 2 * retentionMs)
+
+ // Verify that the same config can be read from ZK
+ val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic)
+ assertEquals(newConfig, configInZk)
} finally {
server.shutdown()
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
new file mode 100644
index 0000000..cfe0ec3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import junit.framework.Assert._
+import kafka.admin.ConfigCommand.ConfigCommandOptions
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.Logging
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
+import kafka.admin.TopicCommand.TopicCommandOptions
+import kafka.utils.ZkUtils
+
+/**
+ * Checks that ConfigCommandOptions accepts the supported argument combinations and that the
+ * added/deleted config arguments are parsed into the expected values.
+ */
+class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+  @Test
+  def testArgumentParse(): Unit = {
+    // Arguments shared by every invocation: the ZK connection and the client entity "x"
+    val entityArgs = Array("--zookeeper", zkConnect,
+                           "--entity-name", "x",
+                           "--entity-type", "client")
+
+    // Should parse correctly
+    val describeOpts = new ConfigCommandOptions(entityArgs ++ Array("--describe"))
+    describeOpts.checkArgs()
+
+    // For --alter and added config
+    val alterAddedOpts = new ConfigCommandOptions(entityArgs ++ Array("--alter",
+                                                                      "--added-config", "a=b,c=d"))
+    alterAddedOpts.checkArgs()
+
+    // For alter and deleted config
+    val alterDeletedOpts = new ConfigCommandOptions(entityArgs ++ Array("--alter",
+                                                                        "--deleted-config", "a,b,c"))
+    alterDeletedOpts.checkArgs()
+
+    // For alter and both added, deleted config
+    val alterBothOpts = new ConfigCommandOptions(entityArgs ++ Array("--alter",
+                                                                     "--added-config", "a=b,c=d",
+                                                                     "--deleted-config", "a"))
+    alterBothOpts.checkArgs()
+
+    // the added configs are parsed into key/value properties
+    val addedProps = ConfigCommand.parseConfigsToBeAdded(alterBothOpts)
+    assertEquals(2, addedProps.size())
+    assertEquals("b", addedProps.getProperty("a"))
+    assertEquals("d", addedProps.getProperty("c"))
+
+    // the deleted configs are parsed into a sequence of keys
+    val deletedProps = ConfigCommand.parseConfigsToBeDeleted(alterBothOpts)
+    assertEquals(1, deletedProps.size)
+    assertEquals("a", deletedProps(0))
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index dcd6988..58adef6 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -22,6 +22,7 @@ import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
+import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
import kafka.coordinator.ConsumerCoordinator
@@ -43,20 +44,18 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
TopicCommand.createTopic(zkClient, createOpts)
- val props = AdminUtils.fetchTopicConfig(zkClient, topic)
+ val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
// pre-create the topic config changes path to avoid a NoNodeException
- ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath)
+ ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath)
// modify the topic to add new partitions
val numPartitionsModified = 3
- val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString,
- "--config", cleanupKey + "=" + cleanupVal,
- "--topic", topic))
+ val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
TopicCommand.alterTopic(zkClient, alterOpts)
- val newProps = AdminUtils.fetchTopicConfig(zkClient, topic)
+ val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}
@@ -99,4 +98,4 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
}
assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a56a7905/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 8a871cf..7c45393 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.util.Properties
import junit.framework.Assert._
+import org.easymock.{Capture, EasyMock}
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
@@ -32,8 +33,10 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testConfigChange() {
- val oldVal: java.lang.Long = 100000
- val newVal: java.lang.Long = 200000
+ assertTrue("Should contain a ConfigHandler for topics",
+ this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topic))
+ val oldVal: java.lang.Long = 100000L
+ val newVal: java.lang.Long = 200000L
val tp = TopicAndPartition("test", 0)
val logProps = new Properties()
logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
@@ -50,6 +53,25 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
}
}
+ // Client config changes have no effect yet; only verify the overrides reach ClientIdConfigHandler.configPool
+ @Test
+ def testClientConfigChange() {
+ assertTrue("Should contain a ConfigHandler for clients",
+ this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
+ val clientId = "testClient"
+ val props = new Properties()
+ props.put("a.b", "c")
+ props.put("x.y", "z")
+ AdminUtils.changeClientIdConfig(zkClient, clientId, props)
+ TestUtils.retry(10000) {
+ val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
+ assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId))
+ assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size)
+ assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b"))
+ assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y"))
+ }
+ }
+
@Test
def testConfigChangeOnNonExistingTopic() {
val topic = TestUtils.tempTopic
@@ -63,4 +85,59 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
}
}
-}
\ No newline at end of file
+ /** Checks that DynamicConfigManager.processNotification throws on malformed JSON notifications
+ * and that the registered ConfigHandler mock still satisfies its single expected call. */
+ @Test
+ def testProcessNotification() {
+ // Create a mock ConfigHandler to record config changes it is asked to process
+ val entityArgument = new Capture[String]()
+ val propertiesArgument = new Capture[Properties]()
+ val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
+ handler.processConfigChanges(
+ EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
+ EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
+ EasyMock.expectLastCall().once()
+ EasyMock.replay(handler)
+
+ val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
+ // Notifications created using the old TopicConfigManager are ignored.
+ configManager.processNotification(Some("not json"))
+
+ // Incorrect Map. No version
+ try {
+ val jsonMap = Map("v" -> 1, "x" -> 2)
+ configManager.processNotification(Some(Json.encode(jsonMap)))
+ fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
+ } catch {
+ // fail() reports through AssertionFailedError; rethrow it so the catch-all cannot hide a missing exception
+ case e: junit.framework.AssertionFailedError => throw e
+ case t: Throwable => // expected: the malformed notification was rejected
+ }
+ // Version is provided. EntityType is incorrect
+ try {
+ val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
+ configManager.processNotification(Some(Json.encode(jsonMap)))
+ fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
+ } catch {
+ case e: junit.framework.AssertionFailedError => throw e
+ case t: Throwable => // expected: the malformed notification was rejected
+ }
+
+ // EntityName isn't provided
+ try {
+ val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
+ configManager.processNotification(Some(Json.encode(jsonMap)))
+ fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
+ } catch {
+ case e: junit.framework.AssertionFailedError => throw e
+ case t: Throwable => // expected: the malformed notification was rejected
+ }
+
+ // Everything is provided
+ val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
+ configManager.processNotification(Some(Json.encode(jsonMap)))
+
+ // Verify that processConfigChanges was only called once
+ EasyMock.verify(handler)
+ }
+}