You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/28 21:44:43 UTC
[kafka] branch trunk updated: KAFKA-9823: Follow-up, check state for handling commit error response (#8548)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7edf46 KAFKA-9823: Follow-up, check state for handling commit error response (#8548)
f7edf46 is described below
commit f7edf46a5e5c30310542c00eb2d771bd72f42fd3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Apr 28 14:44:11 2020 -0700
KAFKA-9823: Follow-up, check state for handling commit error response (#8548)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../consumer/internals/AbstractCoordinator.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 22 +++---
.../internals/ConsumerCoordinatorTest.java | 86 +++++++++++++++++++++-
3 files changed, 98 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9b4307f..e724384 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -123,11 +123,11 @@ public abstract class AbstractCoordinator implements Closeable {
private final GroupRebalanceConfig rebalanceConfig;
protected final ConsumerNetworkClient client;
protected final Time time;
+ protected MemberState state = MemberState.UNJOINED;
private Node coordinator = null;
private boolean rejoinNeeded = true;
private boolean needsJoinPrepare = true;
- private MemberState state = MemberState.UNJOINED;
private HeartbeatThread heartbeatThread = null;
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 208193f..914071b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1202,13 +1202,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.info("OffsetCommit failed with {} due to group instance id {} fenced", sentGeneration, rebalanceConfig.groupInstanceId);
- // if the generation has changed, do not raise the fatal error but rebalance-in-progress
+ // if the generation has not changed, raise the fatal error; otherwise raise rebalance-in-progress if we are rebalancing, or commit-failed if we are not
if (generationUnchanged()) {
future.raise(error);
} else {
- future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's old generation is fenced by its group instance id, it is possible that " +
- "this consumer has already participated another rebalance and got a new generation"));
+ if (ConsumerCoordinator.this.state == MemberState.REBALANCING) {
+ future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's old generation is fenced by its group instance id, it is possible that " +
+ "this consumer has already participated another rebalance and got a new generation"));
+ } else {
+ future.raise(new CommitFailedException());
+ }
}
return;
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1229,15 +1233,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|| error == Errors.ILLEGAL_GENERATION) {
log.info("OffsetCommit failed with {}: {}", sentGeneration, error.message());
- // only need to reset generation and re-join group if generation has not changed;
+ // only need to reset generation and re-join group if generation has not changed or we are not rebalancing;
// otherwise only raise rebalance-in-progress error
- if (generationUnchanged()) {
- resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
- future.raise(new CommitFailedException());
- } else {
+ if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.REBALANCING) {
future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer member's generation is already stale, meaning it has already participated another rebalance and " +
"got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
+ } else {
+ resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
+ future.raise(new CommitFailedException());
}
return;
} else {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c62d59a..afba13b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2070,7 +2070,7 @@ public class ConsumerCoordinatorTest {
}
@Test
- public void testCommitOffsetIllegalGenerationWithNewGenearion() {
+ public void testCommitOffsetIllegalGenerationWithNewGeneration() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -2090,6 +2090,7 @@ public class ConsumerCoordinatorTest {
"memberId-new",
null);
coordinator.setNewGeneration(newGen);
+ coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
assertTrue(consumerClient.poll(future, time.timer(30000)));
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2099,6 +2100,31 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testCommitOffsetIllegalGenerationWithResetGenearion() {
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ final AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(
+ 1,
+ "memberId",
+ null);
+ coordinator.setNewGeneration(currGen);
+
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
+ RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+ new OffsetAndMetadata(100L, "metadata")));
+
+ // reset the generation
+ coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);
+
+ assertTrue(consumerClient.poll(future, time.timer(30000)));
+ assertTrue(future.exception().getClass().isInstance(new CommitFailedException()));
+
+ // the generation should remain NO_GENERATION
+ assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+ }
+
+ @Test
public void testCommitOffsetUnknownMemberWithNewGenearion() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -2119,6 +2145,7 @@ public class ConsumerCoordinatorTest {
"memberId-new",
null);
coordinator.setNewGeneration(newGen);
+ coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
assertTrue(consumerClient.poll(future, time.timer(30000)));
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2128,7 +2155,7 @@ public class ConsumerCoordinatorTest {
}
@Test
- public void testCommitOffsetFencedInstanceWithNewGenearion() {
+ public void testCommitOffsetUnknownMemberWithResetGenearion() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -2138,6 +2165,32 @@ public class ConsumerCoordinatorTest {
null);
coordinator.setNewGeneration(currGen);
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+ RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+ new OffsetAndMetadata(100L, "metadata")));
+
+ // reset the generation
+ coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);
+
+ assertTrue(consumerClient.poll(future, time.timer(30000)));
+ assertTrue(future.exception().getClass().isInstance(new CommitFailedException()));
+
+ // the generation should remain NO_GENERATION
+ assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+ }
+
+ @Test
+ public void testCommitOffsetFencedInstanceWithRebalancingGenearion() {
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ final AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(
+ 1,
+ "memberId",
+ null);
+ coordinator.setNewGeneration(currGen);
+ coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
new OffsetAndMetadata(100L, "metadata")));
@@ -2157,6 +2210,35 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testCommitOffsetFencedInstanceWithNewGenearion() {
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ final AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(
+ 1,
+ "memberId",
+ null);
+ coordinator.setNewGeneration(currGen);
+
+ prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
+ RequestFuture<Void> future = coordinator.sendOffsetCommitRequest(singletonMap(t1p,
+ new OffsetAndMetadata(100L, "metadata")));
+
+ // change the generation
+ final AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(
+ 2,
+ "memberId-new",
+ null);
+ coordinator.setNewGeneration(newGen);
+
+ assertTrue(consumerClient.poll(future, time.timer(30000)));
+ assertTrue(future.exception().getClass().isInstance(new CommitFailedException()));
+
+ // the generation should not be reset
+ assertEquals(newGen, coordinator.generation());
+ }
+
+ @Test
public void testCommitOffsetRebalanceInProgress() {
// we cannot retry if a rebalance occurs before the commit completed
final String consumerId = "leader";