You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/21 04:27:26 UTC

[kafka] branch trunk updated: KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c75dc5e  KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324)
c75dc5e is described below

commit c75dc5e2e09cee4ba47a9b6c484cb225acdb086e
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Fri Mar 20 21:26:57 2020 -0700

    KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324)
    
    This bug was introduced by the overly strict consistency check in #7994. A reset-generation operation can be invoked in the middle of the JoinGroupRequest -> JoinGroupResponse -> SyncGroupRequest -> SyncGroupResponse sequence of events if the user calls unsubscribe in the middle of consumer#poll().
    
    The proper fix is to skip the protocol name check when the generation is invalid.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    |  3 +-
 .../internals/AbstractCoordinatorTest.java         | 66 +++++++++++++++-------
 2 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2d93766..8427b92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -913,7 +913,8 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private boolean isProtocolNameInconsistent(String protocolName) {
-        return protocolName != null && !protocolName.equals(generation().protocolName);
+        return protocolName != null && generation() != Generation.NO_GENERATION
+                   && !protocolName.equals(generation().protocolName);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index f958836..e2315cd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -49,7 +49,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -365,8 +364,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testJoinGroupProtocolTypeAndName() {
-        String wrongProtocolType = "wrong-type";
-        String wrongProtocolName = "wrong-name";
+        final String wrongProtocolType = "wrong-type";
+        final String wrongProtocolName = "wrong-name";
 
         // No Protocol Type in both JoinGroup and SyncGroup responses
         assertTrue(joinGroupWithProtocolTypeAndName(null, null, null));
@@ -391,6 +390,39 @@ public class AbstractCoordinatorTest {
             () -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName));
     }
 
+    @Test
+    public void testNoGenerationWillNotTriggerProtocolNameCheck() {
+        final String wrongProtocolName = "wrong-name";
+
+        setupCoordinator();
+        mockClient.reset();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+            return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
+        }, joinGroupFollowerResponse(defaultGeneration, memberId,
+            "memberid", Errors.NONE, PROTOCOL_TYPE));
+
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+            coordinator.resetGenerationOnLeaveGroup();
+
+            SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
+            return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
+                       && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
+        }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));
+
+        // No exception shall be thrown as the generation is reset.
+        coordinator.joinGroupIfNeeded(mockTime.timer(100L));
+    }
+
     private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType,
                                                      String syncGroupResponseProtocolType,
                                                      String syncGroupResponseProtocolName) {
@@ -665,7 +697,7 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException e) {
+        } catch (WakeupException ignored) {
         }
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -703,7 +735,7 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException e) {
+        } catch (WakeupException ignored) {
         }
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -738,7 +770,7 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException e) {
+        } catch (WakeupException ignored) {
         }
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -811,7 +843,7 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException e) {
+        } catch (WakeupException ignored) {
         }
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -884,7 +916,7 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException e) {
+        } catch (WakeupException ignored) {
         }
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -945,7 +977,7 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException e) {
+        } catch (WakeupException ignored) {
         }
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -990,12 +1022,8 @@ public class AbstractCoordinatorTest {
 
     private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception {
         mockTime.sleep(HEARTBEAT_INTERVAL_MS);
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return heartbeatReceived.get();
-            }
-        }, 3000, "Should have received a heartbeat request after joining the group");
+        TestUtils.waitForCondition(heartbeatReceived::get,
+            3000, "Should have received a heartbeat request after joining the group");
     }
 
     private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
@@ -1063,10 +1091,10 @@ public class AbstractCoordinatorTest {
         private int onJoinCompleteInvokes = 0;
         private boolean wakeupOnJoinComplete = false;
 
-        public DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
-                                ConsumerNetworkClient client,
-                                Metrics metrics,
-                                Time time) {
+        DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
+                         ConsumerNetworkClient client,
+                         Metrics metrics,
+                         Time time) {
             super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time);
         }