You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/11/08 13:37:16 UTC
[kafka] branch trunk updated: KAFKA-7604; Fix flaky unit test
`testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29383d6 KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)
29383d6 is described below
commit 29383d6d6a3d42d30e815fbbb084275d449928c8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Nov 8 05:37:05 2018 -0800
KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)
The problem is the concurrent metadata updates in the foreground and in the heartbeat thread. Changed the code to use ConsumerNetworkClient.poll, which enforces mutual exclusion when accessing the underlying client.
---
.../clients/consumer/internals/ConsumerCoordinatorTest.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 72808c8..b430078 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -558,7 +558,7 @@ public class ConsumerCoordinatorTest {
// Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
// rebalance again.
metadata.requestUpdate();
- client.poll(Long.MAX_VALUE, time.milliseconds());
+ consumerClient.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
}
@@ -1010,13 +1010,13 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
- partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
+ partitionAssignor.prepare(Collections.emptyMap());
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
- client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
+ client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
+ assertEquals(Collections.emptySet(), rebalanceListener.assigned);
assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
Map<String, Errors> topicErrors = new HashMap<>();
@@ -1026,7 +1026,7 @@ public class ConsumerCoordinatorTest {
client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
topicErrors, singletonMap(topic1, 1)));
- client.poll(0, time.milliseconds());
+ consumerClient.poll(time.timer(0));
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));