You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/21 04:27:58 UTC
[kafka] branch 2.5 updated: KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 3ea04ff KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324)
3ea04ff is described below
commit 3ea04ff5a1ed0f93161835d367318a29d1c243d1
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Fri Mar 20 21:26:57 2020 -0700
KAFKA-9701 (fix): Only check protocol name when generation is valid (#8324)
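This bug was introduced by #7994, which added an overly strict consistency check. A reset-generation operation can be invoked in the middle of the JoinGroupRequest -> JoinGroupResponse -> SyncGroupRequest -> SyncGroupResponse sequence of events if the user calls unsubscribe in the middle of consumer#poll(). In that case the protocol name in the response is compared against a generation that has already been reset, so the check fails even though nothing is actually inconsistent.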
The proper fix is to skip the protocol name check when the generation is invalid (i.e. it has been reset to NO_GENERATION).
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../consumer/internals/AbstractCoordinator.java | 3 +-
.../internals/AbstractCoordinatorTest.java | 66 +++++++++++++++-------
2 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e01c543..50db862 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -913,7 +913,8 @@ public abstract class AbstractCoordinator implements Closeable {
}
private boolean isProtocolNameInconsistent(String protocolName) {
- return protocolName != null && !protocolName.equals(generation().protocolName);
+ return protocolName != null && generation() != Generation.NO_GENERATION
+ && !protocolName.equals(generation().protocolName);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index f958836..e2315cd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -49,7 +49,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -365,8 +364,8 @@ public class AbstractCoordinatorTest {
@Test
public void testJoinGroupProtocolTypeAndName() {
- String wrongProtocolType = "wrong-type";
- String wrongProtocolName = "wrong-name";
+ final String wrongProtocolType = "wrong-type";
+ final String wrongProtocolName = "wrong-name";
// No Protocol Type in both JoinGroup and SyncGroup responses
assertTrue(joinGroupWithProtocolTypeAndName(null, null, null));
@@ -391,6 +390,39 @@ public class AbstractCoordinatorTest {
() -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName));
}
+ @Test
+ public void testNoGenerationWillNotTriggerProtocolNameCheck() {
+ final String wrongProtocolName = "wrong-name";
+
+ setupCoordinator();
+ mockClient.reset();
+ mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+ mockClient.prepareResponse(body -> {
+ if (!(body instanceof JoinGroupRequest)) {
+ return false;
+ }
+ JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+ return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
+ }, joinGroupFollowerResponse(defaultGeneration, memberId,
+ "memberid", Errors.NONE, PROTOCOL_TYPE));
+
+ mockClient.prepareResponse(body -> {
+ if (!(body instanceof SyncGroupRequest)) {
+ return false;
+ }
+ coordinator.resetGenerationOnLeaveGroup();
+
+ SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
+ return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
+ && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
+ }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));
+
+ // No exception shall be thrown as the generation is reset.
+ coordinator.joinGroupIfNeeded(mockTime.timer(100L));
+ }
+
private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType,
String syncGroupResponseProtocolType,
String syncGroupResponseProtocolName) {
@@ -665,7 +697,7 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
- } catch (WakeupException e) {
+ } catch (WakeupException ignored) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -703,7 +735,7 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
- } catch (WakeupException e) {
+ } catch (WakeupException ignored) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -738,7 +770,7 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
- } catch (WakeupException e) {
+ } catch (WakeupException ignored) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -811,7 +843,7 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
- } catch (WakeupException e) {
+ } catch (WakeupException ignored) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -884,7 +916,7 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
- } catch (WakeupException e) {
+ } catch (WakeupException ignored) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -945,7 +977,7 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
- } catch (WakeupException e) {
+ } catch (WakeupException ignored) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
@@ -990,12 +1022,8 @@ public class AbstractCoordinatorTest {
private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception {
mockTime.sleep(HEARTBEAT_INTERVAL_MS);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return heartbeatReceived.get();
- }
- }, 3000, "Should have received a heartbeat request after joining the group");
+ TestUtils.waitForCondition(heartbeatReceived::get,
+ 3000, "Should have received a heartbeat request after joining the group");
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
@@ -1063,10 +1091,10 @@ public class AbstractCoordinatorTest {
private int onJoinCompleteInvokes = 0;
private boolean wakeupOnJoinComplete = false;
- public DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
- ConsumerNetworkClient client,
- Metrics metrics,
- Time time) {
+ DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
+ ConsumerNetworkClient client,
+ Metrics metrics,
+ Time time) {
super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time);
}