You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/29 21:26:15 UTC

[kafka] branch 2.3 updated: KAFKA-8104; Consumer cannot rejoin to the group after rebalancing (#7460) (#8757)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new c557156  KAFKA-8104; Consumer cannot rejoin to the group after rebalancing (#7460) (#8757)
c557156 is described below

commit c5571569d23dcd4405fc940a9482a94ea54d314d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 29 14:25:41 2020 -0700

    KAFKA-8104; Consumer cannot rejoin to the group after rebalancing (#7460) (#8757)
    
    This PR fixes a race condition between the consumer thread and the consumer coordinator heartbeat thread. The bug has been reproduced in many production environments.
    
    Conditions for reproducing:
    
    1. The consumer thread initiates a rejoin to the group because of a commit timeout: it calls AbstractCoordinator#joinGroupIfNeeded, which leads to sendJoinGroupRequest.
    2. JoinGroupResponseHandler writes the new generation data to AbstractCoordinator.this.generation and leaves the synchronized section.
    3. The heartbeat thread executes maybeLeaveGroup and clears the generation data via resetGenerationOnLeaveGroup.
    4. The consumer thread executes onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment) with the cleared generation data, which causes the call to fail with an exception.
    
    Note that this PR is a backport of #7460 to the 2.3 branch. It also contains part of #7451.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
    
    Co-authored-by: Nikolay <ni...@apache.org>
---
 .../consumer/internals/AbstractCoordinator.java    | 65 ++++++++++++++++------
 .../consumer/internals/ConsumerCoordinator.java    |  4 +-
 .../internals/ConsumerCoordinatorTest.java         | 49 +++++++++++++++-
 .../runtime/distributed/WorkerCoordinator.java     |  4 +-
 4 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 54678f7..987c740 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -419,14 +419,32 @@ public abstract class AbstractCoordinator implements Closeable {
             }
 
             if (future.succeeded()) {
-                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
-                ByteBuffer memberAssignment = future.value().duplicate();
-                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
+                Generation generationSnapshot;
+
+                // Generation data maybe concurrently cleared by Heartbeat thread.
+                // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
+                // and  shouldn't block hearbeat thread.
+                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
+                synchronized (this) {
+                    generationSnapshot = this.generation;
+                }
 
-                // We reset the join group future only after the completion callback returns. This ensures
-                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
-                resetJoinGroupFuture();
-                needsJoinPrepare = true;
+                if (generationSnapshot != Generation.NO_GENERATION) {
+                    // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
+                    ByteBuffer memberAssignment = future.value().duplicate();
+
+                    onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);
+
+                    // We reset the join group future only after the completion callback returns. This ensures
+                    // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
+                    resetJoinGroupFuture();
+                    needsJoinPrepare = true;
+                } else {
+                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    resetStateAndRejoin();
+
+                    return false;
+                }
             } else {
                 resetJoinGroupFuture();
                 final RuntimeException exception = future.exception();
@@ -448,6 +466,11 @@ public abstract class AbstractCoordinator implements Closeable {
         this.joinFuture = null;
     }
 
+    private void resetStateAndRejoin() {
+        rejoinNeeded = true;
+        state = MemberState.UNJOINED;
+    }
+
     private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
         // we store the join future in case we are woken up by the user after beginning the
         // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
@@ -466,12 +489,17 @@ public abstract class AbstractCoordinator implements Closeable {
                     // handle join completion in the callback so that the callback will be invoked
                     // even if the consumer is woken up before finishing the rebalance
                     synchronized (AbstractCoordinator.this) {
-                        log.info("Successfully joined group with generation {}", generation.generationId);
-                        state = MemberState.STABLE;
-                        rejoinNeeded = false;
+                        if (generation != Generation.NO_GENERATION) {
+                            log.info("Successfully joined group with generation {}", generation.generationId);
+                            state = MemberState.STABLE;
+                            rejoinNeeded = false;
 
-                        if (heartbeatThread != null)
-                            heartbeatThread.enable();
+                            if (heartbeatThread != null)
+                                heartbeatThread.enable();
+                        } else {
+                            log.info("Generation data was cleared by heartbeat thread. Rejoin failed.");
+                            state = MemberState.UNJOINED;
+                        }
                     }
                 }
 
@@ -589,8 +617,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
                             joinResponse.data().memberId(), null);
-                    AbstractCoordinator.this.rejoinNeeded = true;
-                    AbstractCoordinator.this.state = MemberState.UNJOINED;
+                    AbstractCoordinator.this.resetStateAndRejoin();
                 }
                 future.raise(Errors.MEMBER_ID_REQUIRED);
             } else {
@@ -796,12 +823,17 @@ public abstract class AbstractCoordinator implements Closeable {
      * Get the current generation state if the group is stable.
      * @return the current generation or null if the group is unjoined/rebalancing
      */
-    protected synchronized Generation generation() {
+    protected synchronized Generation generationIfStable() {
         if (this.state != MemberState.STABLE)
             return null;
         return generation;
     }
 
+    // Visible for testing
+    protected synchronized Generation generation() {
+        return generation;
+    }
+
     /**
      * Check whether given generation id is matching the record within current generation.
      * Only using in unit tests.
@@ -826,8 +858,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     protected synchronized void resetGeneration() {
         this.generation = Generation.NO_GENERATION;
-        this.rejoinNeeded = true;
-        this.state = MemberState.UNJOINED;
+        resetStateAndRejoin();
     }
 
     protected synchronized void requestRejoin() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7ed4086..439d905 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -539,7 +539,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                                                         final Timer timer) {
         if (partitions.isEmpty()) return Collections.emptyMap();
 
-        final Generation generation = generation();
+        final Generation generation = generationIfStable();
         if (pendingCommittedOffsetRequest != null && !pendingCommittedOffsetRequest.sameRequest(partitions, generation)) {
             // if we were waiting for a different request, then just clear it.
             pendingCommittedOffsetRequest = null;
@@ -812,7 +812,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         final Generation generation;
         if (subscriptions.partitionsAutoAssigned()) {
-            generation = generation();
+            generation = generationIfStable();
             // if the generation is null, we are not part of an active group (and we expect to be).
             // the only thing we can do is fail the commit and let the user rejoin the group in poll()
             if (generation == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index f037711..d10764d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -880,7 +880,7 @@ public class ConsumerCoordinatorTest {
         coordinator.maybeLeaveGroup();
         assertTrue(received.get());
 
-        AbstractCoordinator.Generation generation = coordinator.generation();
+        AbstractCoordinator.Generation generation = coordinator.generationIfStable();
         assertNull(generation);
     }
 
@@ -2176,6 +2176,53 @@ public class ConsumerCoordinatorTest {
                 coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
     }
 
+    @Test
+    public void testConsumerRejoinAfterRebalance() throws Exception {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
+
+            assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(
+                singletonMap(t1p, new OffsetAndMetadata(100L)),
+                time.timer(Long.MAX_VALUE)));
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
+
+            MockTime time = new MockTime(1);
+
+            //onJoinPrepare will be executed and onJoinComplete will not.
+            boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+            //SynGroupRequest not responded.
+            assertEquals(1, client.inFlightRequestCount());
+            assertEquals(generationId, coordinator.generation().generationId);
+            assertEquals(memberId, coordinator.generation().memberId);
+
+            // Imitating heartbeat thread that clears generation data.
+            coordinator.maybeLeaveGroup();
+
+            assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+
+            client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+            //Join future should succeed but generation already cleared so result of join is false.
+            res = coordinator.joinGroupIfNeeded(time.timer(1));
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+        }
+    }
+
     private void receiveFencedInstanceIdException() {
         subscriptions.assignFromUser(singleton(t1p));
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index ea5e1c3..f413a6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -248,7 +248,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
     }
 
     public String memberId() {
-        Generation generation = generation();
+        Generation generation = generationIfStable();
         if (generation != null)
             return generation.memberId;
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;
@@ -261,7 +261,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
      * @return the generation ID or -1 if no generation is defined
      */
     public int generationId() {
-        Generation generation = super.generation();
+        Generation generation = super.generationIfStable();
         return generation == null ? OffsetCommitRequest.DEFAULT_GENERATION_ID : generation.generationId;
     }