You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/04/26 15:31:21 UTC

kafka git commit: KAFKA-3270; Added some Happy Path Tests for the Reassign Partitions Command

Repository: kafka
Updated Branches:
  refs/heads/trunk 996e29cfe -> b3847f76b


KAFKA-3270; Added some Happy Path Tests for the Reassign Partitions Command

with help from enothereska  :)

Author: Ben Stopford <be...@gmail.com>

Reviewers: Jun Rao <ju...@apache.org>, Eno Thereska <en...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #956 from benstopford/KAFKA-3270-ReassignPartitionsCommand-Tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3847f76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3847f76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3847f76

Branch: refs/heads/trunk
Commit: b3847f76b571deac5c7da7287a642c8b354a2a8c
Parents: 996e29c
Author: Ben Stopford <be...@gmail.com>
Authored: Tue Apr 26 06:31:00 2016 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 26 06:31:00 2016 -0700

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  19 ++--
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   4 +-
 .../admin/ReassignPartitionsClusterTest.scala   | 112 +++++++++++++++++++
 3 files changed, 126 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 446ab9f..1bf351a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -39,7 +39,7 @@ object ReassignPartitionsCommand extends Logging {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
-    val zkUtils = ZkUtils(zkConnect, 
+    val zkUtils = ZkUtils(zkConnect,
                           30000,
                           30000,
                           JaasUtils.isZkSecurityEnabled())
@@ -93,8 +93,8 @@ object ReassignPartitionsCommand extends Logging {
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val disableRackAware = opts.options.has(opts.disableRackAware)
     val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
-    println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments)))
-    println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments)))
+    println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments)))
+    println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(proposedAssignments)))
   }
 
   def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
@@ -125,9 +125,14 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
+    executeAssignment(zkUtils, reassignmentJsonString)
+  }
+
+  def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){
+
     val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
-      throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
+      throw new AdminCommandFailedException("Partition reassignment data file is empty")
     val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
     if (duplicateReassignedPartitions.nonEmpty)
       throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
@@ -144,10 +149,10 @@ object ReassignPartitionsCommand extends Logging {
     // before starting assignment, output the current replica assignment to facilitate rollback
     val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
-      .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+      .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
     // start the reassignment
     if(reassignPartitionsCommand.reassignPartitions())
-      println("Successfully started reassignment of partitions %s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
+      println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
     else
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
   }
@@ -228,7 +233,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top
         false
       }
       else {
-        val jsonReassignmentData = zkUtils.getPartitionReassignmentZkData(validPartitions)
+        val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions)
         zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
         true
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 155b3fd..83ff517 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -710,7 +710,7 @@ class ZkUtils(val zkClient: ZkClient,
     topics
   }
 
-  def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
     Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
                                                                                           "replicas" -> e._2))))
   }
@@ -722,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient,
         deletePath(zkPath)
         info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
       case _ =>
-        val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
+        val jsonData = formatAsReassignmentJson(partitionsToBeReassigned)
         try {
           updatePersistentPath(zkPath, jsonData)
           debug("Updated partition reassignment path with %s".format(jsonData))

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
new file mode 100644
index 0000000..ac2c1ae
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -0,0 +1,112 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package unit.kafka.admin
+
+import kafka.admin.ReassignPartitionsCommand
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils._
+import kafka.utils.ZkUtils._
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.{After, Before, Test}
+import org.junit.Assert.assertEquals
+import scala.collection.Seq
+
+
+class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
+  val partitionId = 0
+  var servers: Seq[KafkaServer] = null
+  val topicName = "my-topic"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+  }
+
+  def startBrokers(brokerIds: Seq[Int]) {
+    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect))
+      .map(c => createServer(KafkaConfig.fromProps(c)))
+  }
+
+  @After
+  override def tearDown() {
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    super.tearDown()
+  }
+
+  @Test
+  def shouldMoveSinglePartition {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    val partition = 0
+    createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
+
+    //When we move the replica on 100 to broker 101
+    ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""")
+    waitForReasignmentToComplete()
+
+    //Then the replica should be on 101
+    assertEquals(zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition), Seq(101))
+  }
+
+  @Test
+  def shouldExpandCluster() {
+    //Given partitions on 2 of 3 brokers
+    val brokers = Array(100, 101, 102)
+    startBrokers(brokers)
+    createTopic(zkUtils, topicName, Map(
+      0 -> Seq(100, 101),
+      1 -> Seq(100, 101),
+      2 -> Seq(100, 101)
+    ), servers = servers)
+
+    //When rebalancing
+    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
+    ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+    waitForReasignmentToComplete()
+
+    //Then the replicas should span all three brokers
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101, 102))
+  }
+
+  @Test
+  def shouldShrinkCluster() {
+    //Given partitions on 3 of 3 brokers
+    val brokers = Array(100, 101, 102)
+    startBrokers(brokers)
+    createTopic(zkUtils, topicName, Map(
+      0 -> Seq(100, 101),
+      1 -> Seq(101, 102),
+      2 -> Seq(102, 100)
+    ), servers = servers)
+
+    //When rebalancing
+    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
+    ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+    waitForReasignmentToComplete()
+
+    //Then replicas should only span the first two brokers
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101))
+  }
+
+  def waitForReasignmentToComplete() {
+    waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $zkUtils.ReassignPartitionsPath wasn't deleted")
+  }
+
+  def json(topic: String): String = {
+    s"""{"topics": [{"topic": "$topic"}],"version":1}"""
+  }
+}