You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/12/19 17:48:59 UTC
[kafka] branch trunk updated: KAFKA-14264; New logic to discover group coordinator (#12862)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4548c272ae3 KAFKA-14264; New logic to discover group coordinator (#12862)
4548c272ae3 is described below
commit 4548c272ae39a6630174ab428bc82fce7c98f7fd
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Mon Dec 19 09:48:52 2022 -0800
KAFKA-14264; New logic to discover group coordinator (#12862)
[KAFKA-14264](https://issues.apache.org/jira/browse/KAFKA-14264)
In this patch, we refactored the existing FindCoordinator mechanism. In particular, we first centralized all of the network operations (send, poll) in `NetworkClientDelegate`, then we introduced a `RequestManager` interface that is responsible for handling the timing of different kinds of requests, based on the implementation. In this patch, we implemented a `CoordinatorRequestManager` which determines when to create an `UnsentRequest` upon polling the request manager.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../internals/CoordinatorRequestManager.java | 222 +++++++++++++++++
.../internals/DefaultBackgroundThread.java | 189 ++++++++-------
.../consumer/internals/DefaultEventHandler.java | 31 +--
...BackgroundEvent.java => ErrorEventHandler.java} | 23 +-
.../consumer/internals/NetworkClientDelegate.java | 267 +++++++++++++++++++++
.../consumer/internals/PrototypeAsyncConsumer.java | 7 +-
.../BackgroundEvent.java => RequestManager.java} | 18 +-
.../clients/consumer/internals/RequestState.java | 91 +++++++
.../internals/events/ApplicationEvent.java | 14 +-
...onEvent.java => ApplicationEventProcessor.java} | 35 +--
.../consumer/internals/events/BackgroundEvent.java | 2 +-
...kgroundEvent.java => ErrorBackgroundEvent.java} | 15 +-
.../internals/events/NoopApplicationEvent.java | 16 +-
.../common/requests/FindCoordinatorResponse.java | 18 ++
.../internals/CoordinatorRequestManagerTest.java | 202 ++++++++++++++++
.../internals/DefaultBackgroundThreadTest.java | 177 +++++++-------
.../internals/DefaultEventHandlerTest.java | 92 +++----
.../internals/NetworkClientDelegateTest.java | 175 ++++++++++++++
.../consumer/internals/RequestStateTest.java | 48 ++++
19 files changed, 1306 insertions(+), 336 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
new file mode 100644
index 00000000000..bc40b67901f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for deciding when to send the next {@link FindCoordinatorRequest}, based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+ private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
+ private final Logger log;
+ private final ErrorEventHandler nonRetriableErrorHandler;
+ private final String groupId;
+
+ private final RequestState coordinatorRequestState;
+ private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+ private long totalDisconnectedMin = 0;
+ private Node coordinator;
+
+    /** Creates a manager for {@code groupId} (must be non-null); its request state is built from {@code config}. */
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId) {
+        this.groupId = Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.nonRetriableErrorHandler = errorHandler;
+        this.coordinatorRequestState = new RequestState(config);
+    }
+
+    // Visible for testing: takes a caller-supplied RequestState instead of building
+    // one from a ConsumerConfig.
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final RequestState coordinatorRequestState) {
+        this.groupId = Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.nonRetriableErrorHandler = errorHandler;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    /**
+     * Decides whether a FindCoordinator request should be sent now. No network IO happens here.
+     * If a coordinator is already known, returns a PollResult with Long.MAX_VALUE and an empty list.
+     * If a request can be sent, returns a PollResult with Long.MAX_VALUE and a singleton list holding the UnsentRequest.
+     * Otherwise, returns a PollResult with the remaining backoff time and an empty list.
+     * The first PollResult argument is the time, in ms, the caller may wait before polling this manager again.
+     *
+     * @param currentTimeMs current time in ms.
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        // Nothing to discover while a coordinator is known.
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+        }
+
+        if (!coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            final long backoffMs = coordinatorRequestState.remainingBackoffMs(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(backoffMs, Collections.emptyList());
+        }
+
+        final NetworkClientDelegate.UnsentRequest unsent = makeFindCoordinatorRequest(currentTimeMs);
+        return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(unsent));
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+        // Stamp the send time first so the request state reflects this attempt.
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        final FindCoordinatorRequestData lookup = new FindCoordinatorRequestData();
+        lookup.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id());
+        lookup.setKey(this.groupId);
+        final FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(lookup);
+        // NOTE(review): the trailing null is forwarded unchanged; confirm how makeUnsentRequest treats it.
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(builder, new FindCoordinatorRequestHandler(), null);
+    }
+
+    /**
+     * Mark the current coordinator null or, when none is known, track how long the consumer has been without one.
+     * @param cause why the coordinator is marked unknown.
+     * @param currentTimeMs the current time in ms.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+            totalDisconnectedMin = 0;
+            return;
+        }
+        // A negative value means "never marked": start the clock now instead of measuring the outage from -1 (~epoch).
+        timeMarkedUnknownMs = timeMarkedUnknownMs < 0 ? currentTimeMs : timeMarkedUnknownMs;
+        long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs - timeMarkedUnknownMs);
+        long currDisconnectMin = durationOfOngoingDisconnectMs / COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
+        if (currDisconnectMin > this.totalDisconnectedMin) {
+            log.debug("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnectMs);
+            totalDisconnectedMin = currDisconnectMin;
+        }
+    }
+
+    private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator) {
+        // The connection id is MAX_VALUE - node.id rather than the broker's own id, so that the
+        // underlying network client layer keeps a separate connection for the coordinator.
+        final int connectionId = Integer.MAX_VALUE - coordinator.nodeId();
+        final Node discovered = new Node(connectionId, coordinator.host(), coordinator.port());
+        this.coordinator = discovered;
+        // Logs the response entry (the method parameter), not the Node stored above.
+        log.info("Discovered group coordinator {}", coordinator);
+        // A coordinator is known again, so reset the request state.
+        coordinatorRequestState.reset();
+    }
+
+    // Records the failed attempt, drops any known coordinator, and hands non-retriable errors to the error handler.
+    private void onFailedCoordinatorResponse(final Exception exception, final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("FindCoordinator failed with exception", currentTimeMs);
+        if (exception instanceof RetriableException) {
+            log.debug("FindCoordinator request failed due to retriable exception", exception);
+            return;
+        }
+        // Match by type, not by identity with the Errors instance, so every GroupAuthorizationException is covered.
+        if (exception instanceof GroupAuthorizationException) {
+            log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
+            nonRetriableErrorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.warn("FindCoordinator request failed due to fatal exception", exception);
+        nonRetriableErrorHandler.handle(exception);
+    }
+
+    /**
+     * Handles the outcome of a {@link FindCoordinatorRequest}; invoked from the {@link FindCoordinatorRequestHandler}
+     * callback. On success the coordinator node is updated. On any failure (an exception, a response without an
+     * entry for this group, or an entry carrying an error code) the failure path runs and the coordinator is marked unknown.
+     *
+     * @param currentTimeMs current time in ms.
+     * @param response      the response for finding the coordinator. null if an exception is thrown.
+     * @param e             the exception, null if a valid response is received.
+     */
+    protected void onResponse(final long currentTimeMs, final FindCoordinatorResponse response, final Exception e) {
+        if (e != null) {
+            // The request itself failed, so there is no response to inspect.
+            onFailedCoordinatorResponse(e, currentTimeMs);
+            return;
+        }
+
+        final Optional<FindCoordinatorResponseData.Coordinator> lookup = response.coordinatorByKey(this.groupId);
+        if (!lookup.isPresent()) {
+            final String msg = String.format("Response did not contain expected coordinator section for groupId: %s", this.groupId);
+            onFailedCoordinatorResponse(new IllegalStateException(msg), currentTimeMs);
+            return;
+        }
+
+        final FindCoordinatorResponseData.Coordinator entry = lookup.get();
+        if (entry.errorCode() == Errors.NONE.code()) {
+            onSuccessfulResponse(entry);
+            return;
+        }
+        onFailedCoordinatorResponse(Errors.forCode(entry.errorCode()).exception(), currentTimeMs);
+    }
+
+    /**
+     * Exposes the coordinator discovered so far.
+     *
+     * @return the current coordinator node, or an empty Optional when none is known.
+     */
+    public Optional<Node> coordinator() {
+        return this.coordinator == null ? Optional.empty() : Optional.of(this.coordinator);
+    }
+
+    private class FindCoordinatorRequestHandler extends NetworkClientDelegate.AbstractRequestFutureCompletionHandler {
+        // Forwards the completion to onResponse. NOTE(review): r is dereferenced even when e is non-null; confirm a failure never arrives with a null response.
+        @Override
+        public void handleResponse(final ClientResponse r, final Exception e) {
+            final long receivedTimeMs = r.receivedTimeMs();
+            CoordinatorRequestManager.this.onResponse(receivedTimeMs, (FindCoordinatorResponse) r.responseBody(), e);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 6788bd09bd2..3ceb5e15b87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -16,24 +16,26 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Background thread runnable that consumes {@code ApplicationEvent} and
@@ -44,63 +46,86 @@ import java.util.concurrent.atomic.AtomicReference;
* initialized by the polling thread.
*/
public class DefaultBackgroundThread extends KafkaThread {
+ private static final int MAX_POLL_TIMEOUT_MS = 5000;
private static final String BACKGROUND_THREAD_NAME =
- "consumer_background_thread";
+ "consumer_background_thread";
private final Time time;
private final Logger log;
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
- private final ConsumerNetworkClient networkClient;
- private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
- private final Metrics metrics;
private final ConsumerConfig config;
+ // empty if groupId is null
+ private final Optional<CoordinatorRequestManager> coordinatorManager;
+ private final ApplicationEventProcessor applicationEventProcessor;
+ private final NetworkClientDelegate networkClientDelegate;
+ private final ErrorEventHandler errorEventHandler;
- private String clientId;
- private long retryBackoffMs;
- private int heartbeatIntervalMs;
private boolean running;
- private Optional<ApplicationEvent> inflightEvent = Optional.empty();
- private final AtomicReference<Optional<RuntimeException>> exception =
- new AtomicReference<>(Optional.empty());
+ // Visible for testing
+ DefaultBackgroundThread(final Time time,
+ final ConsumerConfig config,
+ final LogContext logContext,
+ final BlockingQueue<ApplicationEvent> applicationEventQueue,
+ final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+ final ErrorEventHandler errorEventHandler,
+ final ApplicationEventProcessor processor,
+ final ConsumerMetadata metadata,
+ final NetworkClientDelegate networkClient,
+ final CoordinatorRequestManager coordinatorManager) {
+ super(BACKGROUND_THREAD_NAME, true);
+ this.time = time;
+ this.running = true;
+ this.log = logContext.logger(getClass());
+ this.applicationEventQueue = applicationEventQueue;
+ this.backgroundEventQueue = backgroundEventQueue;
+ this.applicationEventProcessor = processor;
+ this.config = config;
+ this.metadata = metadata;
+ this.networkClientDelegate = networkClient;
+ this.coordinatorManager = Optional.ofNullable(coordinatorManager);
+ this.errorEventHandler = errorEventHandler;
+ }
public DefaultBackgroundThread(final Time time,
final ConsumerConfig config,
+ final GroupRebalanceConfig rebalanceConfig,
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
- final SubscriptionState subscriptions,
final ConsumerMetadata metadata,
- final ConsumerNetworkClient networkClient,
- final Metrics metrics) {
+ final KafkaClient networkClient) {
super(BACKGROUND_THREAD_NAME, true);
try {
this.time = time;
- this.log = logContext.logger(DefaultBackgroundThread.class);
+ this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
this.config = config;
- setConfig();
- this.inflightEvent = Optional.empty();
// subscriptionState is initialized by the polling thread
- this.subscriptions = subscriptions;
this.metadata = metadata;
- this.networkClient = networkClient;
- this.metrics = metrics;
+ this.networkClientDelegate = new NetworkClientDelegate(
+ this.time,
+ this.config,
+ logContext,
+ networkClient);
this.running = true;
+ this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
+ String groupId = rebalanceConfig.groupId;
+ this.coordinatorManager = groupId == null ?
+ Optional.empty() :
+ Optional.of(new CoordinatorRequestManager(
+ logContext,
+ config,
+ errorEventHandler,
+ groupId));
+ this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue);
} catch (final Exception e) {
- // now propagate the exception
close();
+ // Chain the caught exception itself; e.getCause() may be null and would hide the real failure.
throw new KafkaException("Failed to construct background processor", e);
}
}
- private void setConfig() {
- this.retryBackoffMs = this.config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
- this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
- this.heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
- }
-
@Override
public void run() {
try {
@@ -109,23 +134,13 @@ public class DefaultBackgroundThread extends KafkaThread {
try {
runOnce();
} catch (final WakeupException e) {
- log.debug(
- "Exception thrown, background thread won't terminate",
- e
- );
- // swallow the wakeup exception to prevent killing the
- // background thread.
+ log.debug("WakeupException caught, background thread won't be interrupted");
+ // swallow the wakeup exception to prevent killing the background thread.
}
}
} catch (final Throwable t) {
- log.error(
- "The background thread failed due to unexpected error",
- t
- );
- if (t instanceof RuntimeException)
- this.exception.set(Optional.of((RuntimeException) t));
- else
- this.exception.set(Optional.of(new RuntimeException(t)));
+ log.error("The background thread failed due to unexpected error", t);
+ throw new RuntimeException(t);
} finally {
close();
log.debug("{} closed", getClass());
@@ -133,63 +148,57 @@ public class DefaultBackgroundThread extends KafkaThread {
}
/**
- * Process event from a single poll
+ * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+ * 1. Drains and try to process all of the requests in the queue.
+ * 2. Poll request managers to queue up the necessary requests.
+ * 3. Poll the networkClient to send and retrieve the response.
*/
void runOnce() {
- this.inflightEvent = maybePollEvent();
- if (this.inflightEvent.isPresent()) {
- log.debug("processing application event: {}", this.inflightEvent);
- }
- if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
- // clear inflight event upon successful consumption
- this.inflightEvent = Optional.empty();
+ drainAndProcess();
+
+ final long currentTimeMs = time.milliseconds();
+ long pollWaitTimeMs = MAX_POLL_TIMEOUT_MS;
+
+ if (coordinatorManager.isPresent()) {
+ pollWaitTimeMs = Math.min(
+ pollWaitTimeMs,
+ handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
}
- // if there are pending events to process, poll then continue without
- // blocking.
- if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
- networkClient.poll(time.timer(0));
- return;
+ networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+ }
+
+ private void drainAndProcess() {
+ Queue<ApplicationEvent> events = pollApplicationEvent();
+ Iterator<ApplicationEvent> iter = events.iterator();
+ while (iter.hasNext()) {
+ ApplicationEvent event = iter.next();
+ log.debug("processing application event: {}", event);
+ consumeApplicationEvent(event);
}
- // if there are no events to process, poll until timeout. The timeout
- // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
- // nextMetadataUpdate. See NetworkClient.poll impl.
- networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
}
- private long timeToNextHeartbeatMs(final long nowMs) {
- // TODO: implemented when heartbeat is added to the impl
- return 100;
+ long handlePollResult(NetworkClientDelegate.PollResult res) {
+ if (!res.unsentRequests.isEmpty()) {
+ networkClientDelegate.addAll(res.unsentRequests);
+ }
+ return res.timeUntilNextPollMs;
}
- private Optional<ApplicationEvent> maybePollEvent() {
- if (this.inflightEvent.isPresent() || this.applicationEventQueue.isEmpty()) {
- return this.inflightEvent;
+ private Queue<ApplicationEvent> pollApplicationEvent() {
+ if (this.applicationEventQueue.isEmpty()) {
+ return new LinkedList<>();
}
- return Optional.ofNullable(this.applicationEventQueue.poll());
+
+ LinkedList<ApplicationEvent> res = new LinkedList<>();
+ this.applicationEventQueue.drainTo(res);
+ return res;
}
- /**
- * ApplicationEvent are consumed here.
- *
- * @param event an {@link ApplicationEvent}
- * @return true when successfully consumed the event.
- */
- private boolean maybeConsumeInflightEvent(final ApplicationEvent event) {
+ private void consumeApplicationEvent(final ApplicationEvent event) {
log.debug("try consuming event: {}", Optional.ofNullable(event));
Objects.requireNonNull(event);
- return event.process();
- }
-
- /**
- * Processes {@link NoopApplicationEvent} and equeue a
- * {@link NoopBackgroundEvent}. This is intentionally left here for
- * demonstration purpose.
- *
- * @param event a {@link NoopApplicationEvent}
- */
- private void process(final NoopApplicationEvent event) {
- backgroundEventQueue.add(new NoopBackgroundEvent(event.message));
+ applicationEventProcessor.process(event);
}
public boolean isRunning() {
@@ -197,13 +206,13 @@ public class DefaultBackgroundThread extends KafkaThread {
}
public void wakeup() {
- networkClient.wakeup();
+ networkClientDelegate.wakeup();
}
public void close() {
this.running = false;
this.wakeup();
- Utils.closeQuietly(networkClient, "consumer network client");
+ Utils.closeQuietly(networkClientDelegate, "network client utils");
Utils.closeQuietly(metadata, "consumer metadata client");
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
index 492d0d4c270..1543e46bc6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
@@ -48,6 +50,7 @@ public class DefaultEventHandler implements EventHandler {
private final DefaultBackgroundThread backgroundThread;
public DefaultEventHandler(final ConsumerConfig config,
+ final GroupRebalanceConfig groupRebalanceConfig,
final LogContext logContext,
final SubscriptionState subscriptionState,
final ApiVersions apiVersions,
@@ -56,6 +59,7 @@ public class DefaultEventHandler implements EventHandler {
final Sensor fetcherThrottleTimeSensor) {
this(Time.SYSTEM,
config,
+ groupRebalanceConfig,
logContext,
new LinkedBlockingQueue<>(),
new LinkedBlockingQueue<>(),
@@ -68,6 +72,7 @@ public class DefaultEventHandler implements EventHandler {
public DefaultEventHandler(final Time time,
final ConsumerConfig config,
+ final GroupRebalanceConfig groupRebalanceConfig,
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
@@ -94,7 +99,7 @@ public class DefaultEventHandler implements EventHandler {
channelBuilder,
logContext
);
- final NetworkClient netClient = new NetworkClient(
+ final NetworkClient networkClient = new NetworkClient(
selector,
metadata,
config.getString(ConsumerConfig.CLIENT_ID_CONFIG),
@@ -113,49 +118,37 @@ public class DefaultEventHandler implements EventHandler {
fetcherThrottleTimeSensor,
logContext
);
- final ConsumerNetworkClient networkClient = new ConsumerNetworkClient(
- logContext,
- netClient,
- metadata,
- time,
- config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
- config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
- config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
this.backgroundThread = new DefaultBackgroundThread(
time,
config,
+ groupRebalanceConfig,
logContext,
this.applicationEventQueue,
this.backgroundEventQueue,
- subscriptionState,
metadata,
- networkClient,
- new Metrics(time)
- );
+ networkClient);
}
// VisibleForTesting
DefaultEventHandler(final Time time,
final ConsumerConfig config,
+ final GroupRebalanceConfig groupRebalanceConfig,
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
- final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
- final ConsumerNetworkClient networkClient) {
+ final KafkaClient networkClient) {
this.applicationEventQueue = applicationEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
this.backgroundThread = new DefaultBackgroundThread(
time,
config,
+ groupRebalanceConfig,
logContext,
this.applicationEventQueue,
this.backgroundEventQueue,
- subscriptionState,
metadata,
- networkClient,
- new Metrics(time)
- );
+ networkClient);
backgroundThread.start();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java
similarity index 59%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java
index 89eac1048d9..99eba4e91b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java
@@ -14,18 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.clients.consumer.internals.events;
+package org.apache.kafka.clients.consumer.internals;
-/**
- * This is the abstract definition of the events created by the background thread.
- */
-abstract public class BackgroundEvent {
- public final EventType type;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+
+import java.util.Queue;
- public BackgroundEvent(EventType type) {
- this.type = type;
+public class ErrorEventHandler {
+ private final Queue<BackgroundEvent> backgroundEventQueue;
+
+ public ErrorEventHandler(Queue<BackgroundEvent> backgroundEventQueue) {
+ this.backgroundEventQueue = backgroundEventQueue;
}
- public enum EventType {
- NOOP,
+
+ public void handle(Exception e) {
+ backgroundEventQueue.add(new ErrorBackgroundEvent(e));
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
new file mode 100644
index 00000000000..cc36a4e638d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+ private final KafkaClient client;
+ private final Time time;
+ private final Logger log;
+ private final int requestTimeoutMs;
+ private final Queue<UnsentRequest> unsentRequests;
+ private final long retryBackoffMs;
+
+ public NetworkClientDelegate(
+ final Time time,
+ final ConsumerConfig config,
+ final LogContext logContext,
+ final KafkaClient client) {
+ this.time = time;
+ this.client = client;
+ this.log = logContext.logger(getClass());
+ this.unsentRequests = new ArrayDeque<>();
+ this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ }
+
+ /**
+ * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+ * and check the disconnected nodes.
+ *
+ * @param timeoutMs timeout time
+ * @param currentTimeMs current time
+ * @return a list of client response
+ */
+ public List<ClientResponse> poll(final long timeoutMs, final long currentTimeMs) {
+ trySend(currentTimeMs);
+
+ long pollTimeoutMs = timeoutMs;
+ if (!unsentRequests.isEmpty()) {
+ pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
+ }
+ List<ClientResponse> res = this.client.poll(pollTimeoutMs, currentTimeMs);
+ checkDisconnects();
+ wakeup();
+ return res;
+ }
+
+ /**
+ * Tries to send every request in the unsent queue. A request without an assigned node is sent to the least loaded
+ * node; a request that cannot be sent yet stays queued and is retried on the next {@code poll()}. An expired
+ * request is dropped and its callback, if any, is failed with a {@link TimeoutException} (nothing is thrown).
+ */
+ private void trySend(final long currentTimeMs) {
+ Iterator<UnsentRequest> iterator = unsentRequests.iterator();
+ while (iterator.hasNext()) {
+ UnsentRequest unsent = iterator.next();
+ unsent.timer.update(currentTimeMs);
+ if (unsent.timer.isExpired()) {
+ iterator.remove();
+ unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+ "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+ continue;
+ }
+
+ if (!doSend(unsent, currentTimeMs)) {
+ // not sent: keep it in the queue so it is retried until its timer expires.
+ continue;
+ }
+ iterator.remove();
+ }
+ }
+
+    // Sends one queued request if a usable, ready node exists; returns false to leave it queued for the next poll.
+    private boolean doSend(final UnsentRequest r, final long currentTimeMs) {
+        // Lazy lookup: the least loaded node is only needed when the request is not pinned to a node.
+        Node node = r.node.orElseGet(() -> client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            log.debug("No broker available to send the request: {}. Retrying.", r);
+            return false;
+        }
+        if (!client.isReady(node, currentTimeMs)) {
+            // The node isn't ready yet; the request stays queued and is handled in the next event loop iteration.
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            return false;
+        }
+        // Build the ClientRequest only now, so nothing is constructed for a request that cannot be sent.
+        client.send(makeClientRequest(r, node, currentTimeMs), currentTimeMs);
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Fail queued requests pinned to a node whose connection failed; never hand a null cause to onFailure().
+        Iterator<UnsentRequest> iter = unsentRequests.iterator();
+        while (iter.hasNext()) {
+            UnsentRequest u = iter.next();
+            if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
+                iter.remove();
+                AuthenticationException authException = client.authenticationException(u.node.get());
+                u.callback.ifPresent(r -> r.onFailure(authException != null ? authException : DisconnectException.INSTANCE));
+            }
+        }
+    }
+
+ private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node, final long currentTimeMs) {
+ return client.newClientRequest(
+ node.idString(),
+ unsent.abstractBuilder,
+ currentTimeMs,
+ true,
+ (int) unsent.timer.remainingMs(),
+ unsent.callback.orElse(new DefaultRequestFutureCompletionHandler()));
+ }
+
+ public Node leastLoadedNode() {
+ return this.client.leastLoadedNode(time.milliseconds());
+ }
+
+ public void add(final UnsentRequest r) {
+ r.setTimer(this.time, this.requestTimeoutMs);
+ unsentRequests.add(r);
+ }
+
+ public void wakeup() {
+ client.wakeup();
+ }
+
+ /**
+ * Checks whether the node is disconnected and unavailable for immediate reconnection, i.e. its connection has
+ * failed and it is still inside the reconnect backoff window (the client reports a connection delay above 0).
+ */
+ public boolean nodeUnavailable(final Node node) {
+ return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
+ }
+
+ public void close() throws IOException {
+ this.client.close();
+ }
+
+    public void addAll(final List<UnsentRequest> requests) {
+        requests.forEach(this::add); // add() arms each request's timer, which trySend() dereferences unconditionally
+    }
+
+ public static class PollResult {
+ public final long timeUntilNextPollMs;
+ public final List<UnsentRequest> unsentRequests;
+
+ public PollResult(final long timeMsTillNextPoll, final List<UnsentRequest> unsentRequests) {
+ this.timeUntilNextPollMs = timeMsTillNextPoll;
+ this.unsentRequests = Collections.unmodifiableList(unsentRequests);
+ }
+ }
+
+    // A request waiting to be handed to the network client, with its optional target node, callback and send timer.
+    public static class UnsentRequest {
+        private final AbstractRequest.Builder<?> abstractBuilder;
+        private final Optional<AbstractRequestFutureCompletionHandler> callback;
+        private Optional<Node> node; // empty if any node may be chosen
+        private Timer timer; // null until setTimer() is called
+
+        public UnsentRequest(final AbstractRequest.Builder<?> abstractBuilder,
+                             final AbstractRequestFutureCompletionHandler callback) {
+            this(abstractBuilder, callback, null);
+        }
+
+        public UnsentRequest(final AbstractRequest.Builder<?> abstractBuilder,
+                             final AbstractRequestFutureCompletionHandler callback,
+                             final Node node) {
+            // The builder is mandatory; a null callback or node is allowed and stored as an empty Optional.
+            this.abstractBuilder = Objects.requireNonNull(abstractBuilder);
+            this.node = Optional.ofNullable(node);
+            this.callback = Optional.ofNullable(callback);
+        }
+
+        public void setTimer(final Time time, final long requestTimeoutMs) {
+            this.timer = time.timer(requestTimeoutMs);
+        }
+
+        public static UnsentRequest makeUnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
+                                                      final AbstractRequestFutureCompletionHandler callback,
+                                                      final Node node) {
+            return new UnsentRequest(requestBuilder, callback, node);
+        }
+
+        @Override
+        public String toString() {
+            return abstractBuilder.toString();
+        }
+    }
+
+ public static class DefaultRequestFutureCompletionHandler extends AbstractRequestFutureCompletionHandler {
+ @Override
+ public void handleResponse(ClientResponse r, Exception t) {}
+ }
+
+ public abstract static class AbstractRequestFutureCompletionHandler implements RequestCompletionHandler {
+ private final RequestFuture<ClientResponse> future;
+
+ AbstractRequestFutureCompletionHandler() {
+ this.future = new RequestFuture<>();
+ }
+
+ abstract public void handleResponse(ClientResponse r, Exception e);
+
+ public void onFailure(final RuntimeException e) {
+ future.raise(e);
+ handleResponse(null, e);
+ }
+
+ @Override
+ public void onComplete(final ClientResponse response) {
+ fireCompletion(response);
+ handleResponse(response, null);
+ }
+
+ private void fireCompletion(final ClientResponse response) {
+ if (response.authenticationException() != null) {
+ future.raise(response.authenticationException());
+ } else if (response.wasDisconnected()) {
+ future.raise(DisconnectException.INSTANCE);
+ } else if (response.versionMismatch() != null) {
+ future.raise(response.versionMismatch());
+ } else {
+ future.complete(response);
+ }
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index e1a2ff035aa..9c66c8a76ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -113,6 +113,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
valueDeserializer, metrics.reporters(), interceptorList);
this.eventHandler = new DefaultEventHandler(
config,
+ groupRebalanceConfig,
logContext,
subscriptions,
new ApiVersions(),
@@ -478,11 +479,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
public CommitApplicationEvent() {
- }
-
- @Override
- public boolean process() {
- return true;
+ super(Type.COMMIT);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java
similarity index 69%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java
index 89eac1048d9..13c7c4d9d16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java
@@ -14,18 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.clients.consumer.internals.events;
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
/**
- * This is the abstract definition of the events created by the background thread.
+ * {@code PollResult} consists of {@code UnsentRequest}s if there are requests to send; otherwise, it returns the time till
+ * the next poll event.
*/
-abstract public class BackgroundEvent {
- public final EventType type;
-
- public BackgroundEvent(EventType type) {
- this.type = type;
- }
- public enum EventType {
- NOOP,
- }
+public interface RequestManager {
+ PollResult poll(long currentTimeMs);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java
new file mode 100644
index 00000000000..7ee7db64ca7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+class RequestState {
+ final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+ final static double RECONNECT_BACKOFF_JITTER = 0.2;
+ private final ExponentialBackoff exponentialBackoff;
+ private long lastSentMs = -1;
+ private long lastReceivedMs = -1;
+ private int numAttempts = 0;
+ private long backoffMs = 0;
+
+ public RequestState(ConsumerConfig config) {
+ this.exponentialBackoff = new ExponentialBackoff(
+ config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+ RECONNECT_BACKOFF_EXP_BASE,
+ config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+ RECONNECT_BACKOFF_JITTER);
+ }
+
+ // Visible for testing
+ RequestState(final int reconnectBackoffMs,
+ final int reconnectBackoffExpBase,
+ final int reconnectBackoffMaxMs,
+ final int jitter) {
+ this.exponentialBackoff = new ExponentialBackoff(
+ reconnectBackoffMs,
+ reconnectBackoffExpBase,
+ reconnectBackoffMaxMs,
+ jitter);
+ }
+
+ public void reset() {
+ this.lastSentMs = -1;
+ this.lastReceivedMs = -1;
+ this.numAttempts = 0;
+ this.backoffMs = exponentialBackoff.backoff(0);
+ }
+
+ public boolean canSendRequest(final long currentTimeMs) {
+ if (this.lastSentMs == -1) {
+ // no request has been sent
+ return true;
+ }
+
+ if (this.lastReceivedMs == -1 ||
+ this.lastReceivedMs < this.lastSentMs) {
+ // there is an inflight request
+ return false;
+ }
+
+ return requestBackoffExpired(currentTimeMs);
+ }
+
+ public void updateLastSend(final long currentTimeMs) {
+ // Record the time of the latest send attempt. The attempt counter is advanced in updateLastFailedAttempt().
+ this.lastSentMs = currentTimeMs;
+ }
+
+ public void updateLastFailedAttempt(final long currentTimeMs) {
+ this.lastReceivedMs = currentTimeMs;
+ this.backoffMs = exponentialBackoff.backoff(numAttempts);
+ this.numAttempts++;
+ }
+
+ private boolean requestBackoffExpired(final long currentTimeMs) {
+ return remainingBackoffMs(currentTimeMs) <= 0;
+ }
+
+ long remainingBackoffMs(final long currentTimeMs) {
+ return Math.max(0, this.backoffMs - (currentTimeMs - this.lastReceivedMs));
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 683681a19fb..2218441485c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -20,10 +20,22 @@ package org.apache.kafka.clients.consumer.internals.events;
* This is the abstract definition of the events created by the KafkaConsumer API
*/
abstract public class ApplicationEvent {
+ public final Type type;
+
+ protected ApplicationEvent(Type type) {
+ this.type = type;
+ }
/**
* process the application event. Return true upon succesful execution,
* false otherwise.
* @return true if the event was successfully executed; false otherwise.
*/
- public abstract boolean process();
+
+ @Override
+ public String toString() {
+ return type + " ApplicationEvent";
+ }
+ public enum Type {
+ NOOP, COMMIT,
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
similarity index 62%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 0fe9dccb103..e031fb9f7bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -18,29 +18,32 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
+import java.util.Objects;
import java.util.concurrent.BlockingQueue;
-/**
- * The event is NoOp. This is intentionally left here for demonstration purpose.
- */
-public class NoopApplicationEvent extends ApplicationEvent {
- public final String message;
+public class ApplicationEventProcessor {
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
- public NoopApplicationEvent(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
- final String message) {
-
- this.message = message;
+ public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
this.backgroundEventQueue = backgroundEventQueue;
}
-
- @Override
- public boolean process() {
- return backgroundEventQueue.add(new NoopBackgroundEvent(message));
+ public boolean process(final ApplicationEvent event) {
+ Objects.requireNonNull(event);
+ switch (event.type) {
+ case NOOP:
+ return process((NoopApplicationEvent) event);
+ }
+ return false;
}
- @Override
- public String toString() {
- return getClass() + "_" + this.message;
+ /**
+ * Processes a {@link NoopApplicationEvent} by enqueuing a
+ * {@link NoopBackgroundEvent} that carries the same message. This is
+ * intentionally left here for demonstration purpose.
+ *
+ * @param event a {@link NoopApplicationEvent}
+ */
+ private boolean process(final NoopApplicationEvent event) {
+ return backgroundEventQueue.add(new NoopBackgroundEvent(event.message));
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 89eac1048d9..722526f041a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -26,6 +26,6 @@ abstract public class BackgroundEvent {
this.type = type;
}
public enum EventType {
- NOOP,
+ NOOP, ERROR,
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
similarity index 75%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
index 89eac1048d9..90502526be5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
@@ -16,16 +16,11 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-/**
- * This is the abstract definition of the events created by the background thread.
- */
-abstract public class BackgroundEvent {
- public final EventType type;
+public class ErrorBackgroundEvent extends BackgroundEvent {
+ private final Exception exception;
- public BackgroundEvent(EventType type) {
- this.type = type;
- }
- public enum EventType {
- NOOP,
+ public ErrorBackgroundEvent(Exception e) {
+ super(EventType.ERROR);
+ exception = e;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
index 0fe9dccb103..07524542d7d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
@@ -16,27 +16,15 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
-
-import java.util.concurrent.BlockingQueue;
-
/**
* The event is NoOp. This is intentionally left here for demonstration purpose.
*/
public class NoopApplicationEvent extends ApplicationEvent {
public final String message;
- private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-
- public NoopApplicationEvent(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
- final String message) {
+ public NoopApplicationEvent(final String message) {
+ super(Type.NOOP);
this.message = message;
- this.backgroundEventQueue = backgroundEventQueue;
- }
-
- @Override
- public boolean process() {
- return backgroundEventQueue.add(new NoopBackgroundEvent(message));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index e96e8a0c0db..f01bcb33a06 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -29,6 +29,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
public class FindCoordinatorResponse extends AbstractResponse {
@@ -50,6 +52,22 @@ public class FindCoordinatorResponse extends AbstractResponse {
this.data = data;
}
+ public Optional<Coordinator> coordinatorByKey(String key) {
+ Objects.requireNonNull(key);
+ if (this.data.coordinators().isEmpty()) {
+ // version <= 3
+ return Optional.of(new Coordinator()
+ .setErrorCode(data.errorCode())
+ .setErrorMessage(data.errorMessage())
+ .setHost(data.host())
+ .setPort(data.port())
+ .setNodeId(data.nodeId())
+ .setKey(key));
+ }
+ // version >= 4
+ return data.coordinators().stream().filter(c -> c.key().equals(key)).findFirst();
+ }
+
@Override
public FindCoordinatorResponseData data() {
return data;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
new file mode 100644
index 00000000000..3f7e7956682
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CoordinatorRequestManagerTest {
+ private MockTime time;
+ private MockClient client;
+ private SubscriptionState subscriptions;
+ private ConsumerMetadata metadata;
+ private LogContext logContext;
+ private ErrorEventHandler errorEventHandler;
+ private Node node;
+ private final Properties properties = new Properties();
+ private String groupId;
+ private int requestTimeoutMs;
+ private RequestState coordinatorRequestState;
+
+ @BeforeEach
+ public void setup() {
+ this.logContext = new LogContext();
+ this.time = new MockTime(0);
+ this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
+ this.metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
+ false, subscriptions, logContext, new ClusterResourceListeners());
+ this.client = new MockClient(time, metadata);
+ this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
+ this.node = metadata.fetch().nodes().get(0);
+ this.errorEventHandler = mock(ErrorEventHandler.class);
+ properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+ this.groupId = "group-1";
+ this.requestTimeoutMs = 500;
+ this.coordinatorRequestState = mock(RequestState.class);
+ properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(RETRY_BACKOFF_MS_CONFIG, 100);
+ }
+
+ @Test
+ public void testPoll() {
+ CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+ when(coordinatorRequestState.canSendRequest(time.milliseconds())).thenReturn(true);
+ NetworkClientDelegate.PollResult res = coordinatorManager.poll(time.milliseconds());
+ assertEquals(1, res.unsentRequests.size());
+
+ when(coordinatorRequestState.canSendRequest(time.milliseconds())).thenReturn(false);
+ NetworkClientDelegate.PollResult res2 = coordinatorManager.poll(time.milliseconds());
+ assertTrue(res2.unsentRequests.isEmpty());
+ }
+
+    @Test
+    public void testOnResponse() {
+        CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+        // coordinator() returns an Optional, so assert presence: a non-null check would pass even when it is empty.
+        FindCoordinatorResponse resp = FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node);
+        coordinatorManager.onResponse(time.milliseconds(), resp, null);
+        verify(errorEventHandler, never()).handle(any());
+        assertTrue(coordinatorManager.coordinator().isPresent());
+
+        // A retriable error is expected to clear the coordinator without reaching the error handler.
+        FindCoordinatorResponse retriableErrorResp =
+            FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node);
+        coordinatorManager.onResponse(time.milliseconds(), retriableErrorResp, null);
+        verify(errorEventHandler, never()).handle(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        assertFalse(coordinatorManager.coordinator().isPresent());
+
+        // A request-level failure (no response, only an exception) must also leave no coordinator.
+        coordinatorManager.onResponse(time.milliseconds(), null, new RuntimeException("some error"));
+        assertFalse(coordinatorManager.coordinator().isPresent());
+    }
+
+ @Test
+ public void testFindCoordinatorBackoff() {
+ this.coordinatorRequestState = new RequestState(
+ 100,
+ 2,
+ 1000,
+ 0);
+ CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+
+ NetworkClientDelegate.PollResult res = coordinatorManager.poll(time.milliseconds());
+ assertEquals(1, res.unsentRequests.size());
+ coordinatorManager.onResponse(
+ time.milliseconds(), FindCoordinatorResponse.prepareResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, "key",
+ this.node), null);
+ // Need to wait for 100ms until the next send
+ res = coordinatorManager.poll(time.milliseconds());
+ assertTrue(res.unsentRequests.isEmpty());
+ this.time.sleep(50);
+ res = coordinatorManager.poll(time.milliseconds());
+ assertTrue(res.unsentRequests.isEmpty());
+ this.time.sleep(50);
+ // should be able to send after 100ms
+ res = coordinatorManager.poll(time.milliseconds());
+ assertEquals(1, res.unsentRequests.size());
+ coordinatorManager.onResponse(
+ time.milliseconds(), FindCoordinatorResponse.prepareResponse(Errors.NONE, "key",
+ this.node), null);
+ }
+
+ @Test
+ public void testPollWithExistingCoordinator() {
+ CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+ FindCoordinatorResponse resp = FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node);
+ coordinatorManager.onResponse(time.milliseconds(), resp, null);
+ verify(errorEventHandler, never()).handle(any());
+ assertNotNull(coordinatorManager.coordinator());
+
+ NetworkClientDelegate.PollResult pollResult = coordinatorManager.poll(time.milliseconds());
+ assertEquals(Long.MAX_VALUE, pollResult.timeUntilNextPollMs);
+ assertTrue(pollResult.unsentRequests.isEmpty());
+ }
+
+    @Test
+    public void testRequestFutureCompletionHandler() {
+        // onFailure() must propagate what handleResponse() throws; assertThrows fails the test if nothing is thrown.
+        NetworkClientDelegate.AbstractRequestFutureCompletionHandler h = new MockRequestFutureCompletionHandlerBase();
+        RuntimeException thrown = assertThrows(
+            RuntimeException.class,
+            () -> h.onFailure(new RuntimeException()));
+        assertEquals("MockRequestFutureCompletionHandlerBase should throw an exception", thrown.getMessage());
+    }
+
+ @Test
+ public void testNullGroupIdShouldThrow() {
+ this.groupId = null;
+ assertThrows(RuntimeException.class, this::setupCoordinatorManager);
+ }
+
+ private static class MockRequestFutureCompletionHandlerBase extends NetworkClientDelegate.AbstractRequestFutureCompletionHandler {
+ @Override
+ public void handleResponse(ClientResponse r, Exception t) {
+ throw new RuntimeException("MockRequestFutureCompletionHandlerBase should throw an exception");
+ }
+ }
+
+    @Test
+    public void testFindCoordinatorResponseVersions() {
+        // v4+: the coordinator is looked up by key in the response's coordinators list
+        FindCoordinatorResponse respNew = FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, this.node);
+        assertTrue(respNew.coordinatorByKey(groupId).isPresent());
+        assertEquals(groupId, respNew.coordinatorByKey(groupId).get().key());
+        assertEquals(this.node.id(), respNew.coordinatorByKey(groupId).get().nodeId());
+
+        // <= v3: the coordinator is built from the top-level fields, so the old response itself must be checked
+        FindCoordinatorResponse respOld = FindCoordinatorResponse.prepareOldResponse(Errors.NONE, this.node);
+        assertTrue(respOld.coordinatorByKey(groupId).isPresent());
+        assertEquals(this.node.id(), respOld.coordinatorByKey(groupId).get().nodeId());
+    }
+
+ private CoordinatorRequestManager setupCoordinatorManager() {
+ return new CoordinatorRequestManager(
+ this.logContext,
+ this.errorEventHandler,
+ this.groupId,
+ this.coordinatorRequestState);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index f159fad6bc5..0c2cd2b5aaa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -16,22 +16,23 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.InOrder;
import org.mockito.Mockito;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -39,133 +40,119 @@ import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DefaultBackgroundThreadTest {
private static final long REFRESH_BACK_OFF_MS = 100;
private final Properties properties = new Properties();
private MockTime time;
- private SubscriptionState subscriptions;
private ConsumerMetadata metadata;
- private LogContext context;
- private ConsumerNetworkClient consumerClient;
- private Metrics metrics;
+ private NetworkClientDelegate networkClient;
private BlockingQueue<BackgroundEvent> backgroundEventsQueue;
private BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private ApplicationEventProcessor processor;
+ private CoordinatorRequestManager coordinatorManager;
+ private ErrorEventHandler errorEventHandler;
+ private int requestTimeoutMs = 500;
@BeforeEach
@SuppressWarnings("unchecked")
public void setup() {
- this.time = new MockTime();
- this.subscriptions = mock(SubscriptionState.class);
+ this.time = new MockTime(0);
this.metadata = mock(ConsumerMetadata.class);
- this.context = new LogContext();
- this.consumerClient = mock(ConsumerNetworkClient.class);
- this.metrics = mock(Metrics.class);
+ this.networkClient = mock(NetworkClientDelegate.class);
this.applicationEventsQueue = (BlockingQueue<ApplicationEvent>) mock(BlockingQueue.class);
this.backgroundEventsQueue = (BlockingQueue<BackgroundEvent>) mock(BlockingQueue.class);
- properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(RETRY_BACKOFF_MS_CONFIG, REFRESH_BACK_OFF_MS);
+ this.processor = mock(ApplicationEventProcessor.class);
+ this.coordinatorManager = mock(CoordinatorRequestManager.class);
+ this.errorEventHandler = mock(ErrorEventHandler.class);
}
@Test
- public void testStartupAndTearDown() throws InterruptedException {
- final MockClient client = new MockClient(time, metadata);
- this.consumerClient = new ConsumerNetworkClient(
- context,
- client,
- metadata,
- time,
- 100,
- 1000,
- 100
- );
- this.applicationEventsQueue = new LinkedBlockingQueue<>();
- final DefaultBackgroundThread backgroundThread = setupMockHandler();
+ public void testStartupAndTearDown() {
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
backgroundThread.start();
- assertTrue(client.active());
+ assertTrue(backgroundThread.isRunning());
backgroundThread.close();
- assertFalse(client.active());
}
@Test
- public void testInterruption() throws InterruptedException {
- final MockClient client = new MockClient(time, metadata);
- this.consumerClient = new ConsumerNetworkClient(
- context,
- client,
- metadata,
- time,
- 100,
- 1000,
- 100
- );
+ public void testApplicationEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
- final DefaultBackgroundThread backgroundThread = setupMockHandler();
- backgroundThread.start();
- assertTrue(client.active());
+ this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+ when(coordinatorManager.poll(anyLong())).thenReturn(mockPollResult());
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ ApplicationEvent e = new NoopApplicationEvent("noop event");
+ this.applicationEventsQueue.add(e);
+ backgroundThread.runOnce();
+ verify(processor, times(1)).process(e);
backgroundThread.close();
- assertFalse(client.active());
}
@Test
- void testWakeup() {
- this.time = new MockTime(0);
- final MockClient client = new MockClient(time, metadata);
- this.consumerClient = new ConsumerNetworkClient(
- context,
- client,
- metadata,
- time,
- 100,
- 1000,
- 100
- );
- when(applicationEventsQueue.isEmpty()).thenReturn(true);
- when(applicationEventsQueue.isEmpty()).thenReturn(true);
- final DefaultBackgroundThread runnable = setupMockHandler();
- client.poll(0, time.milliseconds());
- runnable.wakeup();
-
- assertThrows(WakeupException.class, runnable::runOnce);
- runnable.close();
+ void testFindCoordinator() {
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ when(this.coordinatorManager.poll(time.milliseconds())).thenReturn(mockPollResult());
+ backgroundThread.runOnce();
+ Mockito.verify(coordinatorManager, times(1)).poll(anyLong());
+ Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong());
+ backgroundThread.close();
}
@Test
- void testNetworkAndBlockingQueuePoll() {
- // ensure network poll and application queue poll will happen in a
- // single iteration
- this.time = new MockTime(100);
- final DefaultBackgroundThread runnable = setupMockHandler();
- runnable.runOnce();
+ void testPollResultTimer() {
+ DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+ // purposely setting a non-MAX time to ensure the poll result's own time is returned, not Long.MAX_VALUE
+ NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult(
+ 10,
+ Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs)));
+ assertEquals(10, backgroundThread.handlePollResult(success));
- when(applicationEventsQueue.isEmpty()).thenReturn(false);
- when(applicationEventsQueue.poll())
- .thenReturn(new NoopApplicationEvent(backgroundEventsQueue, "nothing"));
- final InOrder inOrder = Mockito.inOrder(applicationEventsQueue, this.consumerClient);
- assertFalse(inOrder.verify(applicationEventsQueue).isEmpty());
- inOrder.verify(applicationEventsQueue).poll();
- inOrder.verify(this.consumerClient).poll(any(Timer.class));
- runnable.close();
+ NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult(
+ 10,
+ new ArrayList<>());
+ assertEquals(10, backgroundThread.handlePollResult(failure));
}
- private DefaultBackgroundThread setupMockHandler() {
+ private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest(final Time time,
+ final long timeout) {
+ NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest(
+ new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
+ .setKey("foobar")),
+ null);
+ req.setTimer(time, timeout);
+ return req;
+ }
+
+ private DefaultBackgroundThread mockBackgroundThread() {
+ properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(RETRY_BACKOFF_MS_CONFIG, REFRESH_BACK_OFF_MS);
+
return new DefaultBackgroundThread(
- this.time,
- new ConsumerConfig(properties),
- new LogContext(),
- applicationEventsQueue,
- backgroundEventsQueue,
- this.subscriptions,
- this.metadata,
- this.consumerClient,
- this.metrics
- );
+ this.time,
+ new ConsumerConfig(properties),
+ new LogContext(),
+ applicationEventsQueue,
+ backgroundEventsQueue,
+ this.errorEventHandler,
+ processor,
+ this.metadata,
+ this.networkClient,
+ this.coordinatorManager);
+ }
+
+ private NetworkClientDelegate.PollResult mockPollResult() {
+ return new NetworkClientDelegate.PollResult(
+ 0,
+ Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs)));
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java
index 0da35e83494..b39724d8497 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java
@@ -16,20 +16,13 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
import java.util.Optional;
import java.util.Properties;
@@ -39,78 +32,49 @@ import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class DefaultEventHandlerTest {
+ private int sessionTimeoutMs = 1000;
+ private int rebalanceTimeoutMs = 1000;
+ private int heartbeatIntervalMs = 1000;
+ private String groupId = "g-1";
+ private Optional<String> groupInstanceId = Optional.of("g-1");
+ private long retryBackoffMs = 1000;
private final Properties properties = new Properties();
+ private GroupRebalanceConfig rebalanceConfig;
@BeforeEach
public void setup() {
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+
+ this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ rebalanceTimeoutMs,
+ heartbeatIntervalMs,
+ groupId,
+ groupInstanceId,
+ retryBackoffMs,
+ true);
}
@Test
- @Timeout(1)
- public void testBasicPollAndAddWithNoopEvent() {
- final Time time = new MockTime(1);
- final LogContext logContext = new LogContext();
- final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
- final ConsumerMetadata metadata = newConsumerMetadata(false, subscriptions);
- final MockClient client = new MockClient(time, metadata);
- final ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(
- logContext,
- client,
- metadata,
- time,
- 100,
- 1000,
- 100
- );
+ public void testBasicHandlerOps() {
+ final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
- final DefaultEventHandler handler = new DefaultEventHandler(
- time,
- new ConsumerConfig(properties),
- logContext,
- aq,
- bq,
- subscriptions,
- metadata,
- consumerClient
- );
- assertTrue(client.active());
+ final DefaultEventHandler handler = new DefaultEventHandler(bt, aq, bq);
assertTrue(handler.isEmpty());
- handler.add(
- new NoopApplicationEvent(
- bq,
- "testBasicPollAndAddWithNoopEvent"
- )
- );
- while (handler.isEmpty()) {
- time.sleep(100);
- }
- final Optional<BackgroundEvent> poll = handler.poll();
- assertTrue(poll.isPresent());
- assertTrue(poll.get() instanceof NoopBackgroundEvent);
-
- assertFalse(client.hasInFlightRequests()); // noop does not send network request
- }
-
- private static ConsumerMetadata newConsumerMetadata(final boolean includeInternalTopics,
- final SubscriptionState subscriptions) {
- final long refreshBackoffMs = 50;
- final long expireMs = 50000;
- return new ConsumerMetadata(
- refreshBackoffMs,
- expireMs,
- includeInternalTopics,
- false,
- subscriptions,
- new LogContext(),
- new ClusterResourceListeners()
- );
+ assertFalse(handler.poll().isPresent());
+ handler.add(new NoopApplicationEvent("test"));
+ assertEquals(1, aq.size());
+ handler.close();
+ verify(bt, times(1)).close();
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
new file mode 100644
index 00000000000..8d5ab9d641f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class NetworkClientDelegateTest {
+ private MockTime time;
+ private Properties properties;
+ private LogContext logContext;
+ private KafkaClient client;
+ private String groupId;
+ private SubscriptionState subscription;
+ private ConsumerMetadata metadata;
+ private Node node;
+ private int requestTimeoutMs = 500;
+
+ @BeforeEach
+ public void setup() {
+ this.time = new MockTime(0);
+ this.properties = new Properties();
+ properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(RETRY_BACKOFF_MS_CONFIG, 100);
+ properties.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
+ this.logContext = new LogContext();
+ this.client = mock(NetworkClient.class);
+ this.node = mockNode();
+ this.groupId = "group-1";
+ }
+
+ @Test
+ public void testPoll() {
+ NetworkClientDelegate ncd = mockNetworkClientDelegate();
+
+ // Successful case
+ NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+ NetworkClientDelegate.UnsentRequest r = mockUnsentFindCoordinatorRequest(callback);
+ ncd.add(r);
+ when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+ when(this.client.isReady(this.node, time.milliseconds())).thenReturn(true);
+ ncd.poll(100, time.milliseconds());
+ verify(client, times(1)).send(any(), eq(time.milliseconds()));
+ verify(callback, never()).onFailure(any());
+
+ // Timeout case
+ NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback2 = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+ NetworkClientDelegate.UnsentRequest r2 = mockUnsentFindCoordinatorRequest(callback2);
+ ncd.add(r2);
+ time.sleep(501);
+ when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+ ncd.poll(100, time.milliseconds());
+ verify(client, never()).send(any(), eq(time.milliseconds()));
+ verify(callback2, times(1)).onFailure(any());
+
+ // Node found but not ready: on the first loop the request should get re-enqueued into the unsentRequests
+ NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback3 = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+ NetworkClientDelegate.UnsentRequest r3 = mockUnsentFindCoordinatorRequest(callback3);
+ ncd.add(r3);
+ when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+ when(this.client.isReady(this.node, time.milliseconds())).thenReturn(false);
+ ncd.poll(100, time.milliseconds());
+ verify(client, never()).send(any(), eq(time.milliseconds()));
+ verify(callback3, never()).onFailure(any());
+
+ // The request expires in 500ms.
+ time.sleep(499);
+ when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+ when(this.client.isReady(this.node, time.milliseconds())).thenReturn(true);
+ ncd.poll(100, time.milliseconds());
+ verify(client, times(1)).send(any(), eq(time.milliseconds()));
+ verify(callback3, never()).onFailure(any());
+ }
+
+ @Test
+ public void testUnableToFindBrokerAndTimeout() {
+ NetworkClientDelegate ncd = mockNetworkClientDelegate();
+
+ NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+ NetworkClientDelegate.UnsentRequest r = mockUnsentFindCoordinatorRequest(callback);
+ ncd.add(r);
+ final long timeoutMs = 100;
+ long totalTimeoutMs = 0;
+ while (totalTimeoutMs <= this.requestTimeoutMs) {
+ when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(null);
+ ncd.poll(timeoutMs, time.milliseconds());
+ totalTimeoutMs += timeoutMs;
+ this.time.sleep(timeoutMs);
+ }
+ verify(client, times(6)).poll(eq(timeoutMs), anyLong());
+ verify(callback, times(1)).onFailure(isA(TimeoutException.class));
+ }
+
+ @Test
+ public void testNodeUnready() {
+ NetworkClientDelegate ncd = mockNetworkClientDelegate();
+
+ NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+ NetworkClientDelegate.UnsentRequest r = mockUnsentFindCoordinatorRequest(callback);
+ ncd.add(r);
+ final long timeoutMs = 100;
+ long totalTimeoutMs = 0;
+ while (totalTimeoutMs <= this.requestTimeoutMs) {
+ when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+ when(this.client.isReady(this.node, time.milliseconds())).thenReturn(false);
+ ncd.poll(timeoutMs, time.milliseconds());
+ totalTimeoutMs += timeoutMs;
+ this.time.sleep(timeoutMs);
+ }
+ verify(client, times(6)).poll(eq(timeoutMs), anyLong());
+ verify(callback, times(1)).onFailure(isA(TimeoutException.class));
+ }
+
+ public NetworkClientDelegate mockNetworkClientDelegate() {
+ return new NetworkClientDelegate(this.time, new ConsumerConfig(this.properties), this.logContext, this.client);
+ }
+
+ public NetworkClientDelegate.UnsentRequest mockUnsentFindCoordinatorRequest(NetworkClientDelegate.AbstractRequestFutureCompletionHandler callback) {
+ Objects.requireNonNull(groupId);
+ NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest(
+ new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKey(this.groupId)
+ .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())),
+ callback);
+ req.setTimer(this.time, this.requestTimeoutMs);
+ return req;
+ }
+
+ private Node mockNode() {
+ return new Node(0, "localhost", 99);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java
new file mode 100644
index 00000000000..4761a9f993c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RequestStateTest {
+ @Test
+ public void testRequestStateSimple() {
+ RequestState state = new RequestState(
+ 100,
+ 2,
+ 1000,
+ 0);
+
+ // ensure not permitting consecutive requests
+ assertTrue(state.canSendRequest(0));
+ state.updateLastSend(0);
+ assertFalse(state.canSendRequest(0));
+ state.updateLastFailedAttempt(35);
+ assertTrue(state.canSendRequest(135));
+ state.updateLastFailedAttempt(140);
+ assertFalse(state.canSendRequest(200));
+ // exponential backoff
+ assertTrue(state.canSendRequest(340));
+
+ // test reset
+ state.reset();
+ assertTrue(state.canSendRequest(200));
+ }
+}