You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/06/24 15:51:00 UTC
[kafka] branch 2.8 updated: KAFKA-12991;
Fix unsafe access to `AbstractCoordinator.state` (#10879)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new bab9398 KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
bab9398 is described below
commit bab9398a436d06b86ff45f5063653f2f27a8ea3e
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Jun 24 17:40:40 2021 +0200
KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
This patch fixes the unsynchronized accesses to `AbstractCoordinator.state`.
(cherry-picked from commit 574af88305273f21456a9b10f21c182181cfc600)
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../consumer/internals/AbstractCoordinator.java | 16 +++++-----
.../consumer/internals/ConsumerCoordinator.java | 34 +++++++++++++---------
2 files changed, 30 insertions(+), 20 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2f0d6c8..b70b67c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1119,13 +1119,15 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// since we may be sending the request during rebalance, we should check
// this case and ignore the REBALANCE_IN_PROGRESS error
- if (state == MemberState.STABLE) {
- log.info("Attempt to heartbeat failed since group is rebalancing");
- requestRejoin();
- future.raise(error);
- } else {
- log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
- future.complete(null);
+ synchronized (AbstractCoordinator.this) {
+ if (state == MemberState.STABLE) {
+ log.info("Attempt to heartbeat failed since group is rebalancing");
+ requestRejoin();
+ future.raise(error);
+ } else {
+ log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
+ future.complete(null);
+ }
}
} else if (error == Errors.ILLEGAL_GENERATION ||
error == Errors.UNKNOWN_MEMBER_ID ||
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b415b22..2dae166 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1218,13 +1218,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (generationUnchanged()) {
future.raise(error);
} else {
- if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
- future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's old generation is fenced by its group instance id, it is possible that " +
- "this consumer has already participated another rebalance and got a new generation"));
- } else {
- future.raise(new CommitFailedException());
+ KafkaException exception;
+ synchronized (ConsumerCoordinator.this) {
+ if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
+ exception = new RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's old generation is fenced by its group instance id, it is possible that " +
+ "this consumer has already participated another rebalance and got a new generation");
+ } else {
+ exception = new CommitFailedException();
+ }
}
+ future.raise(exception);
}
return;
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1247,14 +1251,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// only need to reset generation and re-join group if generation has not changed or we are not in rebalancing;
// otherwise only raise rebalance-in-progress error
- if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
- future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
- "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
- } else {
- resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
- future.raise(new CommitFailedException());
+ KafkaException exception;
+ synchronized (ConsumerCoordinator.this) {
+ if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
+ exception = new RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
+ "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again");
+ } else {
+ resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
+ exception = new CommitFailedException();
+ }
}
+ future.raise(exception);
return;
} else {
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));