You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/06/24 15:51:00 UTC

[kafka] branch 2.8 updated: KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new bab9398  KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
bab9398 is described below

commit bab9398a436d06b86ff45f5063653f2f27a8ea3e
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Jun 24 17:40:40 2021 +0200

    KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
    
    This patch fixes unsynchronized accesses to `AbstractCoordinator.state` by reading the field only while holding the coordinator's monitor lock.
    
    (cherry-picked from commit 574af88305273f21456a9b10f21c182181cfc600)
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../consumer/internals/AbstractCoordinator.java    | 16 +++++-----
 .../consumer/internals/ConsumerCoordinator.java    | 34 +++++++++++++---------
 2 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2f0d6c8..b70b67c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1119,13 +1119,15 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 // since we may be sending the request during rebalance, we should check
                 // this case and ignore the REBALANCE_IN_PROGRESS error
-                if (state == MemberState.STABLE) {
-                    log.info("Attempt to heartbeat failed since group is rebalancing");
-                    requestRejoin();
-                    future.raise(error);
-                } else {
-                    log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
-                    future.complete(null);
+                synchronized (AbstractCoordinator.this) {
+                    if (state == MemberState.STABLE) {
+                        log.info("Attempt to heartbeat failed since group is rebalancing");
+                        requestRejoin();
+                        future.raise(error);
+                    } else {
+                        log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
+                        future.complete(null);
+                    }
                 }
             } else if (error == Errors.ILLEGAL_GENERATION ||
                        error == Errors.UNKNOWN_MEMBER_ID ||
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b415b22..2dae166 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1218,13 +1218,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                             if (generationUnchanged()) {
                                 future.raise(error);
                             } else {
-                                if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
-                                    future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
-                                        "consumer member's old generation is fenced by its group instance id, it is possible that " +
-                                        "this consumer has already participated another rebalance and got a new generation"));
-                                } else {
-                                    future.raise(new CommitFailedException());
+                                KafkaException exception;
+                                synchronized (ConsumerCoordinator.this) {
+                                    if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
+                                        exception = new RebalanceInProgressException("Offset commit cannot be completed since the " +
+                                            "consumer member's old generation is fenced by its group instance id, it is possible that " +
+                                            "this consumer has already participated another rebalance and got a new generation");
+                                    } else {
+                                        exception = new CommitFailedException();
+                                    }
                                 }
+                                future.raise(exception);
                             }
                             return;
                         } else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1247,14 +1251,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                             // only need to reset generation and re-join group if generation has not changed or we are not in rebalancing;
                             // otherwise only raise rebalance-in-progress error
-                            if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
-                                future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
-                                    "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
-                                    "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again"));
-                            } else {
-                                resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
-                                future.raise(new CommitFailedException());
+                            KafkaException exception;
+                            synchronized (ConsumerCoordinator.this) {
+                                if (!generationUnchanged() && ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
+                                    exception = new RebalanceInProgressException("Offset commit cannot be completed since the " +
+                                        "consumer member's generation is already stale, meaning it has already participated another rebalance and " +
+                                        "got a new generation. You can try completing the rebalance by calling poll() and then retry commit again");
+                                } else {
+                                    resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
+                                    exception = new CommitFailedException();
+                                }
                             }
+                            future.raise(exception);
                             return;
                         } else {
                             future.raise(new KafkaException("Unexpected error in commit: " + error.message()));