You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/11/08 13:37:16 UTC

[kafka] branch trunk updated: KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29383d6  KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)
29383d6 is described below

commit 29383d6d6a3d42d30e815fbbb084275d449928c8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Nov 8 05:37:05 2018 -0800

    KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)
    
    The problem is the concurrent metadata updates in the foreground and in the heartbeat thread. Changed the code to use ConsumerNetworkClient.poll, which enforces mutual exclusion when accessing the underlying client.
---
 .../clients/consumer/internals/ConsumerCoordinatorTest.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 72808c8..b430078 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -558,7 +558,7 @@ public class ConsumerCoordinatorTest {
         // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
         // rebalance again.
         metadata.requestUpdate();
-        client.poll(Long.MAX_VALUE, time.milliseconds());
+        consumerClient.poll(time.timer(Long.MAX_VALUE));
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
@@ -1010,13 +1010,13 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
+        partitionAssignor.prepare(Collections.emptyMap());
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
+        assertEquals(Collections.emptySet(), rebalanceListener.assigned);
         assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
 
         Map<String, Errors> topicErrors = new HashMap<>();
@@ -1026,7 +1026,7 @@ public class ConsumerCoordinatorTest {
         client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
                 topicErrors, singletonMap(topic1, 1)));
 
-        client.poll(0, time.milliseconds());
+        consumerClient.poll(time.timer(0));
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));