You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/09/17 05:47:04 UTC

[kafka] branch 2.7 updated: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group (#10986)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 4558845  KAFKA-12983: reset needsJoinPrepare flag before rejoining the group (#10986)
4558845 is described below

commit 455884591a207d58052bc4c263e0a7e58dfaa5c1
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Jul 13 12:14:39 2021 -0700

    KAFKA-12983: reset needsJoinPrepare flag before rejoining the group (#10986)
    
    The #onJoinPrepare callback is not invoked every time a member (re)joins the group; it runs only once, when the member first enters the rebalance. As a result, any updates or events that occur during the join phase can be lost from the internal state: for example, the clearing of the SubscriptionState (and thus of the "ownedPartitions" used for cooperative rebalancing) that should happen after the member loses its memberId during a rebalance. To fix this, reset the needsJoinPrepare flag inside the resetStateAndRejoin() method so that #onJoinPrepare is invoked again before the member rejoins the group.
    
    Reviewers: Guozhang Wang <gu...@apache.org>, Jason Gustafson <ja...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  1 +
 .../internals/ConsumerCoordinatorTest.java         | 22 +++++++++++++++++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1170714..4a6c029 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -967,6 +967,7 @@ public abstract class AbstractCoordinator implements Closeable {
     private synchronized void resetStateAndRejoin() {
         resetState();
         rejoinNeeded = true;
+        needsJoinPrepare = true;
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index e9f0f7c..dba44f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -101,6 +101,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -2789,7 +2790,8 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testConsumerRejoinAfterRebalance() {
+    public void testConsumerPrepareJoinAndRejoinAfterFailedRebalance() {
+        final List<TopicPartition> partitions = singletonList(t1p);
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
             coordinator.ensureActiveGroup();
 
@@ -2824,7 +2826,7 @@ public class ConsumerCoordinatorTest {
 
             assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
 
-            client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
+            client.respond(syncGroupResponse(partitions, Errors.NONE));
 
             // Join future should succeed but generation already cleared so result of join is false.
             res = coordinator.joinGroupIfNeeded(time.timer(1));
@@ -2839,7 +2841,7 @@ public class ConsumerCoordinatorTest {
 
             // Retry join should then succeed
             client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
-            client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+            client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
 
             res = coordinator.joinGroupIfNeeded(time.timer(3000));
 
@@ -2847,6 +2849,9 @@ public class ConsumerCoordinatorTest {
             assertFalse(client.hasPendingResponses());
             assertFalse(client.hasInFlightRequests());
         }
+        Collection<TopicPartition> lost = getLost(partitions);
+        assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
+        assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
     }
 
     @Test
@@ -3036,6 +3041,17 @@ public class ConsumerCoordinatorTest {
         }
     }
 
+    private Collection<TopicPartition> getLost(final List<TopicPartition> owned) {
+        switch (protocol) {
+            case EAGER:
+                return emptySet();
+            case COOPERATIVE:
+                return toSet(owned);
+            default:
+                throw new IllegalStateException("This should not happen");
+        }
+    }
+
     private Collection<TopicPartition> getAdded(final List<TopicPartition> owned,
                                                 final List<TopicPartition> assigned) {
         switch (protocol) {