You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/10/07 02:10:57 UTC

[kafka] branch 2.8 updated: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag' (#11332)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 9ab7c14  MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag' (#11332)
9ab7c14 is described below

commit 9ab7c14097b425257f958f00a53189dcae57a17a
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Wed Oct 6 19:00:09 2021 -0700

    MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag' (#11332)
    
    In 11231 we fixed a bug in which the consumer would reset its state unnecessarily, and fixed up the tests accordingly. Unfortunately, this also wiped out the test coverage for https://issues.apache.org/jira/browse/KAFKA-12983 that was added in 10986. This test coverage was re-added during a cherry-pick to the 2.7 branch; this PR ports it up to trunk. The test has been verified to fail without the corresponding fix, i.e. resetting the `needsJoinPrepare` flag.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../internals/ConsumerCoordinatorTest.java         | 45 ++++++++++++++++++++--
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 67adffa..90d43ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -72,6 +72,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -2840,7 +2841,8 @@ public abstract class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testConsumerRejoinAfterRebalance() {
+    public void testPrepareJoinAndRejoinAfterFailedRebalance() {
+        final List<TopicPartition> partitions = singletonList(t1p);
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
             coordinator.ensureActiveGroup();
 
@@ -2875,7 +2877,7 @@ public abstract class ConsumerCoordinatorTest {
 
             assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
 
-            client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
+            client.respond(syncGroupResponse(partitions, Errors.NONE));
 
             // Join future should succeed but generation already cleared so result of join is false.
             res = coordinator.joinGroupIfNeeded(time.timer(1));
@@ -2890,7 +2892,7 @@ public abstract class ConsumerCoordinatorTest {
 
             // Retry join should then succeed
             client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
-            client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+            client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
 
             res = coordinator.joinGroupIfNeeded(time.timer(3000));
 
@@ -2898,9 +2900,46 @@ public abstract class ConsumerCoordinatorTest {
             assertFalse(client.hasPendingResponses());
             assertFalse(client.hasInFlightRequests());
         }
+        Collection<TopicPartition> lost = getLost(partitions);
+        assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
+        assertEquals(lost.size(), rebalanceListener.lostCount);
     }
 
     @Test
+    public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGroup() {
+        final List<TopicPartition> partitions = singletonList(t1p);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
+            final SystemTime realTime = new SystemTime();
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
+
+            assertThrows(RebalanceInProgressException.class, () -> coordinator.commitOffsetsSync(
+                singletonMap(t1p, new OffsetAndMetadata(100L)),
+                time.timer(Long.MAX_VALUE)));
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
+            client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
+
+            boolean res = coordinator.joinGroupIfNeeded(realTime.timer(1000));
+
+            assertFalse(res);
+            assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+            assertEquals("", coordinator.generation().memberId);
+
+            res = coordinator.joinGroupIfNeeded(realTime.timer(1000));
+            assertFalse(res);
+        }
+        Collection<TopicPartition> lost = getLost(partitions);
+        assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
+        assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
+    }
+
+
+    @Test
     public void testThrowOnUnsupportedStableFlag() {
         supportStableFlag((short) 6, true);
     }