You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/28 21:44:43 UTC

[kafka] branch trunk updated: KAFKA-9823: Follow-up, check state for handling commit error response (#8548)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7edf46  KAFKA-9823: Follow-up, check state for handling commit error response (#8548)
f7edf46 is described below

commit f7edf46a5e5c30310542c00eb2d771bd72f42fd3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Apr 28 14:44:11 2020 -0700

    KAFKA-9823: Follow-up, check state for handling commit error response (#8548)
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    | 22 +++---
 .../internals/ConsumerCoordinatorTest.java         | 86 +++++++++++++++++++++-
 3 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9b4307f..e724384 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -123,11 +123,11 @@ public abstract class AbstractCoordinator implements Closeable {
     private final GroupRebalanceConfig rebalanceConfig;
     protected final ConsumerNetworkClient client;
     protected final Time time;
+    protected MemberState state = MemberState.UNJOINED;
 
     private Node coordinator = null;
     private boolean rejoinNeeded = true;
     private boolean needsJoinPrepare = true;
-    private MemberState state = MemberState.UNJOINED;
     private HeartbeatThread heartbeatThread = null;
     private RequestFuture<ByteBuffer> joinFuture = null;
     private RequestFuture<Void> findCoordinatorFuture = null;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 208193f..914071b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1202,13 +1202,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         } else if (error == Errors.FENCED_INSTANCE_ID) {
                             log.info("OffsetCommit failed with {} due to group instance id {} fenced", sentGeneration, rebalanceConfig.groupInstanceId);
 
-                            // if the generation has changed, do not raise the fatal error but rebalance-in-progress
+                            // if the generation has changed or we are not in rebalancing, do not raise the fatal error but rebalance-in-progress
                             if (generationUnchanged()) {
                                 future.raise(error);
                             } else {
-                                future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
-                                    "consumer member's old generation is fenced by its group instance id, it is possible that " +
-                                    "this consumer has already participated another rebalance and got a new generation"));
+                                if (ConsumerCoordinator.this.state == MemberState.REBALANCING) {
+                                    future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
+                                        "consumer member's old generation is fenced by its group instance id, it is possible that " +
+                                        "this consumer has already participated another rebalance and got a new generation"));
+                                } else {
+                                    future.raise(new CommitFailedException());
+                                }
                             }
                             return;
                         } else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1229,15 +1233,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                 || error == Errors.ILLEGAL_GENERATION) {
                             log.info("OffsetCommit failed with {}: {}", sentGeneration, error.message());
 
-                            // only need to reset generation and re-join group if generation has not changed;
+                            // only need to reset generation and re-join group if generation has not changed or we are not in rebalancing;
                             // otherwise only raise rebalance-in-progress error
-                            if (generationUnchanged()) {
-                                resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
-                                future.raise(new CommitFailedException());
-                            } else {
+                            if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.REBALANCING) {
                                 future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
                                     "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
                                     "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
+                            } else {
+                                resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
+                                future.raise(new CommitFailedException());
                             }
                             return;
                         } else {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c62d59a..afba13b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2070,7 +2070,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testCommitOffsetIllegalGenerationWithNewGenearion() {
+    public void testCommitOffsetIllegalGenerationWithNewGeneration() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
@@ -2090,6 +2090,7 @@ public class ConsumerCoordinatorTest {
             "memberId-new",
             null);
         coordinator.setNewGeneration(newGen);
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
 
         assertTrue(consumerClient.poll(future, time.timer(30000)));
         assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2099,6 +2100,31 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testCommitOffsetIllegalGenerationWithResetGenearion() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        final AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(
+            1,
+            "memberId",
+            null);
+        coordinator.setNewGeneration(currGen);
+
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
+        RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+            new OffsetAndMetadata(100L, "metadata")));
+
+        // reset the generation
+        coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);
+
+        assertTrue(consumerClient.poll(future, time.timer(30000)));
+        assertTrue(future.exception().getClass().isInstance(new CommitFailedException()));
+
+        // the generation should not be reset
+        assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+    }
+
+    @Test
     public void testCommitOffsetUnknownMemberWithNewGenearion() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -2119,6 +2145,7 @@ public class ConsumerCoordinatorTest {
             "memberId-new",
             null);
         coordinator.setNewGeneration(newGen);
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
 
         assertTrue(consumerClient.poll(future, time.timer(30000)));
         assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2128,7 +2155,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testCommitOffsetFencedInstanceWithNewGenearion() {
+    public void testCommitOffsetUnknownMemberWithResetGenearion() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
@@ -2138,6 +2165,32 @@ public class ConsumerCoordinatorTest {
             null);
         coordinator.setNewGeneration(currGen);
 
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+            new OffsetAndMetadata(100L, "metadata")));
+
+        // reset the generation
+        coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);
+
+        assertTrue(consumerClient.poll(future, time.timer(30000)));
+        assertTrue(future.exception().getClass().isInstance(new CommitFailedException()));
+
+        // the generation should not be reset
+        assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+    }
+
+    @Test
+    public void testCommitOffsetFencedInstanceWithRebalancingGenearion() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        final AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(
+            1,
+            "memberId",
+            null);
+        coordinator.setNewGeneration(currGen);
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
         RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
             new OffsetAndMetadata(100L, "metadata")));
@@ -2157,6 +2210,35 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testCommitOffsetFencedInstanceWithNewGenearion() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        final AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(
+            1,
+            "memberId",
+            null);
+        coordinator.setNewGeneration(currGen);
+
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
+        RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+            new OffsetAndMetadata(100L, "metadata")));
+
+        // change the generation
+        final AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(
+            2,
+            "memberId-new",
+            null);
+        coordinator.setNewGeneration(newGen);
+
+        assertTrue(consumerClient.poll(future, time.timer(30000)));
+        assertTrue(future.exception().getClass().isInstance(new CommitFailedException()));
+
+        // the generation should not be reset
+        assertEquals(newGen, coordinator.generation());
+    }
+
+    @Test
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
         final String consumerId = "leader";