You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/11/08 02:45:43 UTC
git commit: KAFKA-1119 Kafka 0.8.1 overwrites previous per topic
config changes;
reviewed by Joel Koshy, Guozhang Wang, Swapnil Ghike, Jay Kreps
Updated Branches:
refs/heads/trunk a700c99ef -> eedbea652
KAFKA-1119 Kafka 0.8.1 overwrites previous per topic config changes; reviewed by Joel Koshy, Guozhang Wang, Swapnil Ghike, Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eedbea65
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eedbea65
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eedbea65
Branch: refs/heads/trunk
Commit: eedbea6526986783257ad0e025c451a8ee3d9095
Parents: a700c99
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Nov 7 17:45:39 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Nov 7 17:45:39 2013 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 27 +++++++-----
.../main/scala/kafka/admin/TopicCommand.scala | 43 +++++++++++++++-----
core/src/main/scala/kafka/log/LogConfig.scala | 2 +-
.../scala/kafka/server/TopicConfigManager.scala | 1 -
4 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8107a64..8ff4bd5 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -200,14 +200,21 @@ object AdminUtils extends Logging {
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+ * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+ * @param topic: The topic for which configs are being changed
+ * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+ * existing configs need to be deleted, it should be done prior to invoking this API
+ *
*/
- def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
- LogConfig.validate(config)
+ def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
if(!topicExists(zkClient, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
-
+
+ // remove the topic overrides
+ LogConfig.validate(configs)
+
// write the new config--may not exist if there were previously no overrides
- writeTopicConfig(zkClient, topic, config)
+ writeTopicConfig(zkClient, topic, configs)
// create the change notification
zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
@@ -217,14 +224,12 @@ object AdminUtils extends Logging {
* Write out the topic config to zk, if there is any
*/
private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
- if(config.size > 0) {
- val configMap: mutable.Map[String, String] = {
- import JavaConversions._
- config
- }
- val map = Map("version" -> 1, "config" -> configMap)
- ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
+ val configMap: mutable.Map[String, String] = {
+ import JavaConversions._
+ config
}
+ val map = Map("version" -> 1, "config" -> configMap)
+ ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 56f3177..3c08dee 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -23,8 +23,8 @@ import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import scala.collection._
import scala.collection.JavaConversions._
-import kafka.common.Topic
import kafka.cluster.Broker
+import kafka.log.LogConfig
object TopicCommand {
@@ -61,7 +61,7 @@ object TopicCommand {
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
- val configs = parseTopicConfigs(opts)
+ val configs = parseTopicConfigsToBeAdded(opts)
for (topic <- topics) {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
@@ -79,8 +79,13 @@ object TopicCommand {
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topic = opts.options.valueOf(opts.topicOpt)
- if(opts.options.has(opts.configOpt)) {
- val configs = parseTopicConfigs(opts)
+ if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+ val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+ val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+ // compile the final set of configs
+ val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+ configs.putAll(configsToBeAdded)
+ configsToBeDeleted.foreach(config => configs.remove(config))
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
@@ -147,14 +152,28 @@ object TopicCommand {
def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
- def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
- val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
- require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
+ def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
+ val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
+ require(configsToBeAdded.forall(config => config.length == 2),
+ "Invalid topic config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
- configs.foreach(pair => props.setProperty(pair(0), pair(1)))
+ configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+ LogConfig.validate(props)
props
}
-
+
+ def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
+ val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("""\s*=\s*"""))
+ if(opts.options.has(opts.createOpt))
+ require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".")
+ require(configsToBeDeleted.forall(config => config.length == 1),
+ "Invalid topic config: all configs to be deleted must be in the format \"key\".")
+ val propsToBeDeleted = new Properties
+ configsToBeDeleted.foreach(pair => propsToBeDeleted.setProperty(pair(0).trim, ""))
+ LogConfig.validateNames(propsToBeDeleted)
+ configsToBeDeleted.map(pair => pair(0))
+ }
+
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
@@ -184,10 +203,14 @@ object TopicCommand {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
- val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
+ val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered.")
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
+ val deleteConfigOpt = parser.accepts("deleteConfig", "A topic configuration override to be removed for an existing topic")
+ .withRequiredArg
+ .describedAs("name")
+ .ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
.withRequiredArg
http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 51ec796..0b32aee 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -132,7 +132,7 @@ object LogConfig {
/**
* Check that property names are valid
*/
- private def validateNames(props: Properties) {
+ def validateNames(props: Properties) {
import JavaConversions._
for(name <- props.keys)
require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
http://git-wip-us.apache.org/repos/asf/kafka/blob/eedbea65/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index 56cae58..42e98dd 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -90,7 +90,6 @@ class TopicConfigManager(private val zkClient: ZkClient,
val now = time.milliseconds
val logs = logManager.logsByTopicPartition.toBuffer
val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
- val lastChangeId = notifications.map(changeNumber).max
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {