Posted to commits@kafka.apache.org by gw...@apache.org on 2019/04/19 23:43:25 UTC

[kafka] branch trunk updated: KAFKA-7965; Fix flaky test ConsumerBounceTest

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e32d33c  KAFKA-7965; Fix flaky test ConsumerBounceTest
e32d33c is described below

commit e32d33cd4f0c884c9d0c871c0aa4b423ae4a7bf0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Apr 19 16:43:06 2019 -0700

    KAFKA-7965; Fix flaky test ConsumerBounceTest
    
    We suspect the problem is a race condition after broker startup in which the consumer has not yet found the coordinator and rebalanced. The fix here rolls all the brokers first and only then waits for the expected exception. (A condensed sketch of the new flow follows the diffstat below.)
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6608 from hachikuji/KAFKA-7965
---
 .../kafka/api/AbstractConsumerTest.scala           |  6 ++-
 .../integration/kafka/api/ConsumerBounceTest.scala | 58 ++++++----------------
 2 files changed, 19 insertions(+), 45 deletions(-)
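
Before diving into the diff, here is the new test flow condensed into one place. This is a sketch assembled from the added lines below, not the verbatim patch: servers, newConfigs, killBroker, restartDeadBrokers, brokerTime, consumerPollers and TestUtils all come from Kafka's integration-test harness, and the helper is spelled with flatMap here where the commit uses an equivalent flatten call (see the footnote at the end).

    // 1. Roll every broker onto the config with the smaller max group
    //    size, so whichever broker becomes coordinator enforces it.
    for (serverIdx <- servers.indices) {
      killBroker(serverIdx)
      val config = newConfigs(serverIdx)
      servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
      restartDeadBrokers()
    }

    // 2. Only now wait for the rebalance to evict a member; each polling
    //    thread surfaces its failure through thrownException.
    def raisedExceptions: Seq[Throwable] = consumerPollers.flatMap(_.thrownException)

    TestUtils.waitUntilTrue(() => raisedExceptions.nonEmpty,
      msg = "The remaining consumers in the group could not fetch the expected records", 10000L)

    // 3. Exactly one consumer should have been rejected, and with the
    //    group-max-size error rather than anything else.
    assertEquals(1, raisedExceptions.size)
    assertTrue(raisedExceptions.head.isInstanceOf[GroupMaxSizeReachedException])

The point of the reorganization: unlike the old test, nothing tries to locate the coordinator or restart brokers in a particular order, which removes the startup race the message above describes.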

diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 71a95d2..210ceb2 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -318,8 +318,9 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
 
   protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
                                            topicsToSubscribe: List[String],
-                                           partitionsToAssign: Set[TopicPartition]) extends ShutdownableThread("daemon-consumer-assignment", false)
-  {
+                                           partitionsToAssign: Set[TopicPartition])
+    extends ShutdownableThread("daemon-consumer-assignment", false) {
+
     def this(consumer: Consumer[Array[Byte], Array[Byte]], topicsToSubscribe: List[String]) {
       this(consumer, topicsToSubscribe, Set.empty[TopicPartition])
     }
@@ -344,6 +345,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
         partitionAssignment = Set.empty[TopicPartition]
       }
     }
+
     if (partitionAssignment.isEmpty) {
       consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
     } else {
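
The hunk above is purely a formatting cleanup of the ConsumerAssignmentPoller declaration, but it also shows the class's contract: an empty partitionsToAssign set makes the poller subscribe to the given topics with a rebalance listener, while a non-empty set makes it assign those partitions directly. A hypothetical usage sketch, assuming the harness's createConsumer helper is in scope:

    // Subscription mode: the two-argument constructor passes an empty
    // partition set, so the poller calls consumer.subscribe(...).
    val consumer = createConsumer()
    val poller = new ConsumerAssignmentPoller(consumer, List("group-max-size-test"))
    poller.start()    // it is a ShutdownableThread, so a plain start()
    // ... drive the scenario under test ...
    poller.shutdown() // stops the polling loop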
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 2ce5fab..de0a38f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -299,12 +299,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     val topic = "group-max-size-test"
     val maxGroupSize = 2
     val consumerCount = maxGroupSize + 1
-    var recordsProduced = maxGroupSize * 100
     val partitionCount = consumerCount * 2
-    if (recordsProduced % partitionCount != 0) {
-      // ensure even record distribution per partition
-      recordsProduced += partitionCount - recordsProduced % partitionCount
-    }
+
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -315,47 +311,23 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
 
     // roll all brokers with a lesser max group size to make sure coordinator has the new config
     val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
-    var kickedOutConsumerIdx: Option[Int] = None
-    val holdingGroupBrokers = servers.filter(!_.groupCoordinator.groupManager.currentGroups.isEmpty).map(_.config.brokerId)
-    // should only have one broker holding the group metadata
-    assertEquals(holdingGroupBrokers.size, 1)
-    val coordinator = holdingGroupBrokers.head
-    // ensure the coordinator broker will be restarted first
-    val orderedBrokersIds = List(coordinator) ++ servers.indices.toBuffer.filter(_ != coordinator)
-    // restart brokers until the group moves to a Coordinator with the new config
-    breakable { for (broker <- orderedBrokersIds) {
-      killBroker(broker)
-      consumerPollers.indices.foreach(idx => {
-        consumerPollers(idx).thrownException match {
-          case Some(thrownException) =>
-            if (!thrownException.isInstanceOf[GroupMaxSizeReachedException]) {
-              throw thrownException
-            }
-            if (kickedOutConsumerIdx.isDefined) {
-              fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
-            }
-            kickedOutConsumerIdx = Some(idx)
-          case None =>
-        }
-      })
+    for (serverIdx <- servers.indices) {
+      killBroker(serverIdx)
+      val config = newConfigs(serverIdx)
+      servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
+      restartDeadBrokers()
+    }
 
-      if (kickedOutConsumerIdx.isDefined)
-        break
+    def raisedExceptions: Seq[Throwable] = {
+      consumerPollers.flatten(_.thrownException)
+    }
 
-      val config = newConfigs(broker)
-      servers(broker) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
-      restartDeadBrokers()
-    }}
-    if (kickedOutConsumerIdx.isEmpty)
-      fail(s"Should have received an ${classOf[GroupMaxSizeReachedException]} during the cluster roll")
-    restartDeadBrokers()
+    // we are waiting for the group to rebalance and one member to get kicked
+    TestUtils.waitUntilTrue(() => raisedExceptions.nonEmpty,
+      msg = "The remaining consumers in the group could not fetch the expected records", 10000L)
 
-    // assert that the group has gone through a rebalance and shed off one consumer
-    consumerPollers.remove(kickedOutConsumerIdx.get).shutdown()
-    sendRecords(createProducer(), recordsProduced, topic, numPartitions = Some(partitionCount))
-    TestUtils.waitUntilTrue(() => {
-      consumerPollers.forall(p => p.receivedMessages >= recordsProduced / consumerCount)
-    }, "The remaining consumers in the group could not fetch the expected records", 10000L)
+    assertEquals(1, raisedExceptions.size)
+    assertTrue(raisedExceptions.head.isInstanceOf[GroupMaxSizeReachedException])
   }
 
   /**
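
One footnote on the new raisedExceptions helper in the last hunk: consumerPollers.flatten(_.thrownException) compiles because, in the Scala 2.x collections Kafka used at the time, flatten takes the element-to-collection conversion as an implicit parameter and Option converts to an Iterable, so passing the lambda explicitly works. The more conventional spelling is equivalent here:

    // Equivalent, arguably clearer, form of the committed helper.
    def raisedExceptions: Seq[Throwable] = consumerPollers.flatMap(_.thrownException)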