You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/29 20:12:47 UTC

[kafka] branch 2.6 updated: KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (#8749)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 50482bc  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (#8749)
50482bc is described below

commit 50482bc31d4742c8c3027d50e6ce65f4c1dedac7
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 29 12:09:09 2020 -0700

    KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (#8749)
    
    We have seen this test fail from time to time. In spite of the low quota, it is possible for one or more of the reassignments to complete before the test verifies that the reassignment is still in progress. The patch makes this less likely by reducing the quota even further and increasing the amount of data in the topic.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../admin/ReassignPartitionsIntegrationTest.scala  | 32 ++++++++++------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index c365d5f..974877b 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -56,10 +56,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
   }
 
   val unthrottledBrokerConfigs =
-    0.to(4).map {
-      case brokerId => (brokerId, brokerLevelThrottles.map {
-        case throttleName => (throttleName, -1L)
-      }.toMap)
+    0.to(4).map { brokerId =>
+      brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
     }.toMap
 
   /**
@@ -277,15 +275,15 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
   def testCancellation(): Unit = {
     cluster = new ReassignPartitionsTestCluster(zkConnect)
     cluster.setup()
-    cluster.produceMessages("foo", 0, 60)
-    cluster.produceMessages("baz", 1, 80)
+    cluster.produceMessages("foo", 0, 200)
+    cluster.produceMessages("baz", 1, 200)
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
       """{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" +
       """]}"""
     assertEquals(unthrottledBrokerConfigs,
       describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
-    val interBrokerThrottle = 100L
+    val interBrokerThrottle = 1L
     runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L)
     val throttledConfigMap = Map[String, Long](
       brokerLevelLeaderThrottle -> interBrokerThrottle,
@@ -303,10 +301,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     // Verify that the reassignment is running.  The very low throttle should keep it
     // from completing before this runs.
     waitForVerifyAssignment(cluster.adminClient, assignment, true,
-      VerifyAssignmentResult(Map(new TopicPartition("foo", 0) ->
-        PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
-        new TopicPartition("baz", 1) ->
-          PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
+      VerifyAssignmentResult(Map(
+        new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
+        new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
       true, Map(), false))
     // Cancel the reassignment.
     assertEquals((Set(
@@ -339,13 +336,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
    *                        information.  The nested maps are keyed on throttle name.
    */
   private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = {
-    brokerIds.map {
-      case brokerId =>
-        val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
-        (brokerId, brokerLevelThrottles.map {
-          case throttleName => (throttleName,
-            props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
-        }.toMap)
+    brokerIds.map { brokerId =>
+      val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
+      val throttles = brokerLevelThrottles.map { throttleName =>
+        (throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
+      }.toMap
+      brokerId -> throttles
     }.toMap
   }