You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/06 13:32:51 UTC
kafka git commit: KAFKA-4266;
ReassignPartitionsClusterTest: ensure ZK publication is completed
before start
Repository: kafka
Updated Branches:
refs/heads/trunk 79f85039d -> 63010cbfe
KAFKA-4266; ReassignPartitionsClusterTest: ensure ZK publication is completed before start
Increase the reliability of the one temporal comparison in ReassignPartitionsClusterTest by imposing a delay after ZK is updated. This should be more reliable than just increasing the amount of data.
This relates to a previous PR: https://github.com/apache/kafka/pull/1982
Author: Ben Stopford <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1997 from benstopford/KAFKA-4266
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63010cbf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63010cbf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63010cbf
Branch: refs/heads/trunk
Commit: 63010cbfe5d07df5db060888afd348fab5cbe62c
Parents: 79f8503
Author: Ben Stopford <be...@gmail.com>
Authored: Mon Mar 6 12:42:43 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Mar 6 13:12:28 2017 +0000
----------------------------------------------------------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 37 ++++++++-----
.../other/kafka/ReplicationQuotasTestRig.scala | 3 +-
.../admin/ReassignPartitionsClusterTest.scala | 55 +++++++++++---------
.../admin/ReassignPartitionsCommandTest.scala | 7 +--
.../kafka/server/ReplicationQuotasTest.scala | 1 -
5 files changed, 60 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4e7b4e0..c167633 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -17,8 +17,9 @@
package kafka.admin
import joptsimple.OptionParser
-import kafka.server.{DynamicConfig, ConfigType}
+import kafka.server.{ConfigType, DynamicConfig}
import kafka.utils._
+
import scala.collection._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
@@ -29,6 +30,10 @@ import org.apache.kafka.common.security.JaasUtils
object ReassignPartitionsCommand extends Logging {
+ case class Throttle(value: Long, postUpdateAction: () => Unit = () => ())
+
+ private[admin] val NoThrottle = Throttle(-1)
+
def main(args: Array[String]): Unit = {
val opts = validateAndParseArgs(args)
@@ -146,10 +151,10 @@ object ReassignPartitionsCommand extends Logging {
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
- executeAssignment(zkUtils, reassignmentJsonString, throttle)
+ executeAssignment(zkUtils, reassignmentJsonString, Throttle(throttle))
}
- def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
+ def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Throttle) {
val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
@@ -160,7 +165,7 @@ object ReassignPartitionsCommand extends Logging {
}
else {
printCurrentAssignment(zkUtils, partitionsToBeReassigned)
- if (throttle >= 0)
+ if (throttle.value >= 0)
println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
if (reassignPartitionsCommand.reassignPartitions(throttle)) {
println("Successfully started reassignment of partitions.")
@@ -178,7 +183,7 @@ object ReassignPartitionsCommand extends Logging {
def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): Seq[(TopicAndPartition, Seq[Int])] = {
val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
-
+
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file is empty")
val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
@@ -306,15 +311,19 @@ object ReassignPartitionsCommand extends Logging {
class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition, Seq[Int]], admin: AdminUtilities = AdminUtils)
extends Logging {
+ import ReassignPartitionsCommand._
+
def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
val proposedTopics = proposedAssignment.keySet.map(_.topic).toSeq
zkUtils.getReplicaAssignmentForTopics(proposedTopics)
}
- private def maybeThrottle(throttle: Long): Unit = {
- if (throttle >= 0) {
- maybeLimit(throttle)
+ private def maybeThrottle(throttle: Throttle): Unit = {
+ if (throttle.value >= 0) {
assignThrottledReplicas(existingAssignment(), proposedAssignment)
+ maybeLimit(throttle)
+ throttle.postUpdateAction()
+ println(s"The throttle limit was set to ${throttle.value} B/s")
}
}
@@ -322,19 +331,18 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
* Limit the throttle on currently moving replicas. Note that this command can use used to alter the throttle, but
* it may not alter all limits originally set, if some of the brokers have completed their rebalance.
*/
- def maybeLimit(throttle: Long) {
- if (throttle >= 0) {
+ def maybeLimit(throttle: Throttle) {
+ if (throttle.value >= 0) {
val existingBrokers = existingAssignment().values.flatten.toSeq
val proposedBrokers = proposedAssignment.values.flatten.toSeq
val brokers = (existingBrokers ++ proposedBrokers).distinct
for (id <- brokers) {
val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
- configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString)
- configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
+ configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.value.toString)
+ configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.value.toString)
admin.changeBrokerConfig(zkUtils, Seq(id), configs)
}
- println(s"The throttle limit was set to $throttle B/s")
}
}
@@ -384,7 +392,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
allProposed.filter { case (tp, _) => tp.topic == topic })
}
- def reassignPartitions(throttle: Long = -1): Boolean = {
+ def reassignPartitions(throttle: Throttle = NoThrottle): Boolean = {
maybeThrottle(throttle)
try {
val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
@@ -422,6 +430,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
}
}
+
sealed trait ReassignmentStatus { def status: Int }
case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 }
case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 }
http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 71a5091..38d07ba 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, PrintWriter}
import javax.imageio.ImageIO
import kafka.admin.ReassignPartitionsCommand
+import kafka.admin.ReassignPartitionsCommand.Throttle
import kafka.common.TopicAndPartition
import org.apache.kafka.common.TopicPartition
import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
@@ -139,7 +140,7 @@ object ReplicationQuotasTestRig {
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
val start = System.currentTimeMillis()
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), config.throttle)
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
//Await completion
waitForReassignmentToComplete()
http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index c211c24..c576a5c 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,6 +12,7 @@
*/
package kafka.admin
+import kafka.admin.ReassignPartitionsCommand._
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
@@ -21,14 +22,15 @@ import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Before, Test}
import kafka.admin.ReplicationQuotaUtils._
-
-import scala.collection.{Map, Seq}
-
+import scala.collection.Map
+import scala.collection.Seq
class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val partitionId = 0
var servers: Seq[KafkaServer] = null
val topicName = "my-topic"
+ val delayMs = 1000
+ def zkUpdateDelay = {Thread.sleep(delayMs)}
@Before
override def setUp() {
@@ -55,7 +57,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
//When we move the replica on 100 to broker 101
- ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""")
+ val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}"""
+ ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
waitForReassignmentToComplete()
//Then the replica should be on 101
@@ -74,8 +77,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
), servers = servers)
//When rebalancing
- val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+ val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
waitForReassignmentToComplete()
//Then the replicas should span all three brokers
@@ -95,8 +98,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
), servers = servers)
//When rebalancing
- val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+ val newAssignment = generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
waitForReassignmentToComplete()
//Then replicas should only span the first two brokers
@@ -127,7 +130,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
)
//When rebalancing
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed))
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed), NoThrottle)
waitForReassignmentToComplete()
//Then the proposed changes should have been made
@@ -142,6 +145,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
@Test
def shouldExecuteThrottledReassignment() {
+
//Given partitions on 3 of 3 brokers
val brokers = Array(100, 101, 102)
startBrokers(brokers)
@@ -150,33 +154,35 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
), servers = servers)
//Given throttle set so replication will take a certain number of secs
- val initialThrottle: Long = 10 * 1000 * 1000
+ val initialThrottle = Throttle(10 * 1000 * 1000, () => zkUpdateDelay)
val expectedDurationSecs = 5
val numMessages: Int = 500
val msgSize: Int = 100 * 1000
produceMessages(servers, topicName, numMessages, acks = 0, msgSize)
- assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle)
+ assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle.value)
//Start rebalance which will move replica on 100 -> replica on 102
- val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+ val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
val start = System.currentTimeMillis()
ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
//Check throttle config. Should be throttling replica 0 on 100 and 102 only.
- checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
+ checkThrottleConfigAddedToZK(initialThrottle.value, servers, topicName, "0:100,0:101", "0:102")
//Await completion
waitForReassignmentToComplete()
- val took = System.currentTimeMillis() - start
+ val took = System.currentTimeMillis() - start - delayMs
//Check move occurred
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
assertEquals(Seq(101, 102), actual.values.flatten.toSeq.distinct.sorted)
//Then command should have taken longer than the throttle rate
- assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took", took > expectedDurationSecs * 0.9 * 1000)
- assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took", took < expectedDurationSecs * 2 * 1000)
+ assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took",
+ took > expectedDurationSecs * 0.9 * 1000)
+ assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took",
+ took < expectedDurationSecs * 2 * 1000)
}
@@ -211,7 +217,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
TopicAndPartition("topic2", 2) -> Seq(103, 104) //didn't move
)
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), throttle)
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(throttle))
//Check throttle config. Should be throttling specific replicas for each topic.
checkThrottleConfigAddedToZK(throttle, servers, "topic1",
@@ -238,15 +244,15 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
//Start rebalance
- val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+ val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(initialThrottle))
//Check throttle config
checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
//Ensure that running Verify, whilst the command is executing, should have no effect
- ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+ verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
//Check throttle config again
checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -254,7 +260,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
val newThrottle = initialThrottle * 1000
- ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), newThrottle)
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(newThrottle))
//Check throttle was changed
checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -263,7 +269,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
waitForReassignmentToComplete()
//Verify should remove the throttle
- ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+ verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
//Check removed
checkThrottleConfigRemovedFromZK(topicName, servers)
@@ -280,12 +286,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
//When we execute an assignment that includes an invalid partition (1:101 in this case)
- ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}""")
+ val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}"""
+ ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
}
@Test
def shouldPerformThrottledReassignmentOverVariousTopics() {
- val throttle = 1000L
+ val throttle = Throttle(1000L)
//Given four brokers
servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 5ecc19b..9e23983 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -18,6 +18,7 @@ package kafka.admin
import java.util.Properties
+import kafka.admin.ReassignPartitionsCommand.Throttle
import kafka.common.TopicAndPartition
import kafka.log.LogConfig
import kafka.log.LogConfig._
@@ -248,7 +249,7 @@ class ReassignPartitionsCommandTest extends Logging {
replay(admin)
//When
- assigner.maybeLimit(1000)
+ assigner.maybeLimit(Throttle(1000))
//Then
for (actual <- propsCapture.getValues) {
@@ -282,7 +283,7 @@ class ReassignPartitionsCommandTest extends Logging {
replay(admin)
//When
- assigner.maybeLimit(1000)
+ assigner.maybeLimit(Throttle(1000))
//Then
for (actual <- propsCapture.getValues) {
@@ -312,7 +313,7 @@ class ReassignPartitionsCommandTest extends Logging {
replay(admin)
//When
- assigner.maybeLimit(1000)
+ assigner.maybeLimit(Throttle(1000))
//Then other property remains
for (actual <- propsCapture.getValues) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 984d340..b6e3607 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.{After, Before, Test}
-
import scala.collection.JavaConverters._
/**