You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/09/13 07:46:28 UTC

[kafka] branch 3.3 updated: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 389bb2004fe KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626)
389bb2004fe is described below

commit 389bb2004fe7937ad4ab3f7574b6bd7472ed8c5c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Sep 13 00:43:09 2022 -0700

    KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626)
    
    Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled.
    
    Note: this patch builds on top of #12611.
    
    Co-Authored-By: Guozhang Wang wangguoz@gmail.com
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java       | 19 ++++++++++++++++---
 .../consumer/internals/ConsumerCoordinator.java       | 13 ++++++++-----
 .../consumer/internals/ConsumerNetworkClient.java     | 17 ++++++++++++++++-
 .../consumer/internals/AbstractCoordinatorTest.java   | 15 +++++++++++++++
 4 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d2ece9efc58..c78caaba059 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,14 +231,27 @@ public abstract class AbstractCoordinator implements Closeable {
     protected void onLeavePrepare() {}
 
     /**
-     * Visible for testing.
-     *
      * Ensure that the coordinator is ready to receive requests.
      *
      * @param timer Timer bounding how long this method can block
      * @return true If coordinator discovery and initial connection succeeded, false otherwise
      */
     protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
+        return ensureCoordinatorReady(timer, false);
+    }
+
+    /**
+     * Ensure that the coordinator is ready to receive requests. This will return
+     * immediately without blocking. It is intended to be called in an asynchronous
+     * context when wakeups are not expected.
+     *
+     * @return true If coordinator discovery and initial connection succeeded, false otherwise
+     */
+    protected synchronized boolean ensureCoordinatorReadyAsync() {
+        return ensureCoordinatorReady(time.timer(0), true);
+    }
+
+    private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean disableWakeup) {
         if (!coordinatorUnknown())
             return true;
 
@@ -249,7 +262,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+            client.poll(future, timer, disableWakeup);
 
             if (!future.isDone()) {
                 // ran out of time
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 00af72a807b..5eb7d878197 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.time.Duration;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -489,10 +488,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    private boolean coordinatorUnknownAndUnready(Timer timer) {
+    private boolean coordinatorUnknownAndUnreadySync(Timer timer) {
         return coordinatorUnknown() && !ensureCoordinatorReady(timer);
     }
 
+    private boolean coordinatorUnknownAndUnreadyAsync() {
+        return coordinatorUnknown() && !ensureCoordinatorReadyAsync();
+    }
+
     /**
      * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
      * has joined the group (if it is using group management). This also handles periodic offset commits
@@ -518,7 +521,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
             // group proactively due to application inactivity even if (say) the coordinator cannot be found.
             pollHeartbeat(timer.currentTimeMs());
-            if (coordinatorUnknownAndUnready(timer)) {
+            if (coordinatorUnknownAndUnreadySync(timer)) {
                 return false;
             }
 
@@ -1069,7 +1072,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (offsets.isEmpty()) {
             // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
             future = doCommitOffsetsAsync(offsets, callback);
-        } else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+        } else if (!coordinatorUnknownAndUnreadyAsync()) {
             // we need to make sure coordinator is ready before committing, since
             // this is for async committing we do not try to block, but just try once to
             // clear the previous discover-coordinator future, resend, or get responses;
@@ -1158,7 +1161,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return true;
 
         do {
-            if (coordinatorUnknownAndUnready(timer)) {
+            if (coordinatorUnknownAndUnreadySync(timer)) {
                 return false;
             }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4b9112016e9..6646dc6c893 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -211,8 +211,23 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws InterruptException if the calling thread is interrupted
      */
     public boolean poll(RequestFuture<?> future, Timer timer) {
+        return poll(future, timer, false);
+    }
+
+    /**
+     * Block until the provided request future request has finished or the timeout has expired.
+     *
+     * @param future The request future to wait for
+     * @param timer Timer bounding how long this method can block
+     * @param disableWakeup true if we should not check for wakeups, false otherwise
+     *
+     * @return true if the future is done, false otherwise
+     * @throws WakeupException if {@link #wakeup()} is called from another thread and `disableWakeup` is false
+     * @throws InterruptException if the calling thread is interrupted
+     */
+    public boolean poll(RequestFuture<?> future, Timer timer, boolean disableWakeup) {
         do {
-            poll(timer, future);
+            poll(timer, future, disableWakeup);
         } while (!future.isDone() && timer.notExpired());
         return future.isDone();
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index cbc4e7495e1..4471b6f88cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -272,6 +272,21 @@ public class AbstractCoordinatorTest {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testWakeupFromEnsureCoordinatorReady() {
+        setupCoordinator();
+
+        consumerClient.wakeup();
+
+        // No wakeup should occur from the async variation.
+        coordinator.ensureCoordinatorReadyAsync();
+
+        // But should wakeup in sync variation even if timer is 0.
+        assertThrows(WakeupException.class, () -> {
+            coordinator.ensureCoordinatorReady(mockTime.timer(0));
+        });
+    }
+
     @Test
     public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
         setupCoordinator();