You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:30 UTC

[33/37] git commit: kafka-1562; kafka-topics.sh alter add partitions resets cleanup.policy; patched by Jonathan Natkins; reviewed by Jun Rao

kafka-1562; kafka-topics.sh alter add partitions resets cleanup.policy; patched by Jonathan Natkins; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8d521a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8d521a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8d521a9

Branch: refs/heads/transactional_messaging
Commit: f8d521a9616134d78966419e5cf2aa73e8d6a5c5
Parents: a01a101
Author: Jonathan Natkins <na...@wibidata.com>
Authored: Mon Aug 4 07:21:25 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Aug 4 07:21:25 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala | 10 +++-
 .../main/scala/kafka/admin/TopicCommand.scala   |  4 +-
 .../unit/kafka/admin/TopicCommandTest.scala     | 63 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f8d521a9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b5d8714..94c5332 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -96,8 +96,14 @@ object AdminUtils extends Logging {
   * @param numPartitions Number of partitions to be set
   * @param replicaAssignmentStr Manual replica assignment
   * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
+  * @param config Pre-existing properties that should be preserved
   */
-  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) {
+  def addPartitions(zkClient: ZkClient,
+                    topic: String,
+                    numPartitions: Int = 1,
+                    replicaAssignmentStr: String = "",
+                    checkBrokerAvailable: Boolean = true,
+                    config: Properties = new Properties) {
     val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
     if (existingPartitionsReplicaList.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -124,7 +130,7 @@ object AdminUtils extends Logging {
     val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
     // add the new list
     partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true)
   }
 
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8d521a9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8d5c2e7..003a09c 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -95,11 +95,11 @@ object TopicCommand {
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     topics.foreach { topic =>
+      val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
       if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
         val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
         val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
         // compile the final set of configs
-        val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
         configs.putAll(configsToBeAdded)
         configsToBeDeleted.foreach(config => configs.remove(config))
         AdminUtils.changeTopicConfig(zkClient, topic, configs)
@@ -113,7 +113,7 @@ object TopicCommand {
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)
         println("Adding partitions succeeded!")
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8d521a9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
new file mode 100644
index 0000000..ac6dd20
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.Logging
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.admin.TopicCommand.TopicCommandOptions
+import kafka.utils.ZkUtils
+
+class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+  /** Regression test for KAFKA-1562: altering a topic's partition count must leave its cleanup.policy config in place. */
+  @Test
+  def testConfigPreservationAcrossPartitionAlteration() {
+    val topic = "test"
+    val numPartitionsOriginal = 1
+    val cleanupKey = "cleanup.policy"
+    val cleanupVal = "compact"
+    // register broker ids 0, 1 and 2 in ZooKeeper so the topic can be assigned replicas
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+    // create the topic with one partition, replication factor 1 and cleanup.policy=compact
+    val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
+      "--replication-factor", "1",
+      "--config", cleanupKey + "=" + cleanupVal,
+      "--topic", topic))
+    TopicCommand.createTopic(zkClient, createOpts)
+    val props = AdminUtils.fetchTopicConfig(zkClient, topic)
+    assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
+    assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
+
+    // pre-create the topic config changes path to avoid a NoNodeException
+    ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath)
+
+    // alter the topic to 3 partitions; NOTE(review): --config is re-supplied here, so the config-change path presumably runs too
+    val numPartitionsModified = 3
+    val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString,
+      "--config", cleanupKey + "=" + cleanupVal,
+      "--topic", topic))
+    TopicCommand.alterTopic(zkClient, alterOpts)
+    val newProps = AdminUtils.fetchTopicConfig(zkClient, topic)
+    assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
+    assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
+  }
+}
\ No newline at end of file