You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/12/19 17:48:59 UTC

[kafka] branch trunk updated: KAFKA-14264; New logic to discover group coordinator (#12862)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4548c272ae3 KAFKA-14264; New logic to discover group coordinator (#12862)
4548c272ae3 is described below

commit 4548c272ae39a6630174ab428bc82fce7c98f7fd
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Mon Dec 19 09:48:52 2022 -0800

    KAFKA-14264; New logic to discover group coordinator (#12862)
    
    [KAFKA-14264](https://issues.apache.org/jira/browse/KAFKA-14264)
    In this patch, we refactored the existing FindCoordinator mechanism. In particular, we first centralized all of the network operations (send, poll) in `NetworkClientDelegate`, then introduced a `RequestManager` interface that is responsible for handling the timing of different kinds of requests, depending on the implementation. In this patch, we implemented a `CoordinatorRequestManager`, which determines when to create an `UnsentRequest` upon polling the request manager.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../internals/CoordinatorRequestManager.java       | 222 +++++++++++++++++
 .../internals/DefaultBackgroundThread.java         | 189 ++++++++-------
 .../consumer/internals/DefaultEventHandler.java    |  31 +--
 ...BackgroundEvent.java => ErrorEventHandler.java} |  23 +-
 .../consumer/internals/NetworkClientDelegate.java  | 267 +++++++++++++++++++++
 .../consumer/internals/PrototypeAsyncConsumer.java |   7 +-
 .../BackgroundEvent.java => RequestManager.java}   |  18 +-
 .../clients/consumer/internals/RequestState.java   |  91 +++++++
 .../internals/events/ApplicationEvent.java         |  14 +-
 ...onEvent.java => ApplicationEventProcessor.java} |  35 +--
 .../consumer/internals/events/BackgroundEvent.java |   2 +-
 ...kgroundEvent.java => ErrorBackgroundEvent.java} |  15 +-
 .../internals/events/NoopApplicationEvent.java     |  16 +-
 .../common/requests/FindCoordinatorResponse.java   |  18 ++
 .../internals/CoordinatorRequestManagerTest.java   | 202 ++++++++++++++++
 .../internals/DefaultBackgroundThreadTest.java     | 177 +++++++-------
 .../internals/DefaultEventHandlerTest.java         |  92 +++----
 .../internals/NetworkClientDelegateTest.java       | 175 ++++++++++++++
 .../consumer/internals/RequestStateTest.java       |  48 ++++
 19 files changed, 1306 insertions(+), 336 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
new file mode 100644
index 00000000000..bc40b67901f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for deciding when to send the next {@link FindCoordinatorRequest}, based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+    private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
+    private final Logger log;
+    // receives the errors that this manager does not retry
+    private final ErrorEventHandler nonRetriableErrorHandler;
+    private final String groupId;
+
+    private final RequestState coordinatorRequestState;
+    // time at which the coordinator became unknown; negative until that has been recorded once
+    private long timeMarkedUnknownMs = -1L;
+    // number of whole logging intervals already reported for the ongoing disconnect
+    private long totalDisconnectedMin = 0;
+    // null while no coordinator is known
+    private Node coordinator;
+
+    /**
+     * Creates a manager whose {@link RequestState} is built from the given config.
+     *
+     * @param logContext   context used to create the logger.
+     * @param config       consumer config handed to {@link RequestState}.
+     * @param errorHandler handler that receives non-retriable errors.
+     * @param groupId      the consumer group id; must not be {@code null}.
+     * @throws NullPointerException if {@code groupId} is {@code null}.
+     */
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId) {
+        // groupId is validated first (arguments are evaluated left to right), as before
+        this(logContext, errorHandler, Objects.requireNonNull(groupId, "groupId"), new RequestState(config));
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final RequestState coordinatorRequestState) {
+        Objects.requireNonNull(groupId, "groupId");
+        this.log = logContext.logger(this.getClass());
+        this.nonRetriableErrorHandler = errorHandler;
+        this.groupId = groupId;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    /**
+     * Poll for the FindCoordinator request.
+     * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
+     * If we are still backing off from a previous attempt, this method will return a PollResult with the remaining backoff time and an empty list.
+     * Otherwise, this method will return a PollResult with a singleton list of UnsentRequest and Long.MAX_VALUE backoff time.
+     * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not.
+     *
+     * @param currentTimeMs current time in ms.
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                Collections.emptyList());
+    }
+
+    /**
+     * Builds the FindCoordinator request for this group and records the send time on the request state.
+     *
+     * @param currentTimeMs current time in ms, recorded as the time of the last send.
+     * @return the unsent request, completed through a {@link FindCoordinatorRequestHandler}.
+     */
+    private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+        // the send is recorded when the request is created, not when it reaches the network
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        // NOTE(review): the null third argument is presumably the target node, left for the delegate to pick - confirm
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler(),
+                null);
+    }
+
+    /**
+     * Mark the current coordinator null. If the coordinator is already unknown, this only logs, once per
+     * logging interval, how long the disconnect has lasted.
+     *
+     * @param cause         why the coordinator is marked unknown.
+     * @param currentTimeMs the current time in ms.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                    + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+            totalDisconnectedMin = 0;
+            return;
+        }
+
+        if (timeMarkedUnknownMs < 0) {
+            // No coordinator has ever been known: start measuring from now rather than from the
+            // -1 sentinel, which would report a duration of roughly the current epoch time.
+            timeMarkedUnknownMs = currentTimeMs;
+        }
+        long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs - timeMarkedUnknownMs);
+        long currDisconnectMin = durationOfOngoingDisconnectMs / COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
+        if (currDisconnectMin > this.totalDisconnectedMin) {
+            log.debug("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnectMs);
+            totalDisconnectedMin = currDisconnectMin;
+        }
+    }
+
+    /**
+     * Stores the discovered coordinator and resets the request state.
+     *
+     * @param coordinatorData the coordinator section of a successful response.
+     */
+    private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinatorData) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId();
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinatorData.host(),
+                coordinatorData.port());
+        log.info("Discovered group coordinator {}", coordinatorData);
+        coordinatorRequestState.reset();
+    }
+
+    /**
+     * Records the failed attempt, marks the coordinator unknown and, unless the failure is retriable,
+     * forwards an error to the non-retriable error handler.
+     *
+     * @param exception     the failure; either raised for the request or derived from the response error code.
+     * @param currentTimeMs the current time in ms.
+     */
+    private void onFailedCoordinatorResponse(final Exception exception, final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("FindCoordinator failed with exception", currentTimeMs);
+
+        if (exception instanceof RetriableException) {
+            log.debug("FindCoordinator request failed due to retriable exception", exception);
+            return;
+        }
+
+        // Match on the type rather than on the instance returned by Errors#exception(), so that an
+        // authorization failure is recognised no matter where the exception object was created.
+        if (exception instanceof GroupAuthorizationException) {
+            log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
+            nonRetriableErrorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.warn("FindCoordinator request failed due to fatal exception", exception);
+        nonRetriableErrorHandler.handle(exception);
+    }
+
+    /**
+     * Handles the response and exception upon completing the {@link FindCoordinatorRequest}. This is invoked in the callback
+     * {@link FindCoordinatorRequestHandler}. If the response was successful, a coordinator node will be updated. If the
+     * response failed due to errors, the current coordinator will be marked unknown.
+     *
+     * @param currentTimeMs current time in ms.
+     * @param response      the response for finding the coordinator. null if an exception is thrown.
+     * @param e             the exception, null if a valid response is received.
+     */
+    protected void onResponse(final long currentTimeMs, final FindCoordinatorResponse response, final Exception e) {
+        // handles Runtime exception
+        if (e != null) {
+            onFailedCoordinatorResponse(e, currentTimeMs);
+            return;
+        }
+
+        Optional<FindCoordinatorResponseData.Coordinator> coordinator = response.coordinatorByKey(this.groupId);
+        if (!coordinator.isPresent()) {
+            String msg = String.format("Response did not contain expected coordinator section for groupId: %s", this.groupId);
+            onFailedCoordinatorResponse(new IllegalStateException(msg), currentTimeMs);
+            return;
+        }
+
+        FindCoordinatorResponseData.Coordinator node = coordinator.get();
+        if (node.errorCode() != Errors.NONE.code()) {
+            onFailedCoordinatorResponse(Errors.forCode(node.errorCode()).exception(), currentTimeMs);
+            return;
+        }
+        onSuccessfulResponse(node);
+    }
+
+    /**
+     * Returns the current coordinator node.
+     *
+     * @return the current coordinator node, or an empty Optional while the coordinator is unknown.
+     */
+    public Optional<Node> coordinator() {
+        return Optional.ofNullable(this.coordinator);
+    }
+
+    /**
+     * Completion callback of the FindCoordinator request; forwards the outcome to {@code onResponse}.
+     */
+    private class FindCoordinatorRequestHandler extends NetworkClientDelegate.AbstractRequestFutureCompletionHandler {
+        @Override
+        public void handleResponse(final ClientResponse r, final Exception e) {
+            // NOTE(review): r is dereferenced even when e is non-null; if the delegate can report a
+            // failure with a null response this throws a NullPointerException - confirm against
+            // NetworkClientDelegate.
+            CoordinatorRequestManager.this.onResponse(
+                    r.receivedTimeMs(), (FindCoordinatorResponse) r.responseBody(),
+                    e);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 6788bd09bd2..3ceb5e15b87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -16,24 +16,26 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Background thread runnable that consumes {@code ApplicationEvent} and
@@ -44,63 +46,86 @@ import java.util.concurrent.atomic.AtomicReference;
  * initialized by the polling thread.
  */
 public class DefaultBackgroundThread extends KafkaThread {
+    private static final int MAX_POLL_TIMEOUT_MS = 5000;
     private static final String BACKGROUND_THREAD_NAME =
-        "consumer_background_thread";
+            "consumer_background_thread";
     private final Time time;
     private final Logger log;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-    private final ConsumerNetworkClient networkClient;
-    private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
-    private final Metrics metrics;
     private final ConsumerConfig config;
+    // empty if groupId is null
+    private final Optional<CoordinatorRequestManager> coordinatorManager;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final ErrorEventHandler errorEventHandler;
 
-    private String clientId;
-    private long retryBackoffMs;
-    private int heartbeatIntervalMs;
     private boolean running;
-    private Optional<ApplicationEvent> inflightEvent = Optional.empty();
-    private final AtomicReference<Optional<RuntimeException>> exception =
-        new AtomicReference<>(Optional.empty());
 
+    // Visible for testing
+    DefaultBackgroundThread(final Time time,
+                            final ConsumerConfig config,
+                            final LogContext logContext,
+                            final BlockingQueue<ApplicationEvent> applicationEventQueue,
+                            final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                            final ErrorEventHandler errorEventHandler,
+                            final ApplicationEventProcessor processor,
+                            final ConsumerMetadata metadata,
+                            final NetworkClientDelegate networkClient,
+                            final CoordinatorRequestManager coordinatorManager) {
+        super(BACKGROUND_THREAD_NAME, true);
+        this.time = time;
+        this.running = true;
+        this.log = logContext.logger(getClass());
+        this.applicationEventQueue = applicationEventQueue;
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.applicationEventProcessor = processor;
+        this.config = config;
+        this.metadata = metadata;
+        this.networkClientDelegate = networkClient;
+        this.coordinatorManager = Optional.ofNullable(coordinatorManager);
+        this.errorEventHandler = errorEventHandler;
+    }
     public DefaultBackgroundThread(final Time time,
                                    final ConsumerConfig config,
+                                   final GroupRebalanceConfig rebalanceConfig,
                                    final LogContext logContext,
                                    final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                    final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                   final SubscriptionState subscriptions,
                                    final ConsumerMetadata metadata,
-                                   final ConsumerNetworkClient networkClient,
-                                   final Metrics metrics) {
+                                   final KafkaClient networkClient) {
         super(BACKGROUND_THREAD_NAME, true);
         try {
             this.time = time;
-            this.log = logContext.logger(DefaultBackgroundThread.class);
+            this.log = logContext.logger(getClass());
             this.applicationEventQueue = applicationEventQueue;
             this.backgroundEventQueue = backgroundEventQueue;
             this.config = config;
-            setConfig();
-            this.inflightEvent = Optional.empty();
             // subscriptionState is initialized by the polling thread
-            this.subscriptions = subscriptions;
             this.metadata = metadata;
-            this.networkClient = networkClient;
-            this.metrics = metrics;
+            this.networkClientDelegate = new NetworkClientDelegate(
+                    this.time,
+                    this.config,
+                    logContext,
+                    networkClient);
             this.running = true;
+            this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
+            String groupId = rebalanceConfig.groupId;
+            // without a group id there is no coordinator to discover
+            this.coordinatorManager = groupId == null ?
+                    Optional.empty() :
+                    Optional.of(new CoordinatorRequestManager(
+                            logContext,
+                            config,
+                            errorEventHandler,
+                            groupId));
+            this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue);
         } catch (final Exception e) {
-            // now propagate the exception
+            // NOTE(review): close() calls wakeup() on networkClientDelegate, which is still unassigned
+            // if the failure happened before it was created - confirm this cannot mask the error.
             close();
+            // wrap the caught exception itself: e.getCause() may be null and would drop the real failure
             throw new KafkaException("Failed to construct background processor", e);
         }
     }
 
-    private void setConfig() {
-        this.retryBackoffMs = this.config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-        this.heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
-    }
-
     @Override
     public void run() {
         try {
@@ -109,23 +134,13 @@ public class DefaultBackgroundThread extends KafkaThread {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
@@ -133,63 +148,57 @@ public class DefaultBackgroundThread extends KafkaThread {
     }
 
     /**
-     * Process event from a single poll
+     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+     * 1. Drains and tries to process all of the requests in the queue.
+     * 2. Poll request managers to queue up the necessary requests.
+     * 3. Poll the networkClient to send and retrieve the response.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
-        }
-        if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+        drainAndProcess();
+
+        final long currentTimeMs = time.milliseconds();
+        long pollWaitTimeMs = MAX_POLL_TIMEOUT_MS;
+
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(
+                    pollWaitTimeMs,
+                    handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
         }
 
-        // if there are pending events to process, poll then continue without
-        // blocking.
-        if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
-            return;
+        networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+    }
+
+    private void drainAndProcess() {
+        Queue<ApplicationEvent> events = pollApplicationEvent();
+        Iterator<ApplicationEvent> iter = events.iterator();
+        while (iter.hasNext()) {
+            ApplicationEvent event = iter.next();
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event);
         }
-        // if there are no events to process, poll until timeout. The timeout
-        // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
-        // nextMetadataUpdate. See NetworkClient.poll impl.
-        networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
     }
 
-    private long timeToNextHeartbeatMs(final long nowMs) {
-        // TODO: implemented when heartbeat is added to the impl
-        return 100;
+    long handlePollResult(NetworkClientDelegate.PollResult res) {
+        if (!res.unsentRequests.isEmpty()) {
+            networkClientDelegate.addAll(res.unsentRequests);
+        }
+        return res.timeUntilNextPollMs;
     }
 
-    private Optional<ApplicationEvent> maybePollEvent() {
-        if (this.inflightEvent.isPresent() || this.applicationEventQueue.isEmpty()) {
-            return this.inflightEvent;
+    private Queue<ApplicationEvent> pollApplicationEvent() {
+        if (this.applicationEventQueue.isEmpty()) {
+            return new LinkedList<>();
         }
-        return Optional.ofNullable(this.applicationEventQueue.poll());
+
+        LinkedList<ApplicationEvent> res = new LinkedList<>();
+        this.applicationEventQueue.drainTo(res);
+        return res;
     }
 
-    /**
-     * ApplicationEvent are consumed here.
-     *
-     * @param event an {@link ApplicationEvent}
-     * @return true when successfully consumed the event.
-     */
-    private boolean maybeConsumeInflightEvent(final ApplicationEvent event) {
+    private void consumeApplicationEvent(final ApplicationEvent event) {
         log.debug("try consuming event: {}", Optional.ofNullable(event));
         Objects.requireNonNull(event);
-        return event.process();
-    }
-
-    /**
-     * Processes {@link NoopApplicationEvent} and equeue a
-     * {@link NoopBackgroundEvent}. This is intentionally left here for
-     * demonstration purpose.
-     *
-     * @param event a {@link NoopApplicationEvent}
-     */
-    private void process(final NoopApplicationEvent event) {
-        backgroundEventQueue.add(new NoopBackgroundEvent(event.message));
+        applicationEventProcessor.process(event);
     }
 
     public boolean isRunning() {
@@ -197,13 +206,13 @@ public class DefaultBackgroundThread extends KafkaThread {
     }
 
     public void wakeup() {
-        networkClient.wakeup();
+        networkClientDelegate.wakeup();
     }
 
     public void close() {
         this.running = false;
         this.wakeup();
-        Utils.closeQuietly(networkClient, "consumer network client");
+        Utils.closeQuietly(networkClientDelegate, "network client utils");
         Utils.closeQuietly(metadata, "consumer metadata client");
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
index 492d0d4c270..1543e46bc6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
@@ -48,6 +50,7 @@ public class DefaultEventHandler implements EventHandler {
     private final DefaultBackgroundThread backgroundThread;
 
     public DefaultEventHandler(final ConsumerConfig config,
+                               final GroupRebalanceConfig groupRebalanceConfig,
                                final LogContext logContext,
                                final SubscriptionState subscriptionState,
                                final ApiVersions apiVersions,
@@ -56,6 +59,7 @@ public class DefaultEventHandler implements EventHandler {
                                final Sensor fetcherThrottleTimeSensor) {
         this(Time.SYSTEM,
                 config,
+                groupRebalanceConfig,
                 logContext,
                 new LinkedBlockingQueue<>(),
                 new LinkedBlockingQueue<>(),
@@ -68,6 +72,7 @@ public class DefaultEventHandler implements EventHandler {
 
     public DefaultEventHandler(final Time time,
                                final ConsumerConfig config,
+                               final GroupRebalanceConfig groupRebalanceConfig,
                                final LogContext logContext,
                                final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                final BlockingQueue<BackgroundEvent> backgroundEventQueue,
@@ -94,7 +99,7 @@ public class DefaultEventHandler implements EventHandler {
             channelBuilder,
             logContext
         );
-        final NetworkClient netClient = new NetworkClient(
+        final NetworkClient networkClient = new NetworkClient(
             selector,
             metadata,
             config.getString(ConsumerConfig.CLIENT_ID_CONFIG),
@@ -113,49 +118,37 @@ public class DefaultEventHandler implements EventHandler {
             fetcherThrottleTimeSensor,
             logContext
         );
-        final ConsumerNetworkClient networkClient = new ConsumerNetworkClient(
-                logContext,
-                netClient,
-                metadata,
-                time,
-                config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
-                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
         this.backgroundThread = new DefaultBackgroundThread(
             time,
             config,
+            groupRebalanceConfig,
             logContext,
             this.applicationEventQueue,
             this.backgroundEventQueue,
-            subscriptionState,
             metadata,
-            networkClient,
-            new Metrics(time)
-        );
+            networkClient);
     }
 
     // VisibleForTesting
     DefaultEventHandler(final Time time,
                         final ConsumerConfig config,
+                        final GroupRebalanceConfig groupRebalanceConfig,
                         final LogContext logContext,
                         final BlockingQueue<ApplicationEvent> applicationEventQueue,
                         final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                        final SubscriptionState subscriptionState,
                         final ConsumerMetadata metadata,
-                        final ConsumerNetworkClient networkClient) {
+                        final KafkaClient networkClient) {
         this.applicationEventQueue = applicationEventQueue;
         this.backgroundEventQueue = backgroundEventQueue;
         this.backgroundThread = new DefaultBackgroundThread(
             time,
             config,
+            groupRebalanceConfig,
             logContext,
             this.applicationEventQueue,
             this.backgroundEventQueue,
-            subscriptionState,
             metadata,
-            networkClient,
-            new Metrics(time)
-        );
+            networkClient);
         backgroundThread.start();
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java
similarity index 59%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java
index 89eac1048d9..99eba4e91b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ErrorEventHandler.java
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer.internals.events;
+package org.apache.kafka.clients.consumer.internals;
 
-/**
- * This is the abstract definition of the events created by the background thread.
- */
-abstract public class BackgroundEvent {
-    public final EventType type;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+
+import java.util.Queue;
 
-    public BackgroundEvent(EventType type) {
-        this.type = type;
+public class ErrorEventHandler {
+    private final Queue<BackgroundEvent> backgroundEventQueue;
+
+    public ErrorEventHandler(Queue<BackgroundEvent> backgroundEventQueue) {
+        this.backgroundEventQueue = backgroundEventQueue;
     }
-    public enum EventType {
-        NOOP,
+
+    public void handle(Exception e) {
+        backgroundEventQueue.add(new ErrorBackgroundEvent(e));
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
new file mode 100644
index 00000000000..cc36a4e638d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final long retryBackoffMs;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     *
+     * @param timeoutMs     timeout time
+     * @param currentTimeMs current time
+     * @return a list of client response
+     */
+    public List<ClientResponse> poll(final long timeoutMs, final long currentTimeMs) {
+        trySend(currentTimeMs);
+
+        long pollTimeoutMs = timeoutMs;
+        if (!unsentRequests.isEmpty()) {
+            pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
+        }
+        List<ClientResponse> res = this.client.poll(pollTimeoutMs, currentTimeMs);
+        checkDisconnects();
+        wakeup();
+        return res;
+    }
+
+    /**
+     * Tries to send the requests in the unsentRequests queue. If a request doesn't have an assigned node, the least
+     * loaded node is used; a request that cannot be sent yet stays queued and is retried in the next {@code poll()}.
+     * If a request has expired, it is removed and its callback (if any) is failed with a {@link TimeoutException}.
+     */
+    private void trySend(final long currentTimeMs) {
+        Iterator<UnsentRequest> iterator = unsentRequests.iterator();
+        while (iterator.hasNext()) {
+            UnsentRequest unsent = iterator.next();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                iterator.remove();
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs)) {
+                // continue to retry until timeout.
+                continue;
+            }
+            iterator.remove();
+        }
+    }
+
+    private boolean doSend(final UnsentRequest r,
+                           final long currentTimeMs) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            log.debug("No broker available to send the request: {}. Retrying.", r);
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node, currentTimeMs);
+        if (!client.isReady(node, currentTimeMs)) {
+            // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
+            // of the event loop
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            return false;
+        }
+        client.send(request, currentTimeMs);
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Remove each unsent request whose target node's connection has failed and notify its callback of the failure.
+        Iterator<UnsentRequest> iter = unsentRequests.iterator();
+        while (iter.hasNext()) {
+            UnsentRequest u = iter.next();
+            if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
+                iter.remove();
+                AuthenticationException authenticationException = client.authenticationException(u.node.get());
+                u.callback.ifPresent(r -> r.onFailure(authenticationException));
+            }
+        }
+    }
+
+    private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node, final long currentTimeMs) {
+        return client.newClientRequest(
+                node.idString(),
+                unsent.abstractBuilder,
+                currentTimeMs,
+                true,
+                (int) unsent.timer.remainingMs(),
+                unsent.callback.orElse(new DefaultRequestFutureCompletionHandler()));
+    }
+
+    public Node leastLoadedNode() {
+        return this.client.leastLoadedNode(time.milliseconds());
+    }
+
+    public void add(final UnsentRequest r) {
+        r.setTimer(this.time, this.requestTimeoutMs);
+        unsentRequests.add(r);
+    }
+
+    public void wakeup() {
+        client.wakeup();
+    }
+
+    /**
+     * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in reconnect
+     * backoff window following the disconnect).
+     */
+    public boolean nodeUnavailable(final Node node) {
+        return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
+    }
+
+    public void close() throws IOException {
+        this.client.close();
+    }
+
+    public void addAll(final List<UnsentRequest> requests) {
+        this.unsentRequests.addAll(requests);
+    }
+
+    public static class PollResult {
+        public final long timeUntilNextPollMs;
+        public final List<UnsentRequest> unsentRequests;
+
+        public PollResult(final long timeMsTillNextPoll, final List<UnsentRequest> unsentRequests) {
+            this.timeUntilNextPollMs = timeMsTillNextPoll;
+            this.unsentRequests = Collections.unmodifiableList(unsentRequests);
+        }
+    }
+
+    public static class UnsentRequest {
+        private final AbstractRequest.Builder abstractBuilder;
+        private final Optional<AbstractRequestFutureCompletionHandler> callback;
+        private Optional<Node> node; // empty if random node can be chosen
+        private Timer timer;
+
+        public UnsentRequest(final AbstractRequest.Builder abstractBuilder,
+                             final AbstractRequestFutureCompletionHandler callback) {
+            this(abstractBuilder, callback, null);
+        }
+
+        public UnsentRequest(final AbstractRequest.Builder abstractBuilder,
+                             final AbstractRequestFutureCompletionHandler callback,
+                             final Node node) {
+            Objects.requireNonNull(abstractBuilder);
+            this.abstractBuilder = abstractBuilder;
+            this.node = Optional.ofNullable(node);
+            this.callback = Optional.ofNullable(callback);
+        }
+
+        public void setTimer(final Time time, final long requestTimeoutMs) {
+            this.timer = time.timer(requestTimeoutMs);
+        }
+
+        public static UnsentRequest makeUnsentRequest(
+                final AbstractRequest.Builder<?> requestBuilder,
+                final AbstractRequestFutureCompletionHandler callback,
+                final Node node) {
+            return new UnsentRequest(requestBuilder, callback, node);
+        }
+
+        @Override
+        public String toString() {
+            return abstractBuilder.toString();
+        }
+    }
+
+    public static class DefaultRequestFutureCompletionHandler extends AbstractRequestFutureCompletionHandler {
+        @Override
+        public void handleResponse(ClientResponse r, Exception t) {}
+    }
+
+    public abstract static class AbstractRequestFutureCompletionHandler implements RequestCompletionHandler {
+        private final RequestFuture<ClientResponse> future;
+
+        AbstractRequestFutureCompletionHandler() {
+            this.future = new RequestFuture<>();
+        }
+
+        abstract public void handleResponse(ClientResponse r, Exception e);
+
+        public void onFailure(final RuntimeException e) {
+            future.raise(e);
+            handleResponse(null, e);
+        }
+
+        @Override
+        public void onComplete(final ClientResponse response) {
+            fireCompletion(response);
+            handleResponse(response, null);
+        }
+
+        private void fireCompletion(final ClientResponse response) {
+            if (response.authenticationException() != null) {
+                future.raise(response.authenticationException());
+            } else if (response.wasDisconnected()) {
+                future.raise(DisconnectException.INSTANCE);
+            } else if (response.versionMismatch() != null) {
+                future.raise(response.versionMismatch());
+            } else {
+                future.complete(response);
+            }
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index e1a2ff035aa..9c66c8a76ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -113,6 +113,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
                 valueDeserializer, metrics.reporters(), interceptorList);
         this.eventHandler = new DefaultEventHandler(
                 config,
+                groupRebalanceConfig,
                 logContext,
                 subscriptions,
                 new ApiVersions(),
@@ -478,11 +479,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         CompletableFuture<Void> commitFuture = new CompletableFuture<>();
 
         public CommitApplicationEvent() {
-        }
-
-        @Override
-        public boolean process() {
-            return true;
+            super(Type.COMMIT);
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java
similarity index 69%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java
index 89eac1048d9..13c7c4d9d16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java
@@ -14,18 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer.internals.events;
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 
 /**
- * This is the abstract definition of the events created by the background thread.
+ * {@code PollResult} consists of {@code UnsentRequest}s if there are requests to send; otherwise, it holds the time
+ * till the next poll event.
  */
-abstract public class BackgroundEvent {
-    public final EventType type;
-
-    public BackgroundEvent(EventType type) {
-        this.type = type;
-    }
-    public enum EventType {
-        NOOP,
-    }
+public interface RequestManager {
+    PollResult poll(long currentTimeMs);
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java
new file mode 100644
index 00000000000..7ee7db64ca7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+
+class RequestState {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.2;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastSentMs = -1;
+    private long lastReceivedMs = -1;
+    private int numAttempts = 0;
+    private long backoffMs = 0;
+
+    public RequestState(ConsumerConfig config) {
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+    }
+
+    // Visible for testing
+    RequestState(final int reconnectBackoffMs,
+                 final int reconnectBackoffExpBase,
+                 final int reconnectBackoffMaxMs,
+                 final int jitter) {
+        this.exponentialBackoff = new ExponentialBackoff(
+                reconnectBackoffMs,
+                reconnectBackoffExpBase,
+                reconnectBackoffMaxMs,
+                jitter);
+    }
+
+    public void reset() {
+        this.lastSentMs = -1;
+        this.lastReceivedMs = -1;
+        this.numAttempts = 0;
+        this.backoffMs = exponentialBackoff.backoff(0);
+    }
+
+    public boolean canSendRequest(final long currentTimeMs) {
+        if (this.lastSentMs == -1) {
+            // no request has been sent
+            return true;
+        }
+
+        if (this.lastReceivedMs == -1 ||
+                this.lastReceivedMs < this.lastSentMs) {
+            // there is an inflight request
+            return false;
+        }
+
+        return requestBackoffExpired(currentTimeMs);
+    }
+
+    public void updateLastSend(final long currentTimeMs) {
+        // Record the time of the latest send attempt. The attempt count is incremented in updateLastFailedAttempt().
+        this.lastSentMs = currentTimeMs;
+    }
+
+    public void updateLastFailedAttempt(final long currentTimeMs) {
+        this.lastReceivedMs = currentTimeMs;
+        this.backoffMs = exponentialBackoff.backoff(numAttempts);
+        this.numAttempts++;
+    }
+
+    private boolean requestBackoffExpired(final long currentTimeMs) {
+        return remainingBackoffMs(currentTimeMs) <= 0;
+    }
+
+    long remainingBackoffMs(final long currentTimeMs) {
+        return Math.max(0, this.backoffMs - (currentTimeMs - this.lastReceivedMs));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 683681a19fb..2218441485c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -20,10 +20,22 @@ package org.apache.kafka.clients.consumer.internals.events;
  * This is the abstract definition of the events created by the KafkaConsumer API
  */
 abstract public class ApplicationEvent {
+    public final Type type;
+
+    protected ApplicationEvent(Type type) {
+        this.type = type;
+    }
     /**
      * process the application event. Return true upon succesful execution,
      * false otherwise.
      * @return true if the event was successfully executed; false otherwise.
      */
-    public abstract boolean process();
+
+    @Override
+    public String toString() {
+        return type + " ApplicationEvent";
+    }
+    public enum Type {
+        NOOP, COMMIT,
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
similarity index 62%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 0fe9dccb103..e031fb9f7bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -18,29 +18,32 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
 
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 
-/**
- * The event is NoOp. This is intentionally left here for demonstration purpose.
- */
-public class NoopApplicationEvent extends ApplicationEvent {
-    public final String message;
+public class ApplicationEventProcessor {
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
 
-    public NoopApplicationEvent(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                final String message) {
-
-        this.message = message;
+    public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
         this.backgroundEventQueue = backgroundEventQueue;
     }
-
-    @Override
-    public boolean process() {
-        return backgroundEventQueue.add(new NoopBackgroundEvent(message));
+    public boolean process(final ApplicationEvent event) {
+        Objects.requireNonNull(event);
+        switch (event.type) {
+            case NOOP:
+                return process((NoopApplicationEvent) event);
+        }
+        return false;
     }
 
-    @Override
-    public String toString() {
-        return getClass() + "_" + this.message;
+    /**
+     * Processes a {@link NoopApplicationEvent} and enqueues a
+     * {@link NoopBackgroundEvent}. This is intentionally left here for
+     * demonstration purpose.
+     *
+     * @param event a {@link NoopApplicationEvent}
+     */
+    private boolean process(final NoopApplicationEvent event) {
+        return backgroundEventQueue.add(new NoopBackgroundEvent(event.message));
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 89eac1048d9..722526f041a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -26,6 +26,6 @@ abstract public class BackgroundEvent {
         this.type = type;
     }
     public enum EventType {
-        NOOP,
+        NOOP, ERROR,
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
similarity index 75%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
index 89eac1048d9..90502526be5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
@@ -16,16 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-/**
- * This is the abstract definition of the events created by the background thread.
- */
-abstract public class BackgroundEvent {
-    public final EventType type;
+public class ErrorBackgroundEvent extends BackgroundEvent {
+    private final Exception exception;
 
-    public BackgroundEvent(EventType type) {
-        this.type = type;
-    }
-    public enum EventType {
-        NOOP,
+    public ErrorBackgroundEvent(Exception e) {
+        super(EventType.ERROR);
+        exception = e;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
index 0fe9dccb103..07524542d7d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
@@ -16,27 +16,15 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
-
-import java.util.concurrent.BlockingQueue;
-
 /**
  * The event is NoOp. This is intentionally left here for demonstration purpose.
  */
 public class NoopApplicationEvent extends ApplicationEvent {
     public final String message;
-    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-
-    public NoopApplicationEvent(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                final String message) {
 
+    public NoopApplicationEvent(final String message) {
+        super(Type.NOOP);
         this.message = message;
-        this.backgroundEventQueue = backgroundEventQueue;
-    }
-
-    @Override
-    public boolean process() {
-        return backgroundEventQueue.add(new NoopBackgroundEvent(message));
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index e96e8a0c0db..f01bcb33a06 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -29,6 +29,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 
 
 public class FindCoordinatorResponse extends AbstractResponse {
@@ -50,6 +52,22 @@ public class FindCoordinatorResponse extends AbstractResponse {
         this.data = data;
     }
 
+    public Optional<Coordinator> coordinatorByKey(String key) {
+        Objects.requireNonNull(key);
+        if (this.data.coordinators().isEmpty()) {
+            // version <= 3
+            return Optional.of(new Coordinator()
+                    .setErrorCode(data.errorCode())
+                    .setErrorMessage(data.errorMessage())
+                    .setHost(data.host())
+                    .setPort(data.port())
+                    .setNodeId(data.nodeId())
+                    .setKey(key));
+        }
+        // version >= 4
+        return data.coordinators().stream().filter(c -> c.key().equals(key)).findFirst();
+    }
+
     @Override
     public FindCoordinatorResponseData data() {
         return data;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
new file mode 100644
index 00000000000..3f7e7956682
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CoordinatorRequestManagerTest {
+    private MockTime time;
+    private MockClient client;
+    private SubscriptionState subscriptions;
+    private ConsumerMetadata metadata;
+    private LogContext logContext;
+    private ErrorEventHandler errorEventHandler;
+    private Node node;
+    private final Properties properties = new Properties();
+    private String groupId;
+    private int requestTimeoutMs;
+    private RequestState coordinatorRequestState;
+
+    @BeforeEach
+    public void setup() {
+        this.logContext = new LogContext();
+        this.time = new MockTime(0);
+        this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
+        this.metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
+                false, subscriptions, logContext, new ClusterResourceListeners());
+        this.client = new MockClient(time, metadata);
+        this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
+        this.node = metadata.fetch().nodes().get(0);
+        this.errorEventHandler = mock(ErrorEventHandler.class);
+        this.groupId = "group-1";
+        this.requestTimeoutMs = 500;
+        this.coordinatorRequestState = mock(RequestState.class);
+        // Consumer properties; RETRY_BACKOFF_MS_CONFIG is set exactly once, as an int.
+        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(RETRY_BACKOFF_MS_CONFIG, 100);
+    }
+    
+    @Test
+    public void testPoll() {
+        CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+        when(coordinatorRequestState.canSendRequest(time.milliseconds())).thenReturn(true);
+        NetworkClientDelegate.PollResult res = coordinatorManager.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+
+        when(coordinatorRequestState.canSendRequest(time.milliseconds())).thenReturn(false);
+        NetworkClientDelegate.PollResult res2 = coordinatorManager.poll(time.milliseconds());
+        assertTrue(res2.unsentRequests.isEmpty());
+    }
+
+    @Test
+    public void testOnResponse() {
+        CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+        // Success: nothing is reported to the error handler and the coordinator Optional is populated.
+        FindCoordinatorResponse resp = FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node);
+        coordinatorManager.onResponse(time.milliseconds(), resp, null);
+        verify(errorEventHandler, never()).handle(any());
+        assertTrue(coordinatorManager.coordinator().isPresent());
+
+        // COORDINATOR_NOT_AVAILABLE: expect the coordinator to be cleared and the error not to be reported.
+        FindCoordinatorResponse retriableErrorResp =
+                FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node);
+        coordinatorManager.onResponse(time.milliseconds(), retriableErrorResp, null);
+        verify(errorEventHandler, never()).handle(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        assertFalse(coordinatorManager.coordinator().isPresent());
+
+        // A failed request (exception, no response) must likewise leave the coordinator unknown.
+        coordinatorManager.onResponse(time.milliseconds(), null, new RuntimeException("some error"));
+        assertFalse(coordinatorManager.coordinator().isPresent());
+    }
+
+    @Test
+    public void testFindCoordinatorBackoff() {
+        this.coordinatorRequestState = new RequestState(
+                100,
+                2,
+                1000,
+                0);
+        CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+
+        NetworkClientDelegate.PollResult res = coordinatorManager.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+        coordinatorManager.onResponse(
+                time.milliseconds(), FindCoordinatorResponse.prepareResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, "key",
+                this.node), null);
+        // Need to wait for 100ms until the next send
+        res = coordinatorManager.poll(time.milliseconds());
+        assertTrue(res.unsentRequests.isEmpty());
+        this.time.sleep(50);
+        res = coordinatorManager.poll(time.milliseconds());
+        assertTrue(res.unsentRequests.isEmpty());
+        this.time.sleep(50);
+        // should be able to send after 100ms
+        res = coordinatorManager.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+        coordinatorManager.onResponse(
+                time.milliseconds(), FindCoordinatorResponse.prepareResponse(Errors.NONE, "key",
+                        this.node), null);
+    }
+
+    @Test
+    public void testPollWithExistingCoordinator() {
+        CoordinatorRequestManager coordinatorManager = setupCoordinatorManager();
+        FindCoordinatorResponse resp = FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node);
+        coordinatorManager.onResponse(time.milliseconds(), resp, null);
+        verify(errorEventHandler, never()).handle(any());
+        assertNotNull(coordinatorManager.coordinator());
+
+        NetworkClientDelegate.PollResult pollResult = coordinatorManager.poll(time.milliseconds());
+        assertEquals(Long.MAX_VALUE, pollResult.timeUntilNextPollMs);
+        assertTrue(pollResult.unsentRequests.isEmpty());
+    }
+
+    @Test
+    public void testRequestFutureCompletionHandler() {
+        NetworkClientDelegate.AbstractRequestFutureCompletionHandler h = new MockRequestFutureCompletionHandlerBase();
+        // The mock's handleResponse always throws; onFailure is presumed to reach it (implied by the
+        // asserted message - confirm against NetworkClientDelegate). assertThrows fails the test when
+        // nothing is thrown, which a bare try/catch around the call would silently tolerate.
+        RuntimeException e = assertThrows(RuntimeException.class, () -> h.onFailure(new RuntimeException()));
+        assertEquals("MockRequestFutureCompletionHandlerBase should throw an exception", e.getMessage());
+    }
+
+    @Test
+    public void testNullGroupIdShouldThrow() {
+        this.groupId = null;
+        assertThrows(RuntimeException.class, this::setupCoordinatorManager);
+    }
+
+    private static class MockRequestFutureCompletionHandlerBase extends NetworkClientDelegate.AbstractRequestFutureCompletionHandler {
+        @Override
+        public void handleResponse(ClientResponse r, Exception t) {
+            throw new RuntimeException("MockRequestFutureCompletionHandlerBase should throw an exception");
+        }
+    }
+
+    @Test
+    public void testFindCoordinatorResponseVersions() {
+        // v4: the coordinator is looked up by key in the response's coordinator list
+        FindCoordinatorResponse respNew = FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, this.node);
+        assertTrue(respNew.coordinatorByKey(groupId).isPresent());
+        assertEquals(groupId, respNew.coordinatorByKey(groupId).get().key());
+        assertEquals(this.node.id(), respNew.coordinatorByKey(groupId).get().nodeId());
+
+        // <= v3: no coordinator list; the top-level fields must be surfaced, so check the old response itself
+        FindCoordinatorResponse respOld = FindCoordinatorResponse.prepareOldResponse(Errors.NONE, this.node);
+        assertTrue(respOld.coordinatorByKey(groupId).isPresent());
+        assertEquals(this.node.id(), respOld.coordinatorByKey(groupId).get().nodeId());
+    }
+
+    private CoordinatorRequestManager setupCoordinatorManager() {
+        return new CoordinatorRequestManager(
+                this.logContext,
+                this.errorEventHandler,
+                this.groupId,
+                this.coordinatorRequestState);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index f159fad6bc5..0c2cd2b5aaa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -16,22 +16,23 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.InOrder;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,133 +40,119 @@ import java.util.concurrent.LinkedBlockingQueue;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class DefaultBackgroundThreadTest {
     private static final long REFRESH_BACK_OFF_MS = 100;
     private final Properties properties = new Properties();
     private MockTime time;
-    private SubscriptionState subscriptions;
     private ConsumerMetadata metadata;
-    private LogContext context;
-    private ConsumerNetworkClient consumerClient;
-    private Metrics metrics;
+    private NetworkClientDelegate networkClient;
     private BlockingQueue<BackgroundEvent> backgroundEventsQueue;
     private BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private ApplicationEventProcessor processor;
+    private CoordinatorRequestManager coordinatorManager;
+    private ErrorEventHandler errorEventHandler;
+    private int requestTimeoutMs = 500;
 
     @BeforeEach
     @SuppressWarnings("unchecked")
     public void setup() {
-        this.time = new MockTime();
-        this.subscriptions = mock(SubscriptionState.class);
+        this.time = new MockTime(0);
         this.metadata = mock(ConsumerMetadata.class);
-        this.context = new LogContext();
-        this.consumerClient = mock(ConsumerNetworkClient.class);
-        this.metrics = mock(Metrics.class);
+        this.networkClient = mock(NetworkClientDelegate.class);
         this.applicationEventsQueue = (BlockingQueue<ApplicationEvent>) mock(BlockingQueue.class);
         this.backgroundEventsQueue = (BlockingQueue<BackgroundEvent>) mock(BlockingQueue.class);
-        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        properties.put(RETRY_BACKOFF_MS_CONFIG, REFRESH_BACK_OFF_MS);
+        this.processor = mock(ApplicationEventProcessor.class);
+        this.coordinatorManager = mock(CoordinatorRequestManager.class);
+        this.errorEventHandler = mock(ErrorEventHandler.class);
     }
 
     @Test
-    public void testStartupAndTearDown() throws InterruptedException {
-        final MockClient client = new MockClient(time, metadata);
-        this.consumerClient = new ConsumerNetworkClient(
-            context,
-            client,
-            metadata,
-            time,
-            100,
-            1000,
-            100
-        );
-        this.applicationEventsQueue = new LinkedBlockingQueue<>();
-        final DefaultBackgroundThread backgroundThread = setupMockHandler();
+    public void testStartupAndTearDown() {
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
         backgroundThread.start();
-        assertTrue(client.active());
+        assertTrue(backgroundThread.isRunning());
         backgroundThread.close();
-        assertFalse(client.active());
     }
 
     @Test
-    public void testInterruption() throws InterruptedException {
-        final MockClient client = new MockClient(time, metadata);
-        this.consumerClient = new ConsumerNetworkClient(
-            context,
-            client,
-            metadata,
-            time,
-            100,
-            1000,
-            100
-        );
+    public void testApplicationEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
-        final DefaultBackgroundThread backgroundThread = setupMockHandler();
-        backgroundThread.start();
-        assertTrue(client.active());
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        when(coordinatorManager.poll(anyLong())).thenReturn(mockPollResult());
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ApplicationEvent e = new NoopApplicationEvent("noop event");
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        verify(processor, times(1)).process(e);
         backgroundThread.close();
-        assertFalse(client.active());
     }
 
     @Test
-    void testWakeup() {
-        this.time = new MockTime(0);
-        final MockClient client = new MockClient(time, metadata);
-        this.consumerClient = new ConsumerNetworkClient(
-            context,
-            client,
-            metadata,
-            time,
-            100,
-            1000,
-            100
-        );
-        when(applicationEventsQueue.isEmpty()).thenReturn(true);
-        when(applicationEventsQueue.isEmpty()).thenReturn(true);
-        final DefaultBackgroundThread runnable = setupMockHandler();
-        client.poll(0, time.milliseconds());
-        runnable.wakeup();
-
-        assertThrows(WakeupException.class, runnable::runOnce);
-        runnable.close();
+    void testFindCoordinator() {
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        when(this.coordinatorManager.poll(time.milliseconds())).thenReturn(mockPollResult());
+        backgroundThread.runOnce();
+        Mockito.verify(coordinatorManager, times(1)).poll(anyLong());
+        Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong());
+        backgroundThread.close();
     }
 
     @Test
-    void testNetworkAndBlockingQueuePoll() {
-        // ensure network poll and application queue poll will happen in a
-        // single iteration
-        this.time = new MockTime(100);
-        final DefaultBackgroundThread runnable = setupMockHandler();
-        runnable.runOnce();
+    void testPollResultTimer() {
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        // purposely using a non-MAX time (10) so the assertions show handlePollResult returns it, with or without unsent requests
+        NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult(
+                10,
+                Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs)));
+        assertEquals(10, backgroundThread.handlePollResult(success));
 
-        when(applicationEventsQueue.isEmpty()).thenReturn(false);
-        when(applicationEventsQueue.poll())
-            .thenReturn(new NoopApplicationEvent(backgroundEventsQueue, "nothing"));
-        final InOrder inOrder = Mockito.inOrder(applicationEventsQueue, this.consumerClient);
-        assertFalse(inOrder.verify(applicationEventsQueue).isEmpty());
-        inOrder.verify(applicationEventsQueue).poll();
-        inOrder.verify(this.consumerClient).poll(any(Timer.class));
-        runnable.close();
+        NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult(
+                10,
+                new ArrayList<>());
+        assertEquals(10, backgroundThread.handlePollResult(failure));
     }
 
-    private DefaultBackgroundThread setupMockHandler() {
+    private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest(final Time time,
+                                                                                   final long timeout) {
+        NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest(
+                new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
+                                .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
+                                .setKey("foobar")),
+                null);
+        req.setTimer(time, timeout);
+        return req;
+    }
+
+    private DefaultBackgroundThread mockBackgroundThread() {
+        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(RETRY_BACKOFF_MS_CONFIG, REFRESH_BACK_OFF_MS);
+
         return new DefaultBackgroundThread(
-            this.time,
-            new ConsumerConfig(properties),
-            new LogContext(),
-            applicationEventsQueue,
-            backgroundEventsQueue,
-            this.subscriptions,
-            this.metadata,
-            this.consumerClient,
-            this.metrics
-        );
+                this.time,
+                new ConsumerConfig(properties),
+                new LogContext(),
+                applicationEventsQueue,
+                backgroundEventsQueue,
+                this.errorEventHandler,
+                processor,
+                this.metadata,
+                this.networkClient,
+                this.coordinatorManager);
+    }
+
+    private NetworkClientDelegate.PollResult mockPollResult() {
+        return new NetworkClientDelegate.PollResult(
+                0,
+                Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs)));
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java
index 0da35e83494..b39724d8497 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandlerTest.java
@@ -16,20 +16,13 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -39,78 +32,49 @@ import java.util.concurrent.LinkedBlockingQueue;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class DefaultEventHandlerTest {
+    private int sessionTimeoutMs = 1000;
+    private int rebalanceTimeoutMs = 1000;
+    private int heartbeatIntervalMs = 1000;
+    private String groupId = "g-1";
+    private Optional<String> groupInstanceId = Optional.of("g-1");
+    private long retryBackoffMs = 1000;
     private final Properties properties = new Properties();
+    private GroupRebalanceConfig rebalanceConfig;
 
     @BeforeEach
     public void setup() {
         properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         properties.put(RETRY_BACKOFF_MS_CONFIG, "100");
+
+        this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+                rebalanceTimeoutMs,
+                heartbeatIntervalMs,
+                groupId,
+                groupInstanceId,
+                retryBackoffMs,
+                true);
     }
 
     @Test
-    @Timeout(1)
-    public void testBasicPollAndAddWithNoopEvent() {
-        final Time time = new MockTime(1);
-        final LogContext logContext = new LogContext();
-        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
-        final ConsumerMetadata metadata = newConsumerMetadata(false, subscriptions);
-        final MockClient client = new MockClient(time, metadata);
-        final ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(
-            logContext,
-            client,
-            metadata,
-            time,
-            100,
-            1000,
-            100
-        );
+    public void testBasicHandlerOps() {
+        final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
         final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
         final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
-        final DefaultEventHandler handler = new DefaultEventHandler(
-            time,
-            new ConsumerConfig(properties),
-            logContext,
-            aq,
-            bq,
-            subscriptions,
-            metadata,
-            consumerClient
-        );
-        assertTrue(client.active());
+        final DefaultEventHandler handler = new DefaultEventHandler(bt, aq, bq);
         assertTrue(handler.isEmpty());
-        handler.add(
-            new NoopApplicationEvent(
-                bq,
-                "testBasicPollAndAddWithNoopEvent"
-            )
-        );
-        while (handler.isEmpty()) {
-            time.sleep(100);
-        }
-        final Optional<BackgroundEvent> poll = handler.poll();
-        assertTrue(poll.isPresent());
-        assertTrue(poll.get() instanceof NoopBackgroundEvent);
-
-        assertFalse(client.hasInFlightRequests()); // noop does not send network request
-    }
-
-    private static ConsumerMetadata newConsumerMetadata(final boolean includeInternalTopics,
-                                                        final SubscriptionState subscriptions) {
-        final long refreshBackoffMs = 50;
-        final long expireMs = 50000;
-        return new ConsumerMetadata(
-            refreshBackoffMs,
-            expireMs,
-            includeInternalTopics,
-            false,
-            subscriptions,
-            new LogContext(),
-            new ClusterResourceListeners()
-        );
+        assertFalse(handler.poll().isPresent());
+        handler.add(new NoopApplicationEvent("test"));
+        assertEquals(1, aq.size());
+        handler.close();
+        verify(bt, times(1)).close();
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
new file mode 100644
index 00000000000..8d5ab9d641f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class NetworkClientDelegateTest {
+    private MockTime time;
+    private Properties properties;
+    private LogContext logContext;
+    private KafkaClient client;
+    private String groupId;
+    private SubscriptionState subscription;
+    private ConsumerMetadata metadata;
+    private Node node;
+    private int requestTimeoutMs = 500;
+
+    @BeforeEach
+    public void setup() {
+        this.time = new MockTime(0);
+        this.properties = new Properties();
+        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(RETRY_BACKOFF_MS_CONFIG, 100);
+        properties.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
+        this.logContext = new LogContext();
+        this.client = mock(NetworkClient.class);
+        this.node = mockNode();
+        this.groupId = "group-1";
+    }
+
+    @Test
+    public void testPoll() {
+        NetworkClientDelegate ncd = mockNetworkClientDelegate();
+
+        // Successful case
+        NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+        NetworkClientDelegate.UnsentRequest r = mockUnsentFindCoordinatorRequest(callback);
+        ncd.add(r);
+        when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+        when(this.client.isReady(this.node, time.milliseconds())).thenReturn(true);
+        ncd.poll(100, time.milliseconds());
+        verify(client, times(1)).send(any(), eq(time.milliseconds()));
+        verify(callback, never()).onFailure(any());
+
+        // Timeout case
+        NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback2 = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+        NetworkClientDelegate.UnsentRequest r2 = mockUnsentFindCoordinatorRequest(callback2);
+        ncd.add(r2);
+        time.sleep(501);
+        when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+        ncd.poll(100, time.milliseconds());
+        verify(client, never()).send(any(), eq(time.milliseconds()));
+        verify(callback2, times(1)).onFailure(any());
+
+        // Node found but not ready: first loop (the request should get re-enqueued into the unsentRequests)
+        NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback3 = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+        NetworkClientDelegate.UnsentRequest r3 = mockUnsentFindCoordinatorRequest(callback3);
+        ncd.add(r3);
+        when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+        when(this.client.isReady(this.node, time.milliseconds())).thenReturn(false);
+        ncd.poll(100, time.milliseconds());
+        verify(client, never()).send(any(), eq(time.milliseconds()));
+        verify(callback3, never()).onFailure(any());
+
+        // The request expires in 500ms.
+        time.sleep(499);
+        when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+        when(this.client.isReady(this.node, time.milliseconds())).thenReturn(true);
+        ncd.poll(100, time.milliseconds());
+        verify(client, times(1)).send(any(), eq(time.milliseconds()));
+        verify(callback3, never()).onFailure(any());
+    }
+
+    @Test
+    public void testUnableToFindBrokerAndTimeout() {
+        NetworkClientDelegate ncd = mockNetworkClientDelegate();
+
+        NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+        NetworkClientDelegate.UnsentRequest r = mockUnsentFindCoordinatorRequest(callback);
+        ncd.add(r);
+        final long timeoutMs = 100;
+        long totalTimeoutMs = 0;
+        while (totalTimeoutMs <= this.requestTimeoutMs) {
+            when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(null);
+            ncd.poll(timeoutMs, time.milliseconds());
+            totalTimeoutMs += timeoutMs;
+            this.time.sleep(timeoutMs);
+        }
+        verify(client, times(6)).poll(eq(timeoutMs), anyLong());
+        verify(callback, times(1)).onFailure(isA(TimeoutException.class));
+    }
+
+    @Test
+    public void testNodeUnready() {
+        NetworkClientDelegate ncd = mockNetworkClientDelegate();
+
+        NetworkClientDelegate.DefaultRequestFutureCompletionHandler callback = mock(NetworkClientDelegate.DefaultRequestFutureCompletionHandler.class);
+        NetworkClientDelegate.UnsentRequest r = mockUnsentFindCoordinatorRequest(callback);
+        ncd.add(r);
+        final long timeoutMs = 100;
+        long totalTimeoutMs = 0;
+        while (totalTimeoutMs <= this.requestTimeoutMs) {
+            when(this.client.leastLoadedNode(time.milliseconds())).thenReturn(this.node);
+            when(this.client.isReady(this.node, time.milliseconds())).thenReturn(false);
+            ncd.poll(timeoutMs, time.milliseconds());
+            totalTimeoutMs += timeoutMs;
+            this.time.sleep(timeoutMs);
+        }
+        verify(client, times(6)).poll(eq(timeoutMs), anyLong());
+        verify(callback, times(1)).onFailure(isA(TimeoutException.class));
+    }
+
+    public NetworkClientDelegate mockNetworkClientDelegate() {
+        return new NetworkClientDelegate(this.time, new ConsumerConfig(this.properties), this.logContext, this.client);
+    }
+
+    public NetworkClientDelegate.UnsentRequest mockUnsentFindCoordinatorRequest(NetworkClientDelegate.AbstractRequestFutureCompletionHandler callback) {
+        Objects.requireNonNull(groupId);
+        NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest(
+                new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
+                                .setKey(this.groupId)
+                                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())),
+                callback);
+        req.setTimer(this.time, this.requestTimeoutMs);
+        return req;
+    }
+
+    private Node mockNode() {
+        return new Node(0, "localhost", 99);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java
new file mode 100644
index 00000000000..4761a9f993c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RequestStateTest {
+    @Test
+    public void testRequestStateSimple() {
+        RequestState state = new RequestState(
+                100,
+                2,
+                1000,
+                0);
+
+        // ensure not permitting consecutive requests
+        assertTrue(state.canSendRequest(0));
+        state.updateLastSend(0);
+        assertFalse(state.canSendRequest(0));
+        state.updateLastFailedAttempt(35);
+        assertTrue(state.canSendRequest(135));
+        state.updateLastFailedAttempt(140);
+        assertFalse(state.canSendRequest(200));
+        // exponential backoff
+        assertTrue(state.canSendRequest(340));
+
+        // test reset
+        state.reset();
+        assertTrue(state.canSendRequest(200));
+    }
+}