You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/29 22:25:08 UTC

[kafka] branch 2.1 updated: KAFKA-8104: Consumer cannot rejoin to the group after rebalancing (#7460)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 47c70a7  KAFKA-8104: Consumer cannot rejoin to the group after rebalancing (#7460)
47c70a7 is described below

commit 47c70a7416f4ed5cbf204485b7d9c391f5e82b7c
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Oct 17 08:53:14 2019 +0300

    KAFKA-8104: Consumer cannot rejoin to the group after rebalancing (#7460)
    
    This PR contains the fix of race condition bug between "consumer thread" and "consumer coordinator heartbeat thread". It reproduces in many production environments.
    
    Condition for reproducing:
    
    1. Consumer thread initiates rejoin to the group because of commit timeout. Call of AbstractCoordinator#joinGroupIfNeeded which leads to sendJoinGroupRequest.
    2. JoinGroupResponseHandler writes to the AbstractCoordinator.this.generation new generation data and leaves the synchronized section.
    3. Heartbeat thread executes mabeLeaveGroup and clears generation data via resetGenerationOnLeaveGroup.
    4. Consumer thread executes onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment); with the cleared generation data. This leads to the corresponding exception.
    
    Note this PR is a backport of #7460 for 2.1. It also contains part of #7451.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
    
    Co-authored-by: Nikolay <ni...@apache.org>
---
 .../consumer/internals/AbstractCoordinator.java    | 62 ++++++++++++++++------
 .../consumer/internals/ConsumerCoordinator.java    | 19 +++----
 .../internals/ConsumerCoordinatorTest.java         | 59 +++++++++++++++++++-
 .../runtime/distributed/WorkerCoordinator.java     |  2 +-
 4 files changed, 115 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d983087..0a90144 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -401,14 +401,32 @@ public abstract class AbstractCoordinator implements Closeable {
             }
 
             if (future.succeeded()) {
-                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
-                ByteBuffer memberAssignment = future.value().duplicate();
-                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
+                Generation generationSnapshot;
+
+                // Generation data maybe concurrently cleared by Heartbeat thread.
+                // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
+                // and  shouldn't block hearbeat thread.
+                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
+                synchronized (this) {
+                    generationSnapshot = this.generation;
+                }
 
-                // We reset the join group future only after the completion callback returns. This ensures
-                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
-                resetJoinGroupFuture();
-                needsJoinPrepare = true;
+                if (generationSnapshot != Generation.NO_GENERATION) {
+                    // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
+                    ByteBuffer memberAssignment = future.value().duplicate();
+
+                    onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);
+
+                    // We reset the join group future only after the completion callback returns. This ensures
+                    // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
+                    resetJoinGroupFuture();
+                    needsJoinPrepare = true;
+                } else {
+                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    resetStateAndRejoin();
+
+                    return false;
+                }
             } else {
                 resetJoinGroupFuture();
                 final RuntimeException exception = future.exception();
@@ -429,6 +447,11 @@ public abstract class AbstractCoordinator implements Closeable {
         this.joinFuture = null;
     }
 
+    private void resetStateAndRejoin() {
+        rejoinNeeded = true;
+        state = MemberState.UNJOINED;
+    }
+
     private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
         // we store the join future in case we are woken up by the user after beginning the
         // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
@@ -447,12 +470,17 @@ public abstract class AbstractCoordinator implements Closeable {
                     // handle join completion in the callback so that the callback will be invoked
                     // even if the consumer is woken up before finishing the rebalance
                     synchronized (AbstractCoordinator.this) {
-                        log.info("Successfully joined group with generation {}", generation.generationId);
-                        state = MemberState.STABLE;
-                        rejoinNeeded = false;
+                        if (generation != Generation.NO_GENERATION) {
+                            log.info("Successfully joined group with generation {}", generation.generationId);
+                            state = MemberState.STABLE;
+                            rejoinNeeded = false;
 
-                        if (heartbeatThread != null)
-                            heartbeatThread.enable();
+                            if (heartbeatThread != null)
+                                heartbeatThread.enable();
+                        } else {
+                            log.info("Generation data was cleared by heartbeat thread. Rejoin failed.");
+                            state = MemberState.UNJOINED;
+                        }
                     }
                 }
 
@@ -722,19 +750,23 @@ public abstract class AbstractCoordinator implements Closeable {
      * Get the current generation state if the group is stable.
      * @return the current generation or null if the group is unjoined/rebalancing
      */
-    protected synchronized Generation generation() {
+    protected synchronized Generation generationIfStable() {
         if (this.state != MemberState.STABLE)
             return null;
         return generation;
     }
 
+    // Visible for testing
+    protected synchronized Generation generation() {
+        return generation;
+    }
+
     /**
      * Reset the generation and memberId because we have fallen out of the group.
      */
     protected synchronized void resetGeneration() {
         this.generation = Generation.NO_GENERATION;
-        this.rejoinNeeded = true;
-        this.state = MemberState.UNJOINED;
+        resetStateAndRejoin();
     }
 
     protected synchronized void requestRejoin() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 729011c..dd73341 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -515,7 +515,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                                                         final Timer timer) {
         if (partitions.isEmpty()) return Collections.emptyMap();
 
-        final Generation generation = generation();
+        final Generation generation = generationIfStable();
         if (pendingCommittedOffsetRequest != null && !pendingCommittedOffsetRequest.sameRequest(partitions, generation)) {
             // if we were waiting for a different request, then just clear it.
             pendingCommittedOffsetRequest = null;
@@ -767,16 +767,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
 
         final Generation generation;
-        if (subscriptions.partitionsAutoAssigned())
-            generation = generation();
-        else
+        if (subscriptions.partitionsAutoAssigned()) {
+            generation = generationIfStable();
+            // if the generation is null, we are not part of an active group (and we expect to be).
+            // the only thing we can do is fail the commit and let the user rejoin the group in poll()
+            if (generation == null) {
+                log.info("Failing OffsetCommit request since the consumer is not part of an active group");
+                return RequestFuture.failure(new CommitFailedException());
+            }
+        } else
             generation = Generation.NO_GENERATION;
 
-        // if the generation is null, we are not part of an active group (and we expect to be).
-        // the only thing we can do is fail the commit and let the user rejoin the group in poll()
-        if (generation == null)
-            return RequestFuture.failure(new CommitFailedException());
-
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
                 setGenerationId(generation.generationId).
                 setMemberId(generation.memberId);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index af073f1..d091e1f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -741,7 +742,7 @@ public class ConsumerCoordinatorTest {
         coordinator.maybeLeaveGroup();
         assertTrue(received.get());
 
-        AbstractCoordinator.Generation generation = coordinator.generation();
+        AbstractCoordinator.Generation generation = coordinator.generationIfStable();
         assertNull(generation);
     }
 
@@ -1905,7 +1906,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitAfterCoordinatorBackToService() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+            ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
         subscriptions.assignFromUser(Collections.singleton(t1p));
         subscriptions.seek(t1p, 100L);
 
@@ -1921,6 +1922,60 @@ public class ConsumerCoordinatorTest {
         assertEquals(100L, subscriptions.position(t1p).longValue());
     }
 
+    @Test
+    public void testConsumerRejoinAfterRebalance() throws Exception {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, false)) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
+
+            try {
+                coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)),
+                    time.timer(Long.MAX_VALUE));
+                fail("Expected commit failure");
+            } catch (CommitFailedException e) {
+                // Expected
+            }
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
+
+            MockTime time = new MockTime(1);
+
+            //onJoinPrepare will be executed and onJoinComplete will not.
+            boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+            //SynGroupRequest not responded.
+            assertEquals(1, client.inFlightRequestCount());
+            assertEquals(generationId, coordinator.generation().generationId);
+            assertEquals(memberId, coordinator.generation().memberId);
+
+            // Imitating heartbeat thread that clears generation data.
+            coordinator.maybeLeaveGroup();
+
+            assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+
+            client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+            //Join future should succeed but generation already cleared so result of join is false.
+            res = coordinator.joinGroupIfNeeded(time.timer(1));
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+
+            // We just have the LeaveGroup
+            assertEquals(1, client.inFlightRequestCount());
+            assertEquals(ApiKeys.LEAVE_GROUP, client.requests().peek().apiKey());
+        }
+    }
+
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                                final boolean autoCommit,
                                                                final boolean leaveGroup) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 103a323..5a83435 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -295,7 +295,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     public String memberId() {
-        Generation generation = generation();
+        Generation generation = generationIfStable();
         if (generation != null)
             return generation.memberId;
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;