You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/09/13 07:46:28 UTC
[kafka] branch 3.3 updated: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 389bb2004fe KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626)
389bb2004fe is described below
commit 389bb2004fe7937ad4ab3f7574b6bd7472ed8c5c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Sep 13 00:43:09 2022 -0700
KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626)
Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled.
Note: this patch builds on top of #12611.
Co-Authored-By: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Luke Chen <sh...@gmail.com>
---
.../consumer/internals/AbstractCoordinator.java | 19 ++++++++++++++++---
.../consumer/internals/ConsumerCoordinator.java | 13 ++++++++-----
.../consumer/internals/ConsumerNetworkClient.java | 17 ++++++++++++++++-
.../consumer/internals/AbstractCoordinatorTest.java | 15 +++++++++++++++
4 files changed, 55 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d2ece9efc58..c78caaba059 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,14 +231,27 @@ public abstract class AbstractCoordinator implements Closeable {
protected void onLeavePrepare() {}
/**
- * Visible for testing.
- *
* Ensure that the coordinator is ready to receive requests.
*
* @param timer Timer bounding how long this method can block
* @return true If coordinator discovery and initial connection succeeded, false otherwise
*/
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
+ return ensureCoordinatorReady(timer, false);
+ }
+
+ /**
+ * Ensure that the coordinator is ready to receive requests. This will return
+ * immediately without blocking. It is intended to be called in an asynchronous
+ * context when wakeups are not expected.
+ *
+ * @return true If coordinator discovery and initial connection succeeded, false otherwise
+ */
+ protected synchronized boolean ensureCoordinatorReadyAsync() {
+ return ensureCoordinatorReady(time.timer(0), true);
+ }
+
+ private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean disableWakeup) {
if (!coordinatorUnknown())
return true;
@@ -249,7 +262,7 @@ public abstract class AbstractCoordinator implements Closeable {
throw fatalException;
}
final RequestFuture<Void> future = lookupCoordinator();
- client.poll(future, timer);
+ client.poll(future, timer, disableWakeup);
if (!future.isDone()) {
// ran out of time
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 00af72a807b..5eb7d878197 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import java.time.Duration;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -489,10 +488,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
- private boolean coordinatorUnknownAndUnready(Timer timer) {
+ private boolean coordinatorUnknownAndUnreadySync(Timer timer) {
return coordinatorUnknown() && !ensureCoordinatorReady(timer);
}
+ private boolean coordinatorUnknownAndUnreadyAsync() {
+ return coordinatorUnknown() && !ensureCoordinatorReadyAsync();
+ }
+
/**
* Poll for coordinator events. This ensures that the coordinator is known and that the consumer
* has joined the group (if it is using group management). This also handles periodic offset commits
@@ -518,7 +521,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
- if (coordinatorUnknownAndUnready(timer)) {
+ if (coordinatorUnknownAndUnreadySync(timer)) {
return false;
}
@@ -1069,7 +1072,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (offsets.isEmpty()) {
// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
future = doCommitOffsetsAsync(offsets, callback);
- } else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+ } else if (!coordinatorUnknownAndUnreadyAsync()) {
// we need to make sure coordinator is ready before committing, since
// this is for async committing we do not try to block, but just try once to
// clear the previous discover-coordinator future, resend, or get responses;
@@ -1158,7 +1161,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return true;
do {
- if (coordinatorUnknownAndUnready(timer)) {
+ if (coordinatorUnknownAndUnreadySync(timer)) {
return false;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4b9112016e9..6646dc6c893 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -211,8 +211,23 @@ public class ConsumerNetworkClient implements Closeable {
* @throws InterruptException if the calling thread is interrupted
*/
public boolean poll(RequestFuture<?> future, Timer timer) {
+ return poll(future, timer, false);
+ }
+
+ /**
+ * Block until the provided request future request has finished or the timeout has expired.
+ *
+ * @param future The request future to wait for
+ * @param timer Timer bounding how long this method can block
+ * @param disableWakeup true if we should not check for wakeups, false otherwise
+ *
+ * @return true if the future is done, false otherwise
+ * @throws WakeupException if {@link #wakeup()} is called from another thread and `disableWakeup` is false
+ * @throws InterruptException if the calling thread is interrupted
+ */
+ public boolean poll(RequestFuture<?> future, Timer timer, boolean disableWakeup) {
do {
- poll(timer, future);
+ poll(timer, future, disableWakeup);
} while (!future.isDone() && timer.notExpired());
return future.isDone();
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index cbc4e7495e1..4471b6f88cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -272,6 +272,21 @@ public class AbstractCoordinatorTest {
assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
}
+ @Test
+ public void testWakeupFromEnsureCoordinatorReady() {
+ setupCoordinator();
+
+ consumerClient.wakeup();
+
+ // No wakeup should occur from the async variation.
+ coordinator.ensureCoordinatorReadyAsync();
+
+ // But should wakeup in sync variation even if timer is 0.
+ assertThrows(WakeupException.class, () -> {
+ coordinator.ensureCoordinatorReady(mockTime.timer(0));
+ });
+ }
+
@Test
public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
setupCoordinator();