You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:22 UTC
[08/50] [abbrv] kafka git commit: KAFKA-3270;
Added some Happy Path Tests for the Reassign Partitions Command
KAFKA-3270; Added some Happy Path Tests for the Reassign Partitions Command
with help from enothereska :)
Author: Ben Stopford <be...@gmail.com>
Reviewers: Jun Rao <ju...@apache.org>, Eno Thereska <en...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #956 from benstopford/KAFKA-3270-ReassignPartitionsCommand-Tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3847f76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3847f76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3847f76
Branch: refs/heads/0.10.0
Commit: b3847f76b571deac5c7da7287a642c8b354a2a8c
Parents: 996e29c
Author: Ben Stopford <be...@gmail.com>
Authored: Tue Apr 26 06:31:00 2016 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 26 06:31:00 2016 -0700
----------------------------------------------------------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 19 ++--
core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +-
.../admin/ReassignPartitionsClusterTest.scala | 112 +++++++++++++++++++
3 files changed, 126 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 446ab9f..1bf351a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -39,7 +39,7 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
- val zkUtils = ZkUtils(zkConnect,
+ val zkUtils = ZkUtils(zkConnect,
30000,
30000,
JaasUtils.isZkSecurityEnabled())
@@ -93,8 +93,8 @@ object ReassignPartitionsCommand extends Logging {
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
val disableRackAware = opts.options.has(opts.disableRackAware)
val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
- println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments)))
- println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments)))
+ println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments)))
+ println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(proposedAssignments)))
}
def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
@@ -125,9 +125,14 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
+ executeAssignment(zkUtils, reassignmentJsonString)
+ }
+
+ def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){
+
val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
if (partitionsToBeReassigned.isEmpty)
- throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
+ throw new AdminCommandFailedException("Partition reassignment data file is empty")
val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
if (duplicateReassignedPartitions.nonEmpty)
throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
@@ -144,10 +149,10 @@ object ReassignPartitionsCommand extends Logging {
// before starting assignment, output the current replica assignment to facilitate rollback
val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
- .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+ .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
// start the reassignment
if(reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
+ println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
else
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}
@@ -228,7 +233,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top
false
}
else {
- val jsonReassignmentData = zkUtils.getPartitionReassignmentZkData(validPartitions)
+ val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions)
zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
true
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 155b3fd..83ff517 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -710,7 +710,7 @@ class ZkUtils(val zkClient: ZkClient,
topics
}
- def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+ def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
"replicas" -> e._2))))
}
@@ -722,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient,
deletePath(zkPath)
info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
case _ =>
- val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
+ val jsonData = formatAsReassignmentJson(partitionsToBeReassigned)
try {
updatePersistentPath(zkPath, jsonData)
debug("Updated partition reassignment path with %s".format(jsonData))
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
new file mode 100644
index 0000000..ac2c1ae
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package unit.kafka.admin
+
+import kafka.admin.ReassignPartitionsCommand
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils._
+import kafka.utils.ZkUtils._
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.{After, Before, Test}
+import org.junit.Assert.assertEquals
+import scala.collection.Seq
+
+
+/**
+ * Happy-path tests for [[kafka.admin.ReassignPartitionsCommand]].
+ *
+ * Each test starts its own set of brokers, creates `topicName` with an explicit replica assignment, runs a
+ * reassignment through `ReassignPartitionsCommand.executeAssignment` and then checks the replica assignment
+ * that `zkUtils.getPartitionAssignmentForTopics` reports.
+ */
+class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
+  val partitionId = 0
+  // Brokers started by the current test. Starts out empty (rather than null) so that tearDown is safe even when a
+  // test fails before startBrokers has assigned it.
+  var servers: Seq[KafkaServer] = Seq.empty
+  val topicName = "my-topic"
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+  }
+
+  /**
+   * Creates one broker per id in `brokerIds`, each configured with the harness' `zkConnect`, and stores them in
+   * `servers` so that `tearDown` can shut them down.
+   *
+   * @param brokerIds the broker ids to create servers for
+   */
+  def startBrokers(brokerIds: Seq[Int]): Unit = {
+    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect))
+      .map(c => createServer(KafkaConfig.fromProps(c)))
+  }
+
+  /**
+   * Shuts down every broker in `servers` and deletes its log directories. The harness' own `tearDown` runs in a
+   * `finally` block so that it is not skipped when broker cleanup throws.
+   */
+  @After
+  override def tearDown(): Unit = {
+    try {
+      servers.foreach(_.shutdown())
+      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    } finally {
+      super.tearDown()
+    }
+  }
+
+  @Test
+  def shouldMoveSinglePartition(): Unit = {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    createTopic(zkUtils, topicName, Map(partitionId -> Seq(100)), servers = servers)
+
+    //When we move the replica on 100 to broker 101
+    ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":$partitionId,"replicas":[101]}]}""")
+    waitForReasignmentToComplete()
+
+    //Then the replica should be on 101
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(Seq(101), actual(partitionId))
+  }
+
+  @Test
+  def shouldExpandCluster(): Unit = {
+    //Given partitions on 2 of 3 brokers
+    val brokers = Array(100, 101, 102)
+    startBrokers(brokers)
+    createTopic(zkUtils, topicName, Map(
+      0 -> Seq(100, 101),
+      1 -> Seq(100, 101),
+      2 -> Seq(100, 101)
+    ), servers = servers)
+
+    //When rebalancing (generateAssignment returns (proposed, current); only the proposed assignment is executed)
+    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), disableRackAware = true)._1
+    ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+    waitForReasignmentToComplete()
+
+    //Then the replicas should span all three brokers
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted)
+  }
+
+  @Test
+  def shouldShrinkCluster(): Unit = {
+    //Given partitions on 3 of 3 brokers
+    val brokers = Array(100, 101, 102)
+    startBrokers(brokers)
+    createTopic(zkUtils, topicName, Map(
+      0 -> Seq(100, 101),
+      1 -> Seq(101, 102),
+      2 -> Seq(102, 100)
+    ), servers = servers)
+
+    //When rebalancing (generateAssignment returns (proposed, current); only the proposed assignment is executed)
+    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), disableRackAware = true)._1
+    ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+    waitForReasignmentToComplete()
+
+    //Then replicas should only span the first two brokers
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(Seq(100, 101), actual.values.flatten.toSeq.distinct.sorted)
+  }
+
+  /**
+   * Blocks until the `ReassignPartitionsPath` znode no longer exists, which these tests treat as the signal that
+   * the reassignment has finished. Timeout and failure behaviour are whatever `waitUntilTrue` provides.
+   */
+  def waitForReasignmentToComplete(): Unit = {
+    // Interpolate the path constant itself: "$zkUtils.ReassignPartitionsPath" would only interpolate zkUtils and
+    // append the rest as literal text.
+    waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $ReassignPartitionsPath wasn't deleted")
+  }
+
+  /**
+   * Builds the topics-to-move JSON document passed to `ReassignPartitionsCommand.generateAssignment`.
+   *
+   * @param topic the single topic to list in the document
+   * @return a version 1 JSON string naming `topic`
+   */
+  def json(topic: String): String = {
+    s"""{"topics": [{"topic": "$topic"}],"version":1}"""
+  }
+}