You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/10/07 02:10:57 UTC
[kafka] branch 2.8 updated: MINOR: re-add removed test coverage for
'KAFKA-12983: reset needsJoinPrepare flag' (#11332)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 9ab7c14 MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag' (#11332)
9ab7c14 is described below
commit 9ab7c14097b425257f958f00a53189dcae57a17a
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Wed Oct 6 19:00:09 2021 -0700
MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag' (#11332)
In 11231 we fixed a bug in which the consumer would reset its state unnecessarily, and fixed up the tests accordingly. Unfortunately this also wiped out the test coverage for https://issues.apache.org/jira/browse/KAFKA-12983 that was added in 10986. This test coverage was re-added during a cherrypick to the 2.7 branch; this PR ports that up to trunk. This test has been verified to fail without the corresponding fix, ie resetting the `needsJoinPrepare` flag
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../internals/ConsumerCoordinatorTest.java | 45 ++++++++++++++++++++--
1 file changed, 42 insertions(+), 3 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 67adffa..90d43ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -72,6 +72,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -2840,7 +2841,8 @@ public abstract class ConsumerCoordinatorTest {
}
@Test
- public void testConsumerRejoinAfterRebalance() {
+ public void testPrepareJoinAndRejoinAfterFailedRebalance() {
+ final List<TopicPartition> partitions = singletonList(t1p);
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
coordinator.ensureActiveGroup();
@@ -2875,7 +2877,7 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
- client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ client.respond(syncGroupResponse(partitions, Errors.NONE));
// Join future should succeed but generation already cleared so result of join is false.
res = coordinator.joinGroupIfNeeded(time.timer(1));
@@ -2890,7 +2892,7 @@ public abstract class ConsumerCoordinatorTest {
// Retry join should then succeed
client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
- client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
res = coordinator.joinGroupIfNeeded(time.timer(3000));
@@ -2898,9 +2900,46 @@ public abstract class ConsumerCoordinatorTest {
assertFalse(client.hasPendingResponses());
assertFalse(client.hasInFlightRequests());
}
+ Collection<TopicPartition> lost = getLost(partitions);
+ assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
+ assertEquals(lost.size(), rebalanceListener.lostCount);
}
@Test
+ public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGroup() {
+ final List<TopicPartition> partitions = singletonList(t1p);
+ try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
+ final SystemTime realTime = new SystemTime();
+ coordinator.ensureActiveGroup();
+
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
+
+ assertThrows(RebalanceInProgressException.class, () -> coordinator.commitOffsetsSync(
+ singletonMap(t1p, new OffsetAndMetadata(100L)),
+ time.timer(Long.MAX_VALUE)));
+
+ int generationId = 42;
+ String memberId = "consumer-42";
+
+ client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
+ client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
+
+ boolean res = coordinator.joinGroupIfNeeded(realTime.timer(1000));
+
+ assertFalse(res);
+ assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+ assertEquals("", coordinator.generation().memberId);
+
+ res = coordinator.joinGroupIfNeeded(realTime.timer(1000));
+ assertFalse(res);
+ }
+ Collection<TopicPartition> lost = getLost(partitions);
+ assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
+ assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
+ }
+
+
+ @Test
public void testThrowOnUnsupportedStableFlag() {
supportStableFlag((short) 6, true);
}