You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:30 UTC
[33/37] git commit: kafka-1562;
kafka-topics.sh alter add partitions resets cleanup.policy;
patched by Jonathan Natkins; reviewed by Jun Rao
kafka-1562; kafka-topics.sh alter add partitions resets cleanup.policy; patched by Jonathan Natkins; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8d521a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8d521a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8d521a9
Branch: refs/heads/transactional_messaging
Commit: f8d521a9616134d78966419e5cf2aa73e8d6a5c5
Parents: a01a101
Author: Jonathan Natkins <na...@wibidata.com>
Authored: Mon Aug 4 07:21:25 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Aug 4 07:21:25 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 10 +++-
.../main/scala/kafka/admin/TopicCommand.scala | 4 +-
.../unit/kafka/admin/TopicCommandTest.scala | 63 ++++++++++++++++++++
3 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f8d521a9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b5d8714..94c5332 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -96,8 +96,14 @@ object AdminUtils extends Logging {
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
+ * @param config Pre-existing properties that should be preserved
*/
- def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) {
+ def addPartitions(zkClient: ZkClient,
+ topic: String,
+ numPartitions: Int = 1,
+ replicaAssignmentStr: String = "",
+ checkBrokerAvailable: Boolean = true,
+ config: Properties = new Properties) {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -124,7 +130,7 @@ object AdminUtils extends Logging {
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f8d521a9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8d5c2e7..003a09c 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -95,11 +95,11 @@ object TopicCommand {
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
topics.foreach { topic =>
+ val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
// compile the final set of configs
- val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))
AdminUtils.changeTopicConfig(zkClient, topic, configs)
@@ -113,7 +113,7 @@ object TopicCommand {
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
- AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+ AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)
println("Adding partitions succeeded!")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f8d521a9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
new file mode 100644
index 0000000..ac6dd20
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.Logging
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.admin.TopicCommand.TopicCommandOptions
+import kafka.utils.ZkUtils
+
+class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+
+ @Test
+ def testConfigPreservationAcrossPartitionAlteration() {
+ val topic = "test"
+ val numPartitionsOriginal = 1
+ val cleanupKey = "cleanup.policy"
+ val cleanupVal = "compact"
+ // create brokers
+ val brokers = List(0, 1, 2)
+ TestUtils.createBrokersInZk(zkClient, brokers)
+ // create the topic
+ val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
+ "--replication-factor", "1",
+ "--config", cleanupKey + "=" + cleanupVal,
+ "--topic", topic))
+ TopicCommand.createTopic(zkClient, createOpts)
+ val props = AdminUtils.fetchTopicConfig(zkClient, topic)
+ assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
+ assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
+
+ // pre-create the topic config changes path to avoid a NoNodeException
+ ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath)
+
+ // modify the topic to add new partitions
+ val numPartitionsModified = 3
+ val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString,
+ "--config", cleanupKey + "=" + cleanupVal,
+ "--topic", topic))
+ TopicCommand.alterTopic(zkClient, alterOpts)
+ val newProps = AdminUtils.fetchTopicConfig(zkClient, topic)
+ assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
+ assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
+ }
+}
\ No newline at end of file