You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2023/09/16 16:15:44 UTC

[kafka] branch trunk updated: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager (#14359)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1dc6d9f349 KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager (#14359)
e1dc6d9f349 is described below

commit e1dc6d9f3493eb35e3d3eef1d70c2d1fc94d74c2
Author: Kirk True <ki...@kirktrue.pro>
AuthorDate: Sat Sep 16 09:15:37 2023 -0700

    KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager (#14359)
    
    This continues the work of providing the groundwork for the fetch
    refactoring work by introducing some new classes and refactoring the
    existing code to use the new classes where applicable.
    
    Changes:
    
    * Minor clean up of the events classes to make data immutable,
      private, and implement toString().
    * Added IdempotentCloser which prevents a resource from being closed
      more than once. It's general enough that it could be used elsewhere
      in the project, but it's limited to the consumer internals for now.
    * Split core Fetcher code into classes to buffer raw results
      (FetchBuffer) and to collect raw results into ConsumerRecords
      (FetchCollector). These can be tested and changed in isolation from
      the core fetcher logic.
    * Added NodeStatusDetector which abstracts methods from
      ConsumerNetworkClient so that it and NetworkClientDelegate can be
      used in AbstractFetch via the interface instead of using
      ConsumerNetworkClient directly.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/clients/NetworkClientUtils.java   |   8 +
 .../kafka/clients/consumer/KafkaConsumer.java      |  12 +-
 .../clients/consumer/internals/AbstractFetch.java  | 517 ++++--------------
 .../clients/consumer/internals/CompletedFetch.java |  40 +-
 .../consumer/internals/ConsumerNetworkClient.java  |   2 +-
 .../clients/consumer/internals/ConsumerUtils.java  |  25 +
 .../internals/DefaultBackgroundThread.java         |   1 +
 .../consumer/internals/DefaultEventHandler.java    |   1 +
 .../clients/consumer/internals/FetchBuffer.java    | 149 ++++++
 .../clients/consumer/internals/FetchCollector.java | 372 +++++++++++++
 .../clients/consumer/internals/FetchUtils.java     |  53 ++
 .../kafka/clients/consumer/internals/Fetcher.java  |  35 +-
 .../consumer/internals/NetworkClientDelegate.java  |   9 +
 .../consumer/internals/OffsetsRequestManager.java  |   5 +
 .../consumer/internals/PrototypeAsyncConsumer.java |  11 +-
 .../internals/events/ApplicationEvent.java         |  44 +-
 .../events/ApplicationEventProcessor.java          |  11 +-
 .../events/AssignmentChangeApplicationEvent.java   |  48 +-
 .../consumer/internals/events/BackgroundEvent.java |  46 +-
 .../internals/events/CommitApplicationEvent.java   |  54 +-
 .../events/CompletableApplicationEvent.java        |  31 +-
 .../internals/events/ErrorBackgroundEvent.java     |  41 +-
 .../events/ListOffsetsApplicationEvent.java        |   3 +-
 .../NewTopicsMetadataUpdateRequestEvent.java       |   7 +
 .../internals/events/NoopApplicationEvent.java     |  38 +-
 .../{ => events}/NoopBackgroundEvent.java          |  42 +-
 .../events/OffsetFetchApplicationEvent.java        |   6 +-
 .../internals/events/PollApplicationEvent.java     |  39 +-
 .../kafka/common/internals/IdempotentCloser.java   | 174 +++++++
 .../consumer/internals/CompletedFetchTest.java     | 216 ++++----
 .../consumer/internals/FetchBufferTest.java        | 194 +++++++
 .../consumer/internals/FetchCollectorTest.java     | 579 +++++++++++++++++++++
 .../internals/OffsetsRequestManagerTest.java       |   2 +-
 .../internals/PrototypeAsyncConsumerTest.java      |  79 ++-
 .../common/internals/IdempotentCloserTest.java     | 183 +++++++
 36 files changed, 2387 insertions(+), 692 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9862a237fa9..f01281acb0e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -87,7 +87,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(AbstractFetch|ConsumerCoordinator|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
+              files="(AbstractFetch|ConsumerCoordinator|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
 
     <suppress checks="JavaNCSS"
               files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index 16c76afe3f7..e044fd48ee3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -132,4 +132,12 @@ public final class NetworkClientUtils {
         if (exception != null)
             throw exception;
     }
+
+    /**
+     * Initiate a connection if currently possible. This is only really useful for resetting the
+     * failed status of a socket.
+     */
+    public static void tryConnect(KafkaClient client, Node node, Time time) {
+        client.ready(node, time.milliseconds());
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f029fe74f89..aa4b913b493 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 }
 
                 throwIfNoAssignorsConfigured();
-                fetcher.clearBufferedDataForUnassignedTopics(topics);
+
+                // Clear the buffered data which are not a part of newly assigned topics
+                final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+                for (TopicPartition tp : subscriptions.assignedPartitions()) {
+                    if (topics.contains(tp.topic()))
+                        currentTopicPartitions.add(tp);
+                }
+
+                fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+
                 log.info("Subscribed to topic(s): {}", join(topics, ", "));
                 if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                     metadata.requestUpdateForNewTopics();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 8d6390ca94d..4f579bb0e0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -18,20 +18,14 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -43,20 +37,17 @@ import org.slf4j.Logger;
 import org.slf4j.helpers.MessageFormatter;
 
 import java.io.Closeable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate;
 
 /**
  * {@code AbstractFetch} represents the basic state and logic for record fetching processing.
@@ -73,13 +64,12 @@ public abstract class AbstractFetch<K, V> implements Closeable {
     protected final FetchConfig<K, V> fetchConfig;
     protected final Time time;
     protected final FetchMetricsManager metricsManager;
+    protected final FetchBuffer fetchBuffer;
+    protected final BufferSupplier decompressionBufferSupplier;
+    protected final Set<Integer> nodesWithPendingFetchRequests;
+    protected final IdempotentCloser idempotentCloser = new IdempotentCloser();
 
-    private final BufferSupplier decompressionBufferSupplier;
-    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
-    private final Set<Integer> nodesWithPendingFetchRequests;
-
-    private CompletedFetch<K, V> nextInLineFetch;
 
     public AbstractFetch(final LogContext logContext,
                          final ConsumerNetworkClient client,
@@ -95,7 +85,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
         this.subscriptions = subscriptions;
         this.fetchConfig = fetchConfig;
         this.decompressionBufferSupplier = BufferSupplier.create();
-        this.completedFetches = new ConcurrentLinkedQueue<>();
+        this.fetchBuffer = new FetchBuffer(logContext);
         this.sessionHandlers = new HashMap<>();
         this.nodesWithPendingFetchRequests = new HashSet<>();
         this.metricsManager = metricsManager;
@@ -109,7 +99,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
      * @return true if there are completed fetches, false otherwise
      */
     boolean hasCompletedFetches() {
-        return !completedFetches.isEmpty();
+        return !fetchBuffer.isEmpty();
     }
 
     /**
@@ -117,7 +107,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
      * @return true if there are completed fetches that can be returned, false otherwise
      */
     public boolean hasAvailableFetches() {
-        return completedFetches.stream().anyMatch(fetch -> subscriptions.isFetchable(fetch.partition));
+        return fetchBuffer.hasCompletedFetches(fetch -> subscriptions.isFetchable(fetch.partition));
     }
 
     /**
@@ -181,17 +171,16 @@ public abstract class AbstractFetch<K, V> implements Closeable {
                 log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                         fetchConfig.isolationLevel, fetchOffset, partition, partitionData);
 
-                CompletedFetch<K, V> completedFetch = new CompletedFetch<>(
+                CompletedFetch completedFetch = new CompletedFetch(
                         logContext,
                         subscriptions,
-                        fetchConfig,
                         decompressionBufferSupplier,
                         partition,
                         partitionData,
                         metricAggregator,
                         fetchOffset,
                         requestVersion);
-                completedFetches.add(completedFetch);
+                fetchBuffer.add(completedFetch);
             }
 
             metricsManager.recordLatency(resp.requestLatencyMs());
@@ -205,14 +194,14 @@ public abstract class AbstractFetch<K, V> implements Closeable {
      * Implements the core logic for a failed fetch request/response.
      *
      * @param fetchTarget {@link Node} from which the fetch data was requested
-     * @param e {@link RuntimeException} representing the error that resulted in the failure
+     * @param t {@link Throwable} representing the error that resulted in the failure
      */
-    protected void handleFetchResponse(final Node fetchTarget, final RuntimeException e) {
+    protected void handleFetchResponse(final Node fetchTarget, final Throwable t) {
         try {
             final FetchSessionHandler handler = sessionHandler(fetchTarget.id());
 
             if (handler != null) {
-                handler.handleError(e);
+                handler.handleError(t);
                 handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
             }
         } finally {
@@ -221,6 +210,20 @@ public abstract class AbstractFetch<K, V> implements Closeable {
         }
     }
 
+    protected void handleCloseFetchSessionResponse(final Node fetchTarget,
+                                                   final FetchSessionHandler.FetchRequestData data) {
+        int sessionId = data.metadata().sessionId();
+        log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+    }
+
+    public void handleCloseFetchSessionResponse(final Node fetchTarget,
+                                                final FetchSessionHandler.FetchRequestData data,
+                                                final Throwable t) {
+        int sessionId = data.metadata().sessionId();
+        log.debug("Unable to a close message for fetch session: {} to node: {}. " +
+                "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, t);
+    }
+
     /**
      * Creates a new {@link FetchRequest fetch request} in preparation for sending to the Kafka cluster.
      *
@@ -256,138 +259,23 @@ public abstract class AbstractFetch<K, V> implements Closeable {
     }
 
     /**
-     * Return the fetched records, empty the record buffer and update the consumed position.
-     *
-     * </p>
-     *
-     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated.
+     * Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
+     * but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
+     * has yet to process the data for the partition that has already been fetched, we should not go send for more data
+     * until the previously-fetched data has been processed.
      *
-     * @return A {@link Fetch} for the requested partitions
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
-     *         the defaultResetPolicy is NONE
-     * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
+     * @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
      */
-    public Fetch<K, V> collectFetch() {
-        Fetch<K, V> fetch = Fetch.empty();
-        Queue<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>();
-        int recordsRemaining = fetchConfig.maxPollRecords;
+    private Set<TopicPartition> fetchablePartitions() {
+        // This is the set of partitions we have in our buffer
+        Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
 
-        try {
-            while (recordsRemaining > 0) {
-                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
-                    CompletedFetch<K, V> records = completedFetches.peek();
-                    if (records == null) break;
-
-                    if (!records.isInitialized()) {
-                        try {
-                            nextInLineFetch = initializeCompletedFetch(records);
-                        } catch (Exception e) {
-                            // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
-                            // (2) there are no fetched records with actual content preceding this exception.
-                            // The first condition ensures that the completedFetches is not stuck with the same completedFetch
-                            // in cases such as the TopicAuthorizationException, and the second condition ensures that no
-                            // potential data loss due to an exception in a following record.
-                            if (fetch.isEmpty() && FetchResponse.recordsOrFail(records.partitionData).sizeInBytes() == 0) {
-                                completedFetches.poll();
-                            }
-                            throw e;
-                        }
-                    } else {
-                        nextInLineFetch = records;
-                    }
-                    completedFetches.poll();
-                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
-                    // when the partition is paused we add the records back to the completedFetches queue instead of draining
-                    // them so that they can be returned on a subsequent poll if the partition is resumed at that time
-                    log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
-                    pausedCompletedFetches.add(nextInLineFetch);
-                    nextInLineFetch = null;
-                } else {
-                    Fetch<K, V> nextFetch = fetchRecords(recordsRemaining);
-                    recordsRemaining -= nextFetch.numRecords();
-                    fetch.add(nextFetch);
-                }
-            }
-        } catch (KafkaException e) {
-            if (fetch.isEmpty())
-                throw e;
-        } finally {
-            // add any polled completed fetches for paused partitions back to the completed fetches queue to be
-            // re-evaluated in the next poll
-            completedFetches.addAll(pausedCompletedFetches);
-        }
+        // This is the test that returns true if the partition is *not* buffered
+        Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
 
-        return fetch;
-    }
-
-    private Fetch<K, V> fetchRecords(final int maxRecords) {
-        if (!subscriptions.isAssigned(nextInLineFetch.partition)) {
-            // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it is no longer assigned",
-                    nextInLineFetch.partition);
-        } else if (!subscriptions.isFetchable(nextInLineFetch.partition)) {
-            // this can happen when a partition is paused before fetched records are returned to the consumer's
-            // poll call or if the offset is being reset
-            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
-                    nextInLineFetch.partition);
-        } else {
-            SubscriptionState.FetchPosition position = subscriptions.position(nextInLineFetch.partition);
-            if (position == null) {
-                throw new IllegalStateException("Missing position for fetchable partition " + nextInLineFetch.partition);
-            }
-
-            if (nextInLineFetch.nextFetchOffset() == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, nextInLineFetch.partition);
-
-                boolean positionAdvanced = false;
-
-                if (nextInLineFetch.nextFetchOffset() > position.offset) {
-                    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
-                            nextInLineFetch.nextFetchOffset(),
-                            nextInLineFetch.lastEpoch(),
-                            position.currentLeader);
-                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
-                            position, nextPosition, nextInLineFetch.partition, partRecords.size());
-                    subscriptions.position(nextInLineFetch.partition, nextPosition);
-                    positionAdvanced = true;
-                }
-
-                Long partitionLag = subscriptions.partitionLag(nextInLineFetch.partition, fetchConfig.isolationLevel);
-                if (partitionLag != null)
-                    metricsManager.recordPartitionLag(nextInLineFetch.partition, partitionLag);
-
-                Long lead = subscriptions.partitionLead(nextInLineFetch.partition);
-                if (lead != null) {
-                    metricsManager.recordPartitionLead(nextInLineFetch.partition, lead);
-                }
-
-                return Fetch.forPartition(nextInLineFetch.partition, partRecords, positionAdvanced);
-            } else {
-                // these records aren't next in line based on the last consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        nextInLineFetch.partition, nextInLineFetch.nextFetchOffset(), position);
-            }
-        }
-
-        log.trace("Draining fetched records for partition {}", nextInLineFetch.partition);
-        nextInLineFetch.drain();
-
-        return Fetch.empty();
-    }
-
-    private List<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> exclude = new HashSet<>();
-        if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
-            exclude.add(nextInLineFetch.partition);
-        }
-        for (CompletedFetch<K, V> completedFetch : completedFetches) {
-            exclude.add(completedFetch.partition);
-        }
-        return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
+        // Return all partitions that are in an otherwise fetchable state *and* for which we don't already have some
+        // messages sitting in our buffer.
+        return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
     }
 
     /**
@@ -421,7 +309,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
                         " using the leader instead.", nodeId, partition);
                 // Note that this condition may happen due to stale metadata, so we clear preferred replica and
                 // refresh metadata.
-                requestMetadataUpdate(partition);
+                requestMetadataUpdate(metadata, subscriptions, partition);
                 return leaderReplica;
             }
         } else {
@@ -429,6 +317,37 @@ public abstract class AbstractFetch<K, V> implements Closeable {
         }
     }
 
+    private Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessionRequests() {
+        final Cluster cluster = metadata.fetch();
+        Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
+
+        try {
+            sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
+                // set the session handler to notify close. This will set the next metadata request to send close message.
+                sessionHandler.notifyClose();
+
+                // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+                // skip sending the close request.
+                final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+
+                if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                    log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                    return;
+                }
+
+                fetchable.put(fetchTarget, sessionHandler.newBuilder());
+            });
+        } finally {
+            sessionHandlers.clear();
+        }
+
+        Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
+        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) {
+            reqs.put(entry.getKey(), entry.getValue().build());
+        }
+        return reqs;
+    }
+
     /**
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
@@ -493,262 +412,30 @@ public abstract class AbstractFetch<K, V> implements Closeable {
         return reqs;
     }
 
-    /**
-     * Initialize a CompletedFetch object.
-     */
-    private CompletedFetch<K, V> initializeCompletedFetch(final CompletedFetch<K, V> completedFetch) {
-        final TopicPartition tp = completedFetch.partition;
-        final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
-        boolean recordMetrics = true;
-
-        try {
-            if (!subscriptions.hasValidPosition(tp)) {
-                // this can happen when a rebalance happened while fetch is still in-flight
-                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
-                return null;
-            } else if (error == Errors.NONE) {
-                final CompletedFetch<K, V> ret = handleInitializeCompletedFetchSuccess(completedFetch);
-                recordMetrics = ret == null;
-                return ret;
-            } else {
-                handleInitializeCompletedFetchErrors(completedFetch, error);
-                return null;
-            }
-        } finally {
-            if (recordMetrics) {
-                completedFetch.recordAggregatedMetrics(0, 0);
-            }
-
-            if (error != Errors.NONE)
-                // we move the partition to the end if there was an error. This way, it's more likely that partitions for
-                // the same topic can remain together (allowing for more efficient serialization).
-                subscriptions.movePartitionToEnd(tp);
-        }
-    }
-
-    private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
-        final TopicPartition tp = completedFetch.partition;
-        final long fetchOffset = completedFetch.nextFetchOffset();
-
-        // we are interested in this fetch only if the beginning offset matches the
-        // current consumed position
-        SubscriptionState.FetchPosition position = subscriptions.position(tp);
-        if (position == null || position.offset != fetchOffset) {
-            log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
-                    "the expected offset {}", tp, fetchOffset, position);
-            return null;
-        }
-
-        final FetchResponseData.PartitionData partition = completedFetch.partitionData;
-        log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
-                FetchResponse.recordsSize(partition), tp, position);
-        Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
-
-        if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
-            if (completedFetch.requestVersion < 3) {
-                // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
-                Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
-                throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
-                        recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
-                        " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
-                        "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
-                        ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
-                        recordTooLargePartitions);
-            } else {
-                // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
-                throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
-                        fetchOffset + ". Received a non-empty fetch response from the server, but no " +
-                        "complete records were found.");
-            }
-        }
-
-        if (partition.highWatermark() >= 0) {
-            log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
-            subscriptions.updateHighWatermark(tp, partition.highWatermark());
-        }
-
-        if (partition.logStartOffset() >= 0) {
-            log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
-            subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
-        }
-
-        if (partition.lastStableOffset() >= 0) {
-            log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
-            subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
-        }
-
-        if (FetchResponse.isPreferredReplica(partition)) {
-            subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> {
-                long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
-                log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
-                        tp, partition.preferredReadReplica(), expireTimeMs);
-                return expireTimeMs;
-            });
-        }
-
-        completedFetch.setInitialized();
-        return completedFetch;
-    }
-
-    private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch,
-                                                      final Errors error) {
-        final TopicPartition tp = completedFetch.partition;
-        final long fetchOffset = completedFetch.nextFetchOffset();
-
-        if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-                error == Errors.REPLICA_NOT_AVAILABLE ||
-                error == Errors.KAFKA_STORAGE_ERROR ||
-                error == Errors.FENCED_LEADER_EPOCH ||
-                error == Errors.OFFSET_NOT_AVAILABLE) {
-            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
-            requestMetadataUpdate(tp);
-        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-            log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
-            requestMetadataUpdate(tp);
-        } else if (error == Errors.UNKNOWN_TOPIC_ID) {
-            log.warn("Received unknown topic ID error in fetch for partition {}", tp);
-            requestMetadataUpdate(tp);
-        } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
-            log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
-            requestMetadataUpdate(tp);
-        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-            Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
-
-            if (!clearedReplicaId.isPresent()) {
-                // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                SubscriptionState.FetchPosition position = subscriptions.position(tp);
-
-                if (position == null || fetchOffset != position.offset) {
-                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                            "does not match the current offset {}", tp, fetchOffset, position);
-                } else {
-                    handleOffsetOutOfRange(position, tp);
-                }
-            } else {
-                log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
-                        clearedReplicaId.get(), tp, error, fetchOffset);
-            }
-        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-            //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
-            log.warn("Not authorized to read from partition {}.", tp);
-            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
-        } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
-            log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
-        } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
-            log.warn("Unknown server error while fetching offset {} for topic-partition {}",
-                    fetchOffset, tp);
-        } else if (error == Errors.CORRUPT_MESSAGE) {
-            throw new KafkaException("Encountered corrupt message when fetching offset "
-                    + fetchOffset
-                    + " for topic-partition "
-                    + tp);
-        } else {
-            throw new IllegalStateException("Unexpected error code "
-                    + error.code()
-                    + " while fetching at offset "
-                    + fetchOffset
-                    + " from topic-partition " + tp);
-        }
-    }
-
-    private void handleOffsetOutOfRange(final SubscriptionState.FetchPosition fetchPosition,
-                                        final TopicPartition topicPartition) {
-        String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
-
-        if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("{}, resetting offset", errorMessage);
-            subscriptions.requestOffsetReset(topicPartition);
-        } else {
-            log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
-            throw new OffsetOutOfRangeException(errorMessage,
-                    Collections.singletonMap(topicPartition, fetchPosition.offset));
-        }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned partitions. Any previously
-     * {@link CompletedFetch fetched data} is dropped if it is for a partition that is no longer in
-     * {@code assignedPartitions}.
-     *
-     * @param assignedPartitions Newly-assigned {@link TopicPartition}
-     */
-    public void clearBufferedDataForUnassignedPartitions(final Collection<TopicPartition> assignedPartitions) {
-        final Iterator<CompletedFetch<K, V>> completedFetchesItr = completedFetches.iterator();
-
-        while (completedFetchesItr.hasNext()) {
-            final CompletedFetch<K, V> completedFetch = completedFetchesItr.next();
-            final TopicPartition tp = completedFetch.partition;
-
-            if (!assignedPartitions.contains(tp)) {
-                log.debug("Removing {} from buffered data as it is no longer an assigned partition", tp);
-                completedFetch.drain();
-                completedFetchesItr.remove();
-            }
-        }
-
-        if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) {
-            nextInLineFetch.drain();
-            nextInLineFetch = null;
-        }
-    }
-
-    /**
-     * Clear the buffered data which are not a part of newly assigned topics
-     *
-     * @param assignedTopics  newly assigned topics
-     */
-    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
-        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
-
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
-            if (assignedTopics.contains(tp.topic())) {
-                currentTopicPartitions.add(tp);
-            }
-        }
-
-        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
-    }
-
-    protected FetchSessionHandler sessionHandler(int node) {
-        return sessionHandlers.get(node);
-    }
-
-    // Visible for testing
-    void maybeCloseFetchSessions(final Timer timer) {
-        final Cluster cluster = metadata.fetch();
+    protected void maybeCloseFetchSessions(final Timer timer) {
         final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareCloseFetchSessionRequests();
 
-        sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
-            // set the session handler to notify close. This will set the next metadata request to send close message.
-            sessionHandler.notifyClose();
-
-            final int sessionId = sessionHandler.sessionId();
-            // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
-            // skip sending the close request.
-            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
-            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
-                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
-                return;
-            }
-
-            final FetchRequest.Builder request = createFetchRequest(fetchTarget, sessionHandler.newBuilder().build());
+        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
+            final Node fetchTarget = entry.getKey();
+            final FetchSessionHandler.FetchRequestData data = entry.getValue();
+            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
             final RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request);
 
             responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
                 @Override
                 public void onSuccess(ClientResponse value) {
-                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+                    handleCloseFetchSessionResponse(fetchTarget, data);
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
-                    log.debug("Unable to a close message for fetch session: {} to node: {}. " +
-                            "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+                    handleCloseFetchSessionResponse(fetchTarget, data, e);
                 }
             });
 
             requestFutures.add(responseFuture);
-        });
+        }
 
         // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
         // all requests have received a response.
@@ -765,27 +452,35 @@ public abstract class AbstractFetch<K, V> implements Closeable {
         }
     }
 
-    public void close(final Timer timer) {
-        // we do not need to re-enable wakeups since we are closing already
-        client.disableWakeups();
-
-        if (nextInLineFetch != null) {
-            nextInLineFetch.drain();
-            nextInLineFetch = null;
-        }
+    // Visible for testing
+    protected FetchSessionHandler sessionHandler(int node) {
+        return sessionHandlers.get(node);
+    }
 
+    /**
+     * This method is called by {@link #close(Timer)} which is guarded by the {@link IdempotentCloser}) such as to only
+     * be executed once the first time that any of the {@link #close()} methods are called. Subclasses can override
+     * this method without the need for extra synchronization at the instance level.
+     *
+     * @param timer Timer to enforce time limit
+     */
+    // Visible for testing
+    protected void closeInternal(Timer timer) {
+        // we do not need to re-enable wake-ups since we are closing already
+        client.disableWakeups();
         maybeCloseFetchSessions(timer);
+        Utils.closeQuietly(fetchBuffer, "fetchBuffer");
         Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
-        sessionHandlers.clear();
+    }
+
+    public void close(final Timer timer) {
+        idempotentCloser.close(() -> {
+            closeInternal(timer);
+        });
     }
 
     @Override
     public void close() {
         close(time.timer(0));
     }
-
-    private void requestMetadataUpdate(final TopicPartition topicPartition) {
-        metadata.requestUpdate(false);
-        subscriptions.clearPreferredReadReplica(topicPartition);
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 26a134c1b8d..7a8ee105157 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -52,12 +52,10 @@ import java.util.Set;
 
 /**
  * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
- * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
- *
- * @param <K> Record key type
- * @param <V> Record value type
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to
+ * {@link #fetchRecords(FetchConfig, int)}.
  */
-public class CompletedFetch<K, V> {
+public class CompletedFetch {
 
     final TopicPartition partition;
     final FetchResponseData.PartitionData partitionData;
@@ -65,7 +63,6 @@ public class CompletedFetch<K, V> {
 
     private final Logger log;
     private final SubscriptionState subscriptions;
-    private final FetchConfig<K, V> fetchConfig;
     private final BufferSupplier decompressionBufferSupplier;
     private final Iterator<? extends RecordBatch> batches;
     private final Set<Long> abortedProducerIds;
@@ -86,7 +83,6 @@ public class CompletedFetch<K, V> {
 
     CompletedFetch(LogContext logContext,
                    SubscriptionState subscriptions,
-                   FetchConfig<K, V> fetchConfig,
                    BufferSupplier decompressionBufferSupplier,
                    TopicPartition partition,
                    FetchResponseData.PartitionData partitionData,
@@ -95,7 +91,6 @@ public class CompletedFetch<K, V> {
                    short requestVersion) {
         this.log = logContext.logger(CompletedFetch.class);
         this.subscriptions = subscriptions;
-        this.fetchConfig = fetchConfig;
         this.decompressionBufferSupplier = decompressionBufferSupplier;
         this.partition = partition;
         this.partitionData = partitionData;
@@ -140,7 +135,7 @@ public class CompletedFetch<K, V> {
     /**
      * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
      * are closed. This is somewhat analogous to {@link Closeable#close() closing}, though no error will result if a
-     * caller invokes {@link #fetchRecords(int)}; an empty {@link List list} will be returned instead.
+     * caller invokes {@link #fetchRecords(FetchConfig, int)}; an empty {@link List list} will be returned instead.
      */
     void drain() {
         if (!isConsumed) {
@@ -156,7 +151,7 @@ public class CompletedFetch<K, V> {
         }
     }
 
-    private void maybeEnsureValid(RecordBatch batch) {
+    private <K, V> void maybeEnsureValid(FetchConfig<K, V> fetchConfig, RecordBatch batch) {
         if (fetchConfig.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
             try {
                 batch.ensureValid();
@@ -167,7 +162,7 @@ public class CompletedFetch<K, V> {
         }
     }
 
-    private void maybeEnsureValid(Record record) {
+    private <K, V> void maybeEnsureValid(FetchConfig<K, V> fetchConfig, Record record) {
         if (fetchConfig.checkCrcs) {
             try {
                 record.ensureValid();
@@ -185,7 +180,7 @@ public class CompletedFetch<K, V> {
         }
     }
 
-    private Record nextFetchedRecord() {
+    private <K, V> Record nextFetchedRecord(FetchConfig<K, V> fetchConfig) {
         while (true) {
             if (records == null || !records.hasNext()) {
                 maybeCloseRecordStream();
@@ -204,7 +199,7 @@ public class CompletedFetch<K, V> {
 
                 currentBatch = batches.next();
                 lastEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
-                maybeEnsureValid(currentBatch);
+                maybeEnsureValid(fetchConfig, currentBatch);
 
                 if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                     // remove from the aborted transaction queue all aborted transactions which have begun
@@ -230,7 +225,7 @@ public class CompletedFetch<K, V> {
                 // skip any records out of range
                 if (record.offset() >= nextFetchOffset) {
                     // we only do validation when the message should not be skipped.
-                    maybeEnsureValid(record);
+                    maybeEnsureValid(fetchConfig, record);
 
                     // control records are not returned to the user
                     if (!currentBatch.isControlBatch()) {
@@ -250,10 +245,11 @@ public class CompletedFetch<K, V> {
      * {@link Deserializer deserialization} of the {@link Record record's} key and value are performed in
      * this step.
      *
+     * @param fetchConfig {@link FetchConfig Configuration} to use, including, but not limited to, {@link Deserializer}s
      * @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords}
      * @return {@link ConsumerRecord Consumer records}
      */
-    List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
+    <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig<K, V> fetchConfig, int maxRecords) {
         // Error when fetching the next record before deserialization.
         if (corruptLastRecord)
             throw new KafkaException("Received exception when fetching the next record from " + partition
@@ -271,7 +267,7 @@ public class CompletedFetch<K, V> {
                 // use the last record to do deserialization again.
                 if (cachedRecordException == null) {
                     corruptLastRecord = true;
-                    lastRecord = nextFetchedRecord();
+                    lastRecord = nextFetchedRecord(fetchConfig);
                     corruptLastRecord = false;
                 }
 
@@ -280,7 +276,7 @@ public class CompletedFetch<K, V> {
 
                 Optional<Integer> leaderEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
                 TimestampType timestampType = currentBatch.timestampType();
-                ConsumerRecord<K, V> record = parseRecord(partition, leaderEpoch, timestampType, lastRecord);
+                ConsumerRecord<K, V> record = parseRecord(fetchConfig, partition, leaderEpoch, timestampType, lastRecord);
                 records.add(record);
                 recordsRead++;
                 bytesRead += lastRecord.sizeInBytes();
@@ -306,10 +302,11 @@ public class CompletedFetch<K, V> {
     /**
      * Parse the record entry, deserializing the key / value fields if necessary
      */
-    ConsumerRecord<K, V> parseRecord(TopicPartition partition,
-                                     Optional<Integer> leaderEpoch,
-                                     TimestampType timestampType,
-                                     Record record) {
+    <K, V> ConsumerRecord<K, V> parseRecord(FetchConfig<K, V> fetchConfig,
+                                            TopicPartition partition,
+                                            Optional<Integer> leaderEpoch,
+                                            TimestampType timestampType,
+                                            Record record) {
         try {
             long offset = record.offset();
             long timestamp = record.timestamp();
@@ -324,6 +321,7 @@ public class CompletedFetch<K, V> {
                     valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
                     key, value, headers, leaderEpoch);
         } catch (RuntimeException e) {
+            log.error("Deserializers with error: {}", fetchConfig.deserializers);
             throw new RecordDeserializationException(partition, record.offset(),
                     "Error deserializing key/value for partition " + partition +
                             " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 97c64f93750..81b6d9a1b7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -586,7 +586,7 @@ public class ConsumerNetworkClient implements Closeable {
     public void tryConnect(Node node) {
         lock.lock();
         try {
-            client.ready(node, time.milliseconds());
+            NetworkClientUtils.tryConnect(client, node, time);
         } finally {
             lock.unlock();
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index 5e6ff77b64a..01f55a66cc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -26,6 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -34,6 +38,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.List;
@@ -41,6 +46,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public final class ConsumerUtils {
@@ -141,4 +148,22 @@ public final class ConsumerUtils {
         return (List<ConsumerInterceptor<K, V>>) ClientUtils.configuredInterceptors(config, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
     }
 
+    public static <T> T getResult(CompletableFuture<T> future, Timer timer) {
+        try {
+            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+
+            if (t instanceof WakeupException)
+                throw new WakeupException();
+            else if (t instanceof KafkaException)
+                throw (KafkaException) t;
+            else
+                throw new KafkaException(t);
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        } catch (java.util.concurrent.TimeoutException e) {
+            throw new TimeoutException(e);
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 1e2c8a7935f..9b4cd89361f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -165,6 +165,7 @@ public class DefaultBackgroundThread extends KafkaThread {
                             retryBackoffMs,
                             requestTimeoutMs,
                             apiVersions,
+                            networkClientDelegate,
                             logContext);
             CoordinatorRequestManager coordinatorRequestManager = null;
             CommitRequestManager commitRequestManager = null;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
index 9d8871ed46c..4d620a4ffac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
@@ -43,6 +43,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  * {@code BackgroundEvent} from the {@link DefaultBackgroundThread}.
  */
 public class DefaultEventHandler implements EventHandler {
+
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private final DefaultBackgroundThread backgroundThread;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
new file mode 100644
index 00000000000..d82b37ad99a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+/**
+ * {@code FetchBuffer} buffers up {@link CompletedFetch the results} from the broker responses as they are received.
+ * It is essentially a wrapper around a {@link java.util.Queue} of {@link CompletedFetch}. There is at most one
+ * {@link CompletedFetch} per partition in the queue.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread.
+ */
+public class FetchBuffer implements Closeable {
+
+    private final Logger log;
+    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final IdempotentCloser idempotentCloser = new IdempotentCloser();
+
+    private CompletedFetch nextInLineFetch;
+
+    public FetchBuffer(final LogContext logContext) {
+        this.log = logContext.logger(FetchBuffer.class);
+        this.completedFetches = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Returns {@code true} if there are no completed fetches pending to return to the user.
+     *
+     * @return {@code true} if the buffer is empty, {@code false} otherwise
+     */
+    boolean isEmpty() {
+        return completedFetches.isEmpty();
+    }
+
+    /**
+     * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has
+     * visibility for testing.
+     *
+     * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise
+     */
+    boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) {
+        return completedFetches.stream().anyMatch(predicate);
+    }
+
+    void add(CompletedFetch completedFetch) {
+        completedFetches.add(completedFetch);
+    }
+
+    void addAll(Collection<CompletedFetch> completedFetches) {
+        this.completedFetches.addAll(completedFetches);
+    }
+
+    CompletedFetch nextInLineFetch() {
+        return nextInLineFetch;
+    }
+
+    void setNextInLineFetch(CompletedFetch completedFetch) {
+        this.nextInLineFetch = completedFetch;
+    }
+
+    CompletedFetch peek() {
+        return completedFetches.peek();
+    }
+
+    CompletedFetch poll() {
+        return completedFetches.poll();
+    }
+
+    /**
+     * Updates the buffer to retain only the fetch data that corresponds to the given partitions. Any previously
+     * {@link CompletedFetch fetched data} is removed if its partition is not in the given set of partitions.
+     *
+     * @param partitions {@link Set} of {@link TopicPartition}s for which any buffered data should be kept
+     */
+    void retainAll(final Set<TopicPartition> partitions) {
+        completedFetches.removeIf(cf -> maybeDrain(partitions, cf));
+
+        if (maybeDrain(partitions, nextInLineFetch))
+            nextInLineFetch = null;
+    }
+
+    private boolean maybeDrain(final Set<TopicPartition> partitions, final CompletedFetch completedFetch) {
+        if (completedFetch != null && !partitions.contains(completedFetch.partition)) {
+            log.debug("Removing {} from buffered fetch data as it is not in the set of partitions to retain ({})", completedFetch.partition, partitions);
+            completedFetch.drain();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Return the set of {@link TopicPartition partitions} for which we have data in the buffer.
+     *
+     * @return {@link TopicPartition Partition} set
+     */
+    Set<TopicPartition> bufferedPartitions() {
+        final Set<TopicPartition> partitions = new HashSet<>();
+
+        if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
+            partitions.add(nextInLineFetch.partition);
+        }
+
+        completedFetches.forEach(cf -> partitions.add(cf.partition));
+        return partitions;
+    }
+
+    @Override
+    public void close() {
+        idempotentCloser.close(() -> {
+            log.debug("Closing the fetch buffer");
+
+            if (nextInLineFetch != null) {
+                nextInLineFetch.drain();
+                nextInLineFetch = null;
+            }
+
+            completedFetches.forEach(CompletedFetch::drain);
+            completedFetches.clear();
+        }, () -> log.warn("The fetch buffer was previously closed"));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
new file mode 100644
index 00000000000..0e1f2f18a3f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate;
+
+/**
+ * {@code FetchCollector} operates at the {@link RecordBatch} level, as that is what is stored in the
+ * {@link FetchBuffer}. Each {@link org.apache.kafka.common.record.Record} in the {@link RecordBatch} is converted
+ * to a {@link ConsumerRecord} and added to the returned {@link Fetch}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+public class FetchCollector<K, V> {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final FetchConfig<K, V> fetchConfig;
+    private final FetchMetricsManager metricsManager;
+    private final Time time;
+
+    public FetchCollector(final LogContext logContext,
+                          final ConsumerMetadata metadata,
+                          final SubscriptionState subscriptions,
+                          final FetchConfig<K, V> fetchConfig,
+                          final FetchMetricsManager metricsManager,
+                          final Time time) {
+        this.log = logContext.logger(FetchCollector.class);
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.fetchConfig = fetchConfig;
+        this.metricsManager = metricsManager;
+        this.time = time;
+    }
+
+    /**
+     * Return the fetched {@link ConsumerRecord records}, empty the {@link FetchBuffer record buffer}, and
+     * update the consumed position.
+     *
+     * </p>
+     *
+     * NOTE: returning an {@link Fetch#empty() empty} fetch guarantees the consumed position is not updated.
+     *
+     * @param fetchBuffer {@link FetchBuffer} from which to retrieve the {@link ConsumerRecord records}
+     *
+     * @return A {@link Fetch} for the requested partitions
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
+     *         the defaultResetPolicy is NONE
+     * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
+     */
+    public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {
+        final Fetch<K, V> fetch = Fetch.empty();
+        final Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
+        int recordsRemaining = fetchConfig.maxPollRecords;
+
+        try {
+            while (recordsRemaining > 0) {
+                final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();
+
+                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
+                    final CompletedFetch completedFetch = fetchBuffer.peek();
+
+                    if (completedFetch == null)
+                        break;
+
+                    if (!completedFetch.isInitialized()) {
+                        try {
+                            fetchBuffer.setNextInLineFetch(initialize(completedFetch));
+                        } catch (Exception e) {
+                            // Remove a completedFetch upon a parse with exception if (1) it contains no completedFetch, and
+                            // (2) there are no fetched completedFetch with actual content preceding this exception.
+                            // The first condition ensures that the completedFetches is not stuck with the same completedFetch
+                            // in cases such as the TopicAuthorizationException, and the second condition ensures that no
+                            // potential data loss due to an exception in a following record.
+                            if (fetch.isEmpty() && FetchResponse.recordsOrFail(completedFetch.partitionData).sizeInBytes() == 0)
+                                fetchBuffer.poll();
+
+                            throw e;
+                        }
+                    } else {
+                        fetchBuffer.setNextInLineFetch(completedFetch);
+                    }
+
+                    fetchBuffer.poll();
+                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
+                    // when the partition is paused we add the records back to the completedFetches queue instead of draining
+                    // them so that they can be returned on a subsequent poll if the partition is resumed at that time
+                    log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
+                    pausedCompletedFetches.add(nextInLineFetch);
+                    fetchBuffer.setNextInLineFetch(null);
+                } else {
+                    final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch);
+                    recordsRemaining -= nextFetch.numRecords();
+                    fetch.add(nextFetch);
+                }
+            }
+        } catch (KafkaException e) {
+            if (fetch.isEmpty())
+                throw e;
+        } finally {
+            // add any polled completed fetches for paused partitions back to the completed fetches queue to be
+            // re-evaluated in the next poll
+            fetchBuffer.addAll(pausedCompletedFetches);
+        }
+
+        return fetch;
+    }
+
+    private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch) {
+        final TopicPartition tp = nextInLineFetch.partition;
+
+        if (!subscriptions.isAssigned(tp)) {
+            // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
+        } else if (!subscriptions.isFetchable(tp)) {
+            // this can happen when a partition is paused before fetched records are returned to the consumer's
+            // poll call or if the offset is being reset
+            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
+        } else {
+            SubscriptionState.FetchPosition position = subscriptions.position(tp);
+
+            if (position == null)
+                throw new IllegalStateException("Missing position for fetchable partition " + tp);
+
+            if (nextInLineFetch.nextFetchOffset() == position.offset) {
+                List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig, fetchConfig.maxPollRecords);
+
+                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                        partRecords.size(), position, tp);
+
+                boolean positionAdvanced = false;
+
+                if (nextInLineFetch.nextFetchOffset() > position.offset) {
+                    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
+                            nextInLineFetch.nextFetchOffset(),
+                            nextInLineFetch.lastEpoch(),
+                            position.currentLeader);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            position, nextPosition, tp, partRecords.size());
+                    subscriptions.position(tp, nextPosition);
+                    positionAdvanced = true;
+                }
+
+                Long partitionLag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);
+                if (partitionLag != null)
+                    metricsManager.recordPartitionLag(tp, partitionLag);
+
+                Long lead = subscriptions.partitionLead(tp);
+                if (lead != null) {
+                    metricsManager.recordPartitionLead(tp, lead);
+                }
+
+                return Fetch.forPartition(tp, partRecords, positionAdvanced);
+            } else {
+                // these records aren't next in line based on the last consumed position, ignore them
+                // they must be from an obsolete request
+                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                        tp, nextInLineFetch.nextFetchOffset(), position);
+            }
+        }
+
+        log.trace("Draining fetched records for partition {}", tp);
+        nextInLineFetch.drain();
+
+        return Fetch.empty();
+    }
+
+    /**
+     * Initialize a CompletedFetch object.
+     */
+    protected CompletedFetch initialize(final CompletedFetch completedFetch) {
+        final TopicPartition tp = completedFetch.partition;
+        final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
+        boolean recordMetrics = true;
+
+        try {
+            if (!subscriptions.hasValidPosition(tp)) {
+                // this can happen when a rebalance happened while fetch is still in-flight
+                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
+                return null;
+            } else if (error == Errors.NONE) {
+                final CompletedFetch ret = handleInitializeSuccess(completedFetch);
+                recordMetrics = ret == null;
+                return ret;
+            } else {
+                handleInitializeErrors(completedFetch, error);
+                return null;
+            }
+        } finally {
+            if (recordMetrics) {
+                completedFetch.recordAggregatedMetrics(0, 0);
+            }
+
+            if (error != Errors.NONE)
+                // we move the partition to the end if there was an error. This way, it's more likely that partitions for
+                // the same topic can remain together (allowing for more efficient serialization).
+                subscriptions.movePartitionToEnd(tp);
+        }
+    }
+
+    private CompletedFetch handleInitializeSuccess(final CompletedFetch completedFetch) {
+        final TopicPartition tp = completedFetch.partition;
+        final long fetchOffset = completedFetch.nextFetchOffset();
+
+        // we are interested in this fetch only if the beginning offset matches the
+        // current consumed position
+        SubscriptionState.FetchPosition position = subscriptions.position(tp);
+        if (position == null || position.offset != fetchOffset) {
+            log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
+                    "the expected offset {}", tp, fetchOffset, position);
+            return null;
+        }
+
+        final FetchResponseData.PartitionData partition = completedFetch.partitionData;
+        log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
+                FetchResponse.recordsSize(partition), tp, position);
+        Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
+
+        if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
+            if (completedFetch.requestVersion < 3) {
+                // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
+                Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
+                throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
+                        recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
+                        " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
+                        "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
+                        ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
+                        recordTooLargePartitions);
+            } else {
+                // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
+                throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
+                        fetchOffset + ". Received a non-empty fetch response from the server, but no " +
+                        "complete records were found.");
+            }
+        }
+
+        if (partition.highWatermark() >= 0) {
+            log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
+            subscriptions.updateHighWatermark(tp, partition.highWatermark());
+        }
+
+        if (partition.logStartOffset() >= 0) {
+            log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
+            subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
+        }
+
+        if (partition.lastStableOffset() >= 0) {
+            log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
+            subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
+        }
+
+        if (FetchResponse.isPreferredReplica(partition)) {
+            subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> {
+                long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
+                log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
+                        tp, partition.preferredReadReplica(), expireTimeMs);
+                return expireTimeMs;
+            });
+        }
+
+        completedFetch.setInitialized();
+        return completedFetch;
+    }
+
+    private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) {
+        final TopicPartition tp = completedFetch.partition;
+        final long fetchOffset = completedFetch.nextFetchOffset();
+
+        if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
+                error == Errors.REPLICA_NOT_AVAILABLE ||
+                error == Errors.KAFKA_STORAGE_ERROR ||
+                error == Errors.FENCED_LEADER_EPOCH ||
+                error == Errors.OFFSET_NOT_AVAILABLE) {
+            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+            requestMetadataUpdate(metadata, subscriptions, tp);
+        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+            log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
+            requestMetadataUpdate(metadata, subscriptions, tp);
+        } else if (error == Errors.UNKNOWN_TOPIC_ID) {
+            log.warn("Received unknown topic ID error in fetch for partition {}", tp);
+            requestMetadataUpdate(metadata, subscriptions, tp);
+        } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
+            log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
+            requestMetadataUpdate(metadata, subscriptions, tp);
+        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
+            Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
+
+            if (!clearedReplicaId.isPresent()) {
+                // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
+                SubscriptionState.FetchPosition position = subscriptions.position(tp);
+
+                if (position == null || fetchOffset != position.offset) {
+                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
+                            "does not match the current offset {}", tp, fetchOffset, position);
+                } else {
+                    String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;
+
+                    if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                        log.info("{}, resetting offset", errorMessage);
+                        subscriptions.requestOffsetReset(tp);
+                    } else {
+                        log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
+                        throw new OffsetOutOfRangeException(errorMessage,
+                                Collections.singletonMap(tp, position.offset));
+                    }
+                }
+            } else {
+                log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
+                        clearedReplicaId.get(), tp, error, fetchOffset);
+            }
+        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+            //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
+            log.warn("Not authorized to read from partition {}.", tp);
+            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
+        } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
+            log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
+        } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
+            log.warn("Unknown server error while fetching offset {} for topic-partition {}",
+                    fetchOffset, tp);
+        } else if (error == Errors.CORRUPT_MESSAGE) {
+            throw new KafkaException("Encountered corrupt message when fetching offset "
+                    + fetchOffset
+                    + " for topic-partition "
+                    + tp);
+        } else {
+            throw new IllegalStateException("Unexpected error code "
+                    + error.code()
+                    + " while fetching at offset "
+                    + fetchOffset
+                    + " from topic-partition " + tp);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
new file mode 100644
index 00000000000..0b2faa58e7d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * {@code FetchUtils} provides a place for disparate parts of the fetch logic to live.
+ */
+public class FetchUtils {
+
+    /**
+     * Performs two combined actions based on the state related to the {@link TopicPartition}:
+     *
+     * <ol>
+     *     <li>
+     *         Invokes {@link ConsumerMetadata#requestUpdate(boolean)} to signal that the metadata is incorrect and
+     *         needs to be updated
+     *     </li>
+     *     <li>
+     *         Invokes {@link SubscriptionState#clearPreferredReadReplica(TopicPartition)} to clear out any read replica
+     *         information that may be present.
+     *     </li>
+     * </ol>
+     *
+     * This utility method should be invoked if the client detects (or is told by a node in the broker) that an
+     * attempt was made to fetch from a node that isn't the leader or preferred replica.
+     *
+     * @param metadata {@link ConsumerMetadata} for which to request an update
+     * @param subscriptions {@link SubscriptionState} to clear any internal read replica node
+     * @param topicPartition {@link TopicPartition} for which this state change is related
+     */
+    static void requestMetadataUpdate(final ConsumerMetadata metadata,
+                                      final SubscriptionState subscriptions,
+                                      final TopicPartition topicPartition) {
+        metadata.requestUpdate(false);
+        subscriptions.clearPreferredReadReplica(topicPartition);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c46f0c53cce..dec02c6b90f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,14 +19,14 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
-import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class manages the fetching process with the brokers.
@@ -49,8 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class Fetcher<K, V> extends AbstractFetch<K, V> {
 
-    private final Logger log;
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final FetchCollector<K, V> fetchCollector;
 
     public Fetcher(LogContext logContext,
                    ConsumerNetworkClient client,
@@ -60,7 +59,16 @@ public class Fetcher<K, V> extends AbstractFetch<K, V> {
                    FetchMetricsManager metricsManager,
                    Time time) {
         super(logContext, client, metadata, subscriptions, fetchConfig, metricsManager, time);
-        this.log = logContext.logger(Fetcher.class);
+        this.fetchCollector = new FetchCollector<>(logContext,
+                metadata,
+                subscriptions,
+                fetchConfig,
+                metricsManager,
+                time);
+    }
+
+    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
+        fetchBuffer.retainAll(new HashSet<>(assignedPartitions));
     }
 
     /**
@@ -98,16 +106,7 @@ public class Fetcher<K, V> extends AbstractFetch<K, V> {
         return fetchRequestMap.size();
     }
 
-    public void close(final Timer timer) {
-        if (!isClosed.compareAndSet(false, true)) {
-            log.info("Fetcher {} is already closed.", this);
-            return;
-        }
-
-        // Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
-        // it is necessary to acquire a lock on the fetcher instance before modifying the states.
-        synchronized (this) {
-            super.close(timer);
-        }
+    public Fetch<K, V> collectFetch() {
+        return fetchCollector.collectFetch(fetchBuffer);
     }
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 9fa8ad716cb..445005bf545 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
@@ -34,11 +35,13 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -51,6 +54,7 @@ public class NetworkClientDelegate implements AutoCloseable {
     private final int requestTimeoutMs;
     private final Queue<UnsentRequest> unsentRequests;
     private final long retryBackoffMs;
+    private final Set<Node> tryConnectNodes;
 
     public NetworkClientDelegate(
             final Time time,
@@ -63,6 +67,11 @@ public class NetworkClientDelegate implements AutoCloseable {
         this.unsentRequests = new ArrayDeque<>();
         this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.tryConnectNodes = new HashSet<>();
+    }
+
+    public void tryConnect(Node node) {
+        NetworkClientUtils.tryConnect(client, node, time);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 10977b7f78e..a846f8c2bb2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -82,6 +82,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
     private final long requestTimeoutMs;
     private final Time time;
     private final ApiVersions apiVersions;
+    private final NetworkClientDelegate networkClientDelegate;
 
     public OffsetsRequestManager(final SubscriptionState subscriptionState,
                                  final ConsumerMetadata metadata,
@@ -90,12 +91,14 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
                                  final long retryBackoffMs,
                                  final long requestTimeoutMs,
                                  final ApiVersions apiVersions,
+                                 final NetworkClientDelegate networkClientDelegate,
                                  final LogContext logContext) {
         requireNonNull(subscriptionState);
         requireNonNull(metadata);
         requireNonNull(isolationLevel);
         requireNonNull(time);
         requireNonNull(apiVersions);
+        requireNonNull(networkClientDelegate);
         requireNonNull(logContext);
 
         this.metadata = metadata;
@@ -107,6 +110,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
         this.time = time;
         this.requestTimeoutMs = requestTimeoutMs;
         this.apiVersions = apiVersions;
+        this.networkClientDelegate = networkClientDelegate;
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
                 time, retryBackoffMs, apiVersions);
         // Register the cluster metadata update callback. Note this only relies on the
@@ -429,6 +433,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
 
             NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
             if (nodeApiVersions == null) {
+                networkClientDelegate.tryConnect(node);
                 return;
             }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index f6b8ede5745..98e63f2a00a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -364,17 +364,8 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
 
         final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
         wakeupTrigger.setActiveTask(event.future());
-        eventHandler.add(event);
         try {
-            return event.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            throw new InterruptException(e);
-        } catch (TimeoutException e) {
-            throw new org.apache.kafka.common.errors.TimeoutException(e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof WakeupException)
-                throw new WakeupException();
-            throw new KafkaException(e);
+            return eventHandler.addAndGet(event, time.timer(timeout));
         } finally {
             wakeupTrigger.clearActiveTask();
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index eb1bffaf81d..65ba01959cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -16,23 +16,51 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import java.util.Objects;
+
 /**
  * This is the abstract definition of the events created by the KafkaConsumer API
  */
-abstract public class ApplicationEvent {
-    public final Type type;
+public abstract class ApplicationEvent {
+
+    public enum Type {
+        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
+    }
+
+    private final Type type;
 
     protected ApplicationEvent(Type type) {
-        this.type = type;
+        this.type = Objects.requireNonNull(type);
+    }
+
+    public Type type() {
+        return type;
     }
 
     @Override
-    public String toString() {
-        return type + " ApplicationEvent";
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ApplicationEvent that = (ApplicationEvent) o;
+
+        return type == that.type;
     }
 
-    public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
+    @Override
+    public int hashCode() {
+        return type.hashCode();
+    }
+
+    protected String toStringBase() {
+        return "type=" + type;
+    }
+
+    @Override
+    public String toString() {
+        return "ApplicationEvent{" +
+                toStringBase() +
+                '}';
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2cfbc2d04e4..234a228ba4f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals.events;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -47,7 +46,7 @@ public class ApplicationEventProcessor {
 
     public boolean process(final ApplicationEvent event) {
         Objects.requireNonNull(event);
-        switch (event.type) {
+        switch (event.type()) {
             case NOOP:
                 return process((NoopApplicationEvent) event);
             case COMMIT:
@@ -78,7 +77,7 @@ public class ApplicationEventProcessor {
      * @param event a {@link NoopApplicationEvent}
      */
     private boolean process(final NoopApplicationEvent event) {
-        return backgroundEventQueue.add(new NoopBackgroundEvent(event.message));
+        return backgroundEventQueue.add(new NoopBackgroundEvent(event.message()));
     }
 
     private boolean process(final PollApplicationEvent event) {
@@ -87,7 +86,7 @@ public class ApplicationEventProcessor {
         }
 
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        manager.updateAutoCommitTimer(event.pollTimeMs);
+        manager.updateAutoCommitTimer(event.pollTimeMs());
         return true;
     }
 
@@ -132,8 +131,8 @@ public class ApplicationEventProcessor {
             return false;
         }
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        manager.updateAutoCommitTimer(event.currentTimeMs);
-        manager.maybeAutoCommit(event.offsets);
+        manager.updateAutoCommitTimer(event.currentTimeMs());
+        manager.maybeAutoCommit(event.offsets());
         return true;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
index 4346d96dbf3..ccf7199f260 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
@@ -19,15 +19,55 @@ package org.apache.kafka.clients.consumer.internals.events;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
 
 public class AssignmentChangeApplicationEvent extends ApplicationEvent {
-    final Map<TopicPartition, OffsetAndMetadata> offsets;
-    final long currentTimeMs;
 
-    public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final long currentTimeMs;
+
+    public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                            final long currentTimeMs) {
         super(Type.ASSIGNMENT_CHANGE);
-        this.offsets = offsets;
+        this.offsets = Collections.unmodifiableMap(offsets);
         this.currentTimeMs = currentTimeMs;
     }
+
+    public Map<TopicPartition, OffsetAndMetadata> offsets() {
+        return offsets;
+    }
+
+    public long currentTimeMs() {
+        return currentTimeMs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        AssignmentChangeApplicationEvent that = (AssignmentChangeApplicationEvent) o;
+
+        if (currentTimeMs != that.currentTimeMs) return false;
+        return offsets.equals(that.offsets);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + offsets.hashCode();
+        result = 31 * result + (int) (currentTimeMs ^ (currentTimeMs >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "AssignmentChangeApplicationEvent{" +
+                toStringBase() +
+                ", offsets=" + offsets +
+                ", currentTimeMs=" + currentTimeMs +
+                '}';
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 722526f041a..b0f7c3454f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -16,16 +16,50 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import java.util.Objects;
+
 /**
  * This is the abstract definition of the events created by the background thread.
  */
-abstract public class BackgroundEvent {
-    public final EventType type;
+public abstract class BackgroundEvent {
 
-    public BackgroundEvent(EventType type) {
-        this.type = type;
-    }
-    public enum EventType {
+    public enum Type {
         NOOP, ERROR,
     }
+
+    protected final Type type;
+
+    public BackgroundEvent(Type type) {
+        this.type = Objects.requireNonNull(type);
+    }
+
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        BackgroundEvent that = (BackgroundEvent) o;
+
+        return type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return type.hashCode();
+    }
+
+    protected String toStringBase() {
+        return "type=" + type;
+    }
+
+    @Override
+    public String toString() {
+        return "BackgroundEvent{" +
+                toStringBase() +
+                '}';
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
index 67c416a8a2f..5f9bad09326 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
@@ -19,46 +19,50 @@ package org.apache.kafka.clients.consumer.internals.events;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
-public class CommitApplicationEvent extends ApplicationEvent {
-    final private CompletableFuture<Void> future;
-    final private Map<TopicPartition, OffsetAndMetadata> offsets;
+public class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
+
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
     public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         super(Type.COMMIT);
-        this.offsets = offsets;
-        Optional<Exception> exception = isValid(offsets);
-        if (exception.isPresent()) {
-            throw new RuntimeException(exception.get());
+        this.offsets = Collections.unmodifiableMap(offsets);
+        for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
+            if (offsetAndMetadata.offset() < 0) {
+                throw new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset());
+            }
         }
-        this.future = new CompletableFuture<>();
-    }
-
-    public CompletableFuture<Void> future() {
-        return future;
     }
 
     public Map<TopicPartition, OffsetAndMetadata> offsets() {
         return offsets;
     }
 
-    private Optional<Exception> isValid(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
-            OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            if (offsetAndMetadata.offset() < 0) {
-                return Optional.of(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
-            }
-        }
-        return Optional.empty();
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        CommitApplicationEvent that = (CommitApplicationEvent) o;
+
+        return offsets.equals(that.offsets);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + offsets.hashCode();
+        return result;
     }
 
     @Override
     public String toString() {
-        return "CommitApplicationEvent("
-                + "offsets=" + offsets + ")";
+        return "CommitApplicationEvent{" +
+                toStringBase() +
+                ", offsets=" + offsets +
+                '}';
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 3bd862861a4..8146d9583ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -16,14 +16,10 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.common.utils.Timer;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Application event with a result in the form of a future, that can be retrieved within a
@@ -33,7 +29,7 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
 
-    protected final CompletableFuture<T> future;
+    private final CompletableFuture<T> future;
 
     protected CompletableApplicationEvent(Type type) {
         super(type);
@@ -45,20 +41,7 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
     }
 
     public T get(Timer timer) {
-        try {
-            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-
-            if (t instanceof KafkaException)
-                throw (KafkaException) t;
-            else
-                throw new KafkaException(t);
-        } catch (InterruptedException e) {
-            throw new InterruptException(e);
-        } catch (java.util.concurrent.TimeoutException e) {
-            throw new TimeoutException(e);
-        }
+        return ConsumerUtils.getResult(future, timer);
     }
 
     public void chain(final CompletableFuture<T> providedFuture) {
@@ -89,11 +72,15 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
         return result;
     }
 
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() + ", future=" + future;
+    }
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + "{" +
-                "future=" + future +
-                ", type=" + type +
+                toStringBase() +
                 '}';
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
index abc7b6b2639..4fc08290b71 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
@@ -17,10 +17,41 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 public class ErrorBackgroundEvent extends BackgroundEvent {
-    private final Throwable exception;
 
-    public ErrorBackgroundEvent(Throwable e) {
-        super(EventType.ERROR);
-        exception = e;
+    private final Throwable error;
+
+    public ErrorBackgroundEvent(Throwable error) {
+        super(Type.ERROR);
+        this.error = error;
+    }
+
+    public Throwable error() {
+        return error;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        ErrorBackgroundEvent that = (ErrorBackgroundEvent) o;
+
+        return error.equals(that.error);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + error.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ErrorBackgroundEvent{" +
+                toStringBase() +
+                ", error=" + error +
+                '}';
     }
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
index 91b032c3478..2466d062726 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
@@ -86,7 +86,8 @@ public class ListOffsetsApplicationEvent extends CompletableApplicationEvent<Map
     @Override
     public String toString() {
         return getClass().getSimpleName() + " {" +
-                "timestampsToSearch=" + timestampsToSearch + ", " +
+                toStringBase() +
+                ", timestampsToSearch=" + timestampsToSearch + ", " +
                 "requireTimestamps=" + requireTimestamps + '}';
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
index 54cee4ee9de..8bd770f7f12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
@@ -21,4 +21,11 @@ public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
     public NewTopicsMetadataUpdateRequestEvent() {
         super(Type.METADATA_UPDATE);
     }
+
+    @Override
+    public String toString() {
+        return "NewTopicsMetadataUpdateRequestEvent{" +
+                toStringBase() +
+                '}';
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
index 07524542d7d..22817fb2bae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java
@@ -16,19 +16,47 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import java.util.Objects;
+
 /**
- * The event is NoOp. This is intentionally left here for demonstration purpose.
+ * The event is a no-op, but is intentionally left here for demonstration and test purposes.
  */
 public class NoopApplicationEvent extends ApplicationEvent {
-    public final String message;
+
+    private final String message;
 
     public NoopApplicationEvent(final String message) {
         super(Type.NOOP);
-        this.message = message;
+        this.message = Objects.requireNonNull(message);
+    }
+
+    public String message() {
+        return message;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        NoopApplicationEvent that = (NoopApplicationEvent) o;
+
+        return message.equals(that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + message.hashCode();
+        return result;
     }
 
     @Override
     public String toString() {
-        return getClass() + "_" + this.message;
+        return "NoopApplicationEvent{" +
+                toStringBase() +
+                ",message='" + message + '\'' +
+                '}';
     }
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoopBackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopBackgroundEvent.java
similarity index 50%
rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoopBackgroundEvent.java
rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopBackgroundEvent.java
index db6ed2f7a82..c1cbcc253a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoopBackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopBackgroundEvent.java
@@ -14,23 +14,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer.internals;
+package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import java.util.Objects;
 
 /**
- * Noop event. Intentionally left it here for demonstration purpose.
+ * No-op event. Intentionally left it here for demonstration purpose.
  */
 public class NoopBackgroundEvent extends BackgroundEvent {
-    public final String message;
+
+    private final String message;
 
     public NoopBackgroundEvent(final String message) {
-        super(EventType.NOOP);
-        this.message = message;
+        super(Type.NOOP);
+        this.message = Objects.requireNonNull(message);
+    }
+
+    public String message() {
+        return message;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        NoopBackgroundEvent that = (NoopBackgroundEvent) o;
+
+        return message.equals(that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + message.hashCode();
+        return result;
     }
 
     @Override
     public String toString() {
-        return getClass() + "_" + this.message;
+        return "NoopBackgroundEvent{" +
+                toStringBase() +
+                ", message='" + message + '\'' +
+                '}';
     }
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
index e53248425d9..8b1a5492656 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
+
     private final Set<TopicPartition> partitions;
 
     public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
@@ -56,9 +57,8 @@ public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map
     @Override
     public String toString() {
         return getClass().getSimpleName() + "{" +
-                "partitions=" + partitions +
-                ", future=" + future +
-                ", type=" + type +
+                toStringBase() +
+                ", partitions=" + partitions +
                 '}';
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
index c14998dc652..b958f0ec417 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
@@ -17,10 +17,41 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 public class PollApplicationEvent extends ApplicationEvent {
-    public final long pollTimeMs;
 
-    protected PollApplicationEvent(final long currentTimeMs) {
+    private final long pollTimeMs;
+
+    public PollApplicationEvent(final long pollTimeMs) {
         super(Type.POLL);
-        this.pollTimeMs = currentTimeMs;
+        this.pollTimeMs = pollTimeMs;
+    }
+
+    public long pollTimeMs() {
+        return pollTimeMs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        PollApplicationEvent that = (PollApplicationEvent) o;
+
+        return pollTimeMs == that.pollTimeMs;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (int) (pollTimeMs ^ (pollTimeMs >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "PollApplicationEvent{" +
+                toStringBase() +
+                ", pollTimeMs=" + pollTimeMs +
+                '}';
     }
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/IdempotentCloser.java b/clients/src/main/java/org/apache/kafka/common/internals/IdempotentCloser.java
new file mode 100644
index 00000000000..4f70172d59c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/IdempotentCloser.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+/**
+ * {@code IdempotentCloser} encapsulates some basic logic to ensure that a given resource is only closed once.
+ * The underlying mechanism for ensuring that the close only happens once <em>and</em> is thread safe
+ * is via the {@link AtomicBoolean#compareAndSet(boolean, boolean)}. Users can provide callbacks (via optional
+ * {@link Runnable}s) for either the <em>initial</em> close and/or any <em>subsequent</em> closes.
+ *
+ * <p/>
+ *
+ * Here's an example:
+ *
+ * <pre>
+ *
+ * public class MyDataFile implements Closeable {
+ *
+ *     private final IdempotentCloser closer = new IdempotentCloser();
+ *
+ *     private final File file;
+ *
+ *     . . .
+ *
+ *     public boolean write() {
+ *         closer.assertOpen(() -> String.format("Data file %s already closed!", file));
+ *         writeToFile();
+ *     }
+ *
+ *     public boolean isClosed() {
+ *         return closer.isClosed();
+ *     }
+ *
+ *     &#064;Override
+ *     public void close() {
+ *         Runnable onInitialClose = () -> {
+ *             cleanUpFile(file);
+ *             log.debug("Data file {} closed", file);
+ *         };
+ *         Runnable onSubsequentClose = () -> {
+ *             log.warn("Data file {} already closed!", file);
+ *         };
+ *         closer.close(onInitialClose, onSubsequentClose);
+ *     }
+ * }
+ * </pre>
+ */
+public class IdempotentCloser implements AutoCloseable {
+
+    private final AtomicBoolean isClosed;
+
+    /**
+     * Creates an {@code IdempotentCloser} that is not yet closed.
+     */
+    public IdempotentCloser() {
+        this(false);
+    }
+
+    /**
+     * Creates an {@code IdempotentCloser} with the given initial state.
+     *
+     * @param isClosed Initial value for underlying state
+     */
+    public IdempotentCloser(boolean isClosed) {
+        this.isClosed = new AtomicBoolean(isClosed);
+    }
+
+    /**
+     * This method serves as an assert that the {@link IdempotentCloser} is still open. If it is open, this method
+     * simply returns. If it is closed, a new {@link IllegalStateException} will be thrown using the supplied message.
+     *
+     * @param message {@link Supplier} that supplies the message for the exception
+     */
+    public void assertOpen(Supplier<String> message) {
+        if (isClosed.get())
+            throw new IllegalStateException(message.get());
+    }
+
+    /**
+     * This method serves as an assert that the {@link IdempotentCloser} is still open. If it is open, this method
+     * simply returns. If it is closed, a new {@link IllegalStateException} will be thrown using the given message.
+     *
+     * @param message Message to use for the exception
+     */
+    public void assertOpen(String message) {
+        if (isClosed.get())
+            throw new IllegalStateException(message);
+    }
+
+    public boolean isClosed() {
+        return isClosed.get();
+    }
+
+    /**
+     * Closes the resource in a thread-safe manner.
+     *
+     * <p/>
+     *
+     * After the execution has completed, calls to {@link #isClosed()} will return {@code false} and calls to
+     * {@link #assertOpen(String)} and {@link #assertOpen(Supplier)}
+     * will throw an {@link IllegalStateException}.
+     */
+    @Override
+    public void close() {
+        close(null, null);
+    }
+
+    /**
+     * Closes the resource in a thread-safe manner.
+     *
+     * <p/>
+     *
+     * After the execution has completed, calls to {@link #isClosed()} will return {@code false} and calls to
+     * {@link #assertOpen(String)} and {@link #assertOpen(Supplier)}
+     * will throw an {@link IllegalStateException}.
+     *
+     * @param onInitialClose Optional {@link Runnable} to execute when the resource is closed. Note that the
+     *                       object will still be considered closed even if an exception is thrown during the course
+     *                       of its execution; can be {@code null}
+     */
+    public void close(final Runnable onInitialClose) {
+        close(onInitialClose, null);
+    }
+
+    /**
+     * Closes the resource in a thread-safe manner.
+     *
+     * <p/>
+     *
+     * After the execution has completed, calls to {@link #isClosed()} will return {@code false} and calls to
+     * {@link #assertOpen(String)} and {@link #assertOpen(Supplier)}
+     * will throw an {@link IllegalStateException}.
+     *
+     * @param onInitialClose    Optional {@link Runnable} to execute when the resource is closed. Note that the
+     *                          object will still be considered closed even if an exception is thrown during the course
+     *                          of its execution; can be {@code null}
+     * @param onSubsequentClose Optional {@link Runnable} to execute if this resource was previously closed. Note that
+     *                          no state will be affected if an exception is thrown during its execution; can be
+     *                          {@code null}
+     */
+    public void close(final Runnable onInitialClose, final Runnable onSubsequentClose) {
+        if (isClosed.compareAndSet(false, true)) {
+            if (onInitialClose != null)
+                onInitialClose.run();
+        } else {
+            if (onSubsequentClose != null)
+                onSubsequentClose.run();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "IdempotentCloser{" +
+                "isClosed=" + isClosed +
+                '}';
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index ebb77a1ffd1..de97ee40de2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -67,19 +67,23 @@ public class CompletedFetchTest {
         FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                 .setRecords(newRecords(startingOffset, numRecords, fetchOffset));
 
-        CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData);
+        FetchConfig<String, String> fetchConfig = newFetchConfig(new StringDeserializer(),
+                new StringDeserializer(),
+                IsolationLevel.READ_UNCOMMITTED,
+                true);
+        CompletedFetch completedFetch = newCompletedFetch(fetchOffset, partitionData);
 
-        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
+        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
         assertEquals(10, records.size());
         ConsumerRecord<String, String> record = records.get(0);
         assertEquals(10, record.offset());
 
-        records = completedFetch.fetchRecords(10);
+        records = completedFetch.fetchRecords(fetchConfig, 10);
         assertEquals(1, records.size());
         record = records.get(0);
         assertEquals(20, record.offset());
 
-        records = completedFetch.fetchRecords(10);
+        records = completedFetch.fetchRecords(fetchConfig, 10);
         assertEquals(0, records.size());
     }
 
@@ -92,21 +96,23 @@ public class CompletedFetchTest {
                 .setRecords(rawRecords)
                 .setAbortedTransactions(newAbortedTransactions());
 
-        CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_COMMITTED,
-                OffsetResetStrategy.NONE,
-                true,
-                0,
-                partitionData);
-        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
-        assertEquals(0, records.size());
-
-        completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
-                OffsetResetStrategy.NONE,
-                true,
-                0,
-                partitionData);
-        records = completedFetch.fetchRecords(10);
-        assertEquals(numRecords, records.size());
+        try (final StringDeserializer deserializer = new StringDeserializer()) {
+            FetchConfig<String, String> fetchConfig = newFetchConfig(deserializer,
+                    deserializer,
+                    IsolationLevel.READ_COMMITTED,
+                    true);
+            CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
+            List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
+            assertEquals(0, records.size());
+
+            fetchConfig = newFetchConfig(deserializer,
+                    deserializer,
+                    IsolationLevel.READ_UNCOMMITTED,
+                    true);
+            completedFetch = newCompletedFetch(0, partitionData);
+            records = completedFetch.fetchRecords(fetchConfig, 10);
+            assertEquals(numRecords, records.size());
+        }
     }
 
     @Test
@@ -115,13 +121,15 @@ public class CompletedFetchTest {
         Records rawRecords = newTranscactionalRecords(ControlRecordType.COMMIT, numRecords);
         FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                 .setRecords(rawRecords);
-        CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_COMMITTED,
-                OffsetResetStrategy.NONE,
-                true,
-                0,
-                partitionData);
-        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
-        assertEquals(10, records.size());
+        CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
+        try (final StringDeserializer deserializer = new StringDeserializer()) {
+            FetchConfig<String, String> fetchConfig = newFetchConfig(deserializer,
+                    deserializer,
+                    IsolationLevel.READ_COMMITTED,
+                    true);
+            List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
+            assertEquals(10, records.size());
+        }
     }
 
     @Test
@@ -132,9 +140,13 @@ public class CompletedFetchTest {
         FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                 .setRecords(newRecords(startingOffset, numRecords, fetchOffset));
 
-        CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData);
+        CompletedFetch completedFetch = newCompletedFetch(fetchOffset, partitionData);
+        FetchConfig<String, String> fetchConfig = newFetchConfig(new StringDeserializer(),
+                new StringDeserializer(),
+                IsolationLevel.READ_UNCOMMITTED,
+                true);
 
-        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(-10);
+        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, -10);
         assertEquals(0, records.size());
     }
 
@@ -146,82 +158,72 @@ public class CompletedFetchTest {
                 .setLastStableOffset(20)
                 .setLogStartOffset(0);
 
-        CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
-                OffsetResetStrategy.NONE,
-                false,
-                1,
-                partitionData);
+        CompletedFetch completedFetch = newCompletedFetch(1, partitionData);
+        try (final StringDeserializer deserializer = new StringDeserializer()) {
+            FetchConfig<String, String> fetchConfig = newFetchConfig(deserializer,
+                    deserializer,
+                    IsolationLevel.READ_UNCOMMITTED,
+                    true);
 
-        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
-        assertEquals(0, records.size());
+            List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
+            assertEquals(0, records.size());
+        }
     }
 
     @Test
     public void testCorruptedMessage() {
         // Create one good record and then one "corrupted" record.
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0);
-        builder.append(new SimpleRecord(new UUIDSerializer().serialize(TOPIC_NAME, UUID.randomUUID())));
-        builder.append(0L, "key".getBytes(), "value".getBytes());
-        Records records = builder.build();
-
-        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
-                .setPartitionIndex(0)
-                .setHighWatermark(10)
-                .setLastStableOffset(20)
-                .setLogStartOffset(0)
-                .setRecords(records);
-
-        CompletedFetch<UUID, UUID> completedFetch = newCompletedFetch(new UUIDDeserializer(),
-                new UUIDDeserializer(),
-                IsolationLevel.READ_COMMITTED,
-                OffsetResetStrategy.NONE,
-                false,
-                0,
-                partitionData);
-
-        completedFetch.fetchRecords(10);
-
-        assertThrows(RecordDeserializationException.class, () -> completedFetch.fetchRecords(10));
-    }
-
-    private CompletedFetch<String, String> newCompletedFetch(long fetchOffset,
-                                                             FetchResponseData.PartitionData partitionData) {
-        return newCompletedFetch(
-                IsolationLevel.READ_UNCOMMITTED,
-                OffsetResetStrategy.NONE,
-                true,
-                fetchOffset,
-                partitionData);
+        try (final MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0);
+             final UUIDSerializer serializer = new UUIDSerializer();
+             final UUIDDeserializer deserializer = new UUIDDeserializer()) {
+            builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
+            builder.append(0L, "key".getBytes(), "value".getBytes());
+            Records records = builder.build();
+
+            FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+                    .setPartitionIndex(0)
+                    .setHighWatermark(10)
+                    .setLastStableOffset(20)
+                    .setLogStartOffset(0)
+                    .setRecords(records);
+
+            FetchConfig<UUID, UUID> fetchConfig = newFetchConfig(deserializer,
+                    deserializer,
+                    IsolationLevel.READ_COMMITTED,
+                    false);
+            CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
+
+            completedFetch.fetchRecords(fetchConfig, 10);
+
+            assertThrows(RecordDeserializationException.class,
+                    () -> completedFetch.fetchRecords(fetchConfig, 10));
+        }
     }
 
-    private CompletedFetch<String, String> newCompletedFetch(IsolationLevel isolationLevel,
-                                                             OffsetResetStrategy offsetResetStrategy,
-                                                             boolean checkCrcs,
-                                                             long fetchOffset,
-                                                             FetchResponseData.PartitionData partitionData) {
-        return newCompletedFetch(new StringDeserializer(),
-                new StringDeserializer(),
-                isolationLevel,
-                offsetResetStrategy,
-                checkCrcs,
-                fetchOffset,
-                partitionData);
-    }
-
-    private <K, V> CompletedFetch<K, V> newCompletedFetch(Deserializer<K> keyDeserializer,
-                                                          Deserializer<V> valueDeserializer,
-                                                          IsolationLevel isolationLevel,
-                                                          OffsetResetStrategy offsetResetStrategy,
-                                                          boolean checkCrcs,
-                                                          long fetchOffset,
-                                                          FetchResponseData.PartitionData partitionData) {
+    private CompletedFetch newCompletedFetch(long fetchOffset,
+                                             FetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
-        SubscriptionState subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
+        SubscriptionState subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
         FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry();
         FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), metricsRegistry);
         FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metrics, Collections.singleton(TP));
 
-        FetchConfig<K, V> fetchConfig = new FetchConfig<>(
+        return new CompletedFetch(
+                logContext,
+                subscriptions,
+                BufferSupplier.create(),
+                TP,
+                partitionData,
+                metricAggregator,
+                fetchOffset,
+                ApiKeys.FETCH.latestVersion());
+    }
+
+    private static <K, V> FetchConfig<K, V> newFetchConfig(Deserializer<K> keyDeserializer,
+                                                           Deserializer<V> valueDeserializer,
+                                                           IsolationLevel isolationLevel,
+                                                           boolean checkCrcs) {
+        return new FetchConfig<>(
                 ConsumerConfig.DEFAULT_FETCH_MIN_BYTES,
                 ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
                 ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS,
@@ -232,29 +234,21 @@ public class CompletedFetchTest {
                 new Deserializers<>(keyDeserializer, valueDeserializer),
                 isolationLevel
         );
-        return new CompletedFetch<>(
-                logContext,
-                subscriptions,
-                fetchConfig,
-                BufferSupplier.create(),
-                TP,
-                partitionData,
-                metricAggregator,
-                fetchOffset,
-                ApiKeys.FETCH.latestVersion());
     }
 
     private Records newRecords(long baseOffset, int count, long firstMessageId) {
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
-        for (int i = 0; i < count; i++)
-            builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes());
-        return builder.build();
+        try (final MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset)) {
+            for (int i = 0; i < count; i++)
+                builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes());
+            return builder.build();
+        }
     }
 
     private Records newTranscactionalRecords(ControlRecordType controlRecordType, int numRecords) {
         Time time = new MockTime();
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
                 RecordBatch.CURRENT_MAGIC_VALUE,
                 CompressionType.NONE,
                 TimestampType.CREATE_TIME,
@@ -264,12 +258,13 @@ public class CompletedFetchTest {
                 PRODUCER_EPOCH,
                 0,
                 true,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH);
+                RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+            for (int i = 0; i < numRecords; i++)
+                builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        for (int i = 0; i < numRecords; i++)
-            builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+            builder.build();
+        }
 
-        builder.build();
         writeTransactionMarker(buffer, controlRecordType, numRecords, time);
         buffer.flip();
 
@@ -295,5 +290,4 @@ public class CompletedFetchTest {
         abortedTransaction.setProducerId(PRODUCER_ID);
         return Collections.singletonList(abortedTransaction);
     }
-
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
new file mode 100644
index 00000000000..319f009f4d3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This tests the {@link FetchBuffer} functionality in addition to what {@link FetcherTest} covers in its tests.
+ * One of the main concerns of these tests are that we correctly handle both places that data is held internally:
+ *
+ * <ol>
+ *     <li>A special "next in line" buffer</li>
+ *     <li>The remainder of the buffers in a queue</li>
+ * </ol>
+ */
+public class FetchBufferTest {
+
+    private final Time time = new MockTime(0, 0, 0);
+    private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
+    private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
+    private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);
+    private final Set<TopicPartition> allPartitions = partitions(topicAPartition0, topicAPartition1, topicAPartition2);
+    private LogContext logContext;
+
+    private SubscriptionState subscriptions;
+
+    private FetchMetricsManager metricsManager;
+
+    @BeforeEach
+    public void setup() {
+        logContext = new LogContext();
+
+        Properties p = new Properties();
+        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        ConsumerConfig config = new ConsumerConfig(p);
+
+        subscriptions = createSubscriptionState(config, logContext);
+
+        Metrics metrics = createMetrics(config, time);
+        metricsManager = createFetchMetricsManager(metrics);
+    }
+
+    /**
+     * Verifies the basics: we can add buffered data to the queue, peek to view them, and poll to remove them.
+     */
+    @Test
+    public void testBasicPeekAndPoll() {
+        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
+            CompletedFetch completedFetch = completedFetch(topicAPartition0);
+            assertTrue(fetchBuffer.isEmpty());
+            fetchBuffer.add(completedFetch);
+            assertTrue(fetchBuffer.hasCompletedFetches(p -> true));
+            assertFalse(fetchBuffer.isEmpty());
+            assertNotNull(fetchBuffer.peek());
+            assertSame(completedFetch, fetchBuffer.peek());
+            assertSame(completedFetch, fetchBuffer.poll());
+            assertNull(fetchBuffer.peek());
+        }
+    }
+
+    /**
+     * Verifies {@link FetchBuffer#close()}} closes the buffered data for both the queue and the next-in-line buffer.
+     */
+    @Test
+    public void testCloseClearsData() {
+        // We don't use the try-with-resources approach because we want to have access to the FetchBuffer after
+        // the try block so that we can run our asserts on the object.
+        FetchBuffer fetchBuffer = null;
+
+        try {
+            fetchBuffer = new FetchBuffer(logContext);
+            assertNull(fetchBuffer.nextInLineFetch());
+            assertTrue(fetchBuffer.isEmpty());
+
+            fetchBuffer.add(completedFetch(topicAPartition0));
+            assertFalse(fetchBuffer.isEmpty());
+
+            fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
+            assertNotNull(fetchBuffer.nextInLineFetch());
+        } finally {
+            if (fetchBuffer != null)
+                fetchBuffer.close();
+        }
+
+        assertNull(fetchBuffer.nextInLineFetch());
+        assertTrue(fetchBuffer.isEmpty());
+    }
+
+    /**
+     * Tests that the buffer returns partitions for both the queue and the next-in-line buffer.
+     */
+    @Test
+    public void testBufferedPartitions() {
+        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
+            fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
+            fetchBuffer.add(completedFetch(topicAPartition1));
+            fetchBuffer.add(completedFetch(topicAPartition2));
+            assertEquals(allPartitions, fetchBuffer.bufferedPartitions());
+
+            fetchBuffer.setNextInLineFetch(null);
+            assertEquals(partitions(topicAPartition1, topicAPartition2), fetchBuffer.bufferedPartitions());
+
+            fetchBuffer.poll();
+            assertEquals(partitions(topicAPartition2), fetchBuffer.bufferedPartitions());
+
+            fetchBuffer.poll();
+            assertEquals(partitions(), fetchBuffer.bufferedPartitions());
+        }
+    }
+
+    /**
+     * Tests that the buffer manipulates partitions for both the queue and the next-in-line buffer.
+     */
+    @Test
+    public void testAddAllAndRetainAll() {
+        try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
+            fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
+            fetchBuffer.addAll(Arrays.asList(completedFetch(topicAPartition1), completedFetch(topicAPartition2)));
+            assertEquals(allPartitions, fetchBuffer.bufferedPartitions());
+
+            fetchBuffer.retainAll(partitions(topicAPartition1, topicAPartition2));
+            assertEquals(partitions(topicAPartition1, topicAPartition2), fetchBuffer.bufferedPartitions());
+
+            fetchBuffer.retainAll(partitions(topicAPartition2));
+            assertEquals(partitions(topicAPartition2), fetchBuffer.bufferedPartitions());
+
+            fetchBuffer.retainAll(partitions());
+            assertEquals(partitions(), fetchBuffer.bufferedPartitions());
+        }
+    }
+
+    private CompletedFetch completedFetch(TopicPartition tp) {
+        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData();
+        FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(metricsManager, allPartitions);
+        return new CompletedFetch(
+                logContext,
+                subscriptions,
+                BufferSupplier.create(),
+                tp,
+                partitionData,
+                metricsAggregator,
+                0L,
+                ApiKeys.FETCH.latestVersion());
+    }
+
+    /**
+     * This is a handy utility method for returning a set from a varargs array.
+     */
+    private static Set<TopicPartition> partitions(TopicPartition... partitions) {
+        return new HashSet<>(Arrays.asList(partitions));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
new file mode 100644
index 00000000000..1f157b84189
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This tests the {@link FetchCollector} functionality in addition to what {@link FetcherTest} tests during the course
+ * of its tests.
+ */
+public class FetchCollectorTest {
+
+    private final static int DEFAULT_RECORD_COUNT = 10;
+    private final static int DEFAULT_MAX_POLL_RECORDS = ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
+    private final Time time = new MockTime(0, 0, 0);
+    private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
+    private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
+    private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);
+    private final Set<TopicPartition> allPartitions = partitions(topicAPartition0, topicAPartition1, topicAPartition2);
+    private LogContext logContext;
+
+    private SubscriptionState subscriptions;
+    private FetchConfig<String, String> fetchConfig;
+    private FetchMetricsManager metricsManager;
+    private ConsumerMetadata metadata;
+    private FetchBuffer fetchBuffer;
+    private FetchCollector<String, String> fetchCollector;
+    private CompletedFetchBuilder completedFetchBuilder;
+
+    @Test
+    public void testFetchNormal() {
+        int recordCount = DEFAULT_MAX_POLL_RECORDS;
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        CompletedFetch completedFetch = completedFetchBuilder
+                .recordCount(recordCount)
+                .build();
+
+        // Validate that the buffer is empty until after we add the fetch data.
+        assertTrue(fetchBuffer.isEmpty());
+        fetchBuffer.add(completedFetch);
+        assertFalse(fetchBuffer.isEmpty());
+
+        // Validate that the completed fetch isn't initialized just because we add it to the buffer.
+        assertFalse(completedFetch.isInitialized());
+
+        // Fetch the data and validate that we get all the records we want back.
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertFalse(fetch.isEmpty());
+        assertEquals(recordCount, fetch.numRecords());
+
+        // When we collected the data from the buffer, this will cause the completed fetch to get initialized.
+        assertTrue(completedFetch.isInitialized());
+
+        // However, even though we've collected the data, it isn't (completely) consumed yet.
+        assertFalse(completedFetch.isConsumed());
+
+        // The buffer is now considered "empty" because our queue is empty.
+        assertTrue(fetchBuffer.isEmpty());
+        assertNull(fetchBuffer.peek());
+        assertNull(fetchBuffer.poll());
+
+        // However, while the queue is "empty", the next-in-line fetch is actually still in the buffer.
+        assertNotNull(fetchBuffer.nextInLineFetch());
+
+        // Validate that the next fetch position has been updated to point to the record after our last fetched
+        // record.
+        SubscriptionState.FetchPosition position = subscriptions.position(topicAPartition0);
+        assertEquals(recordCount, position.offset);
+
+        // Now attempt to collect more records from the fetch buffer.
+        fetch = fetchCollector.collectFetch(fetchBuffer);
+
+        // The Fetch object is non-null, but it's empty.
+        assertEquals(0, fetch.numRecords());
+        assertTrue(fetch.isEmpty());
+
+        // However, once we read *past* the end of the records in the CompletedFetch, then we will call
+        // drain on it, and it will be considered all consumed.
+        assertTrue(completedFetch.isConsumed());
+    }
+
+    @Test
+    public void testFetchWithReadReplica() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // Set the preferred read replica and just to be safe, verify it was set.
+        int preferredReadReplicaId = 67;
+        subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
+        assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+        assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+
+        CompletedFetch completedFetch = completedFetchBuilder.build();
+        fetchBuffer.add(completedFetch);
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+
+        // The Fetch and read replica settings should be empty.
+        assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
+        assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+    }
+
+    @Test
+    public void testNoResultsIfInitializing() {
+        buildDependencies();
+
+        // Intentionally call assign (vs. assignAndSeek) so that we don't set the position. The SubscriptionState
+        // will consider the partition as in the SubscriptionState.FetchStates.INITIALIZED state.
+        assign(topicAPartition0);
+
+        // The position should thus be null and considered un-fetchable and invalid.
+        assertNull(subscriptions.position(topicAPartition0));
+        assertFalse(subscriptions.isFetchable(topicAPartition0));
+        assertFalse(subscriptions.hasValidPosition(topicAPartition0));
+
+        // Add some valid CompletedFetch records to the FetchBuffer queue and collect them into the Fetch.
+        CompletedFetch completedFetch = completedFetchBuilder.build();
+        fetchBuffer.add(completedFetch);
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+
+        // Verify that no records are fetched for the partition as it did not have a valid position set.
+        assertEquals(0, fetch.numRecords());
+    }
+
+    @ParameterizedTest
+    @MethodSource("testErrorInInitializeSource")
+    public void testErrorInInitialize(int recordCount, RuntimeException expectedException) {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // Create a FetchCollector that fails on CompletedFetch initialization.
+        fetchCollector = new FetchCollector<String, String>(logContext,
+                metadata,
+                subscriptions,
+                fetchConfig,
+                metricsManager,
+                time) {
+
+            @Override
+            protected CompletedFetch initialize(final CompletedFetch completedFetch) {
+                throw expectedException;
+            }
+        };
+
+        // Add the CompletedFetch to the FetchBuffer queue
+        CompletedFetch completedFetch = completedFetchBuilder
+                .recordCount(recordCount)
+                .build();
+        fetchBuffer.add(completedFetch);
+
+        // At first, the queue is populated
+        assertFalse(fetchBuffer.isEmpty());
+
+        // Now run our ill-fated collectFetch.
+        assertThrows(expectedException.getClass(), () -> fetchCollector.collectFetch(fetchBuffer));
+
+        // If the number of records in the CompletedFetch was 0, the call to FetchCollector.collectFetch() will
+        // remove it from the queue. If there are records in the CompletedFetch, FetchCollector.collectFetch will
+        // leave it on the queue.
+        assertEquals(recordCount == 0, fetchBuffer.isEmpty());
+    }
+
+    @Test
+    public void testFetchingPausedPartitionsYieldsNoRecords() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // The partition should not be 'paused' in the SubscriptionState until we explicitly tell it to.
+        assertFalse(subscriptions.isPaused(topicAPartition0));
+        subscriptions.pause(topicAPartition0);
+        assertTrue(subscriptions.isPaused(topicAPartition0));
+
+        CompletedFetch completedFetch = completedFetchBuilder.build();
+
+        // Set the CompletedFetch to the next-in-line fetch, *not* the queue.
+        fetchBuffer.setNextInLineFetch(completedFetch);
+
+        // The next-in-line CompletedFetch should reference the same object that was just created
+        assertSame(fetchBuffer.nextInLineFetch(), completedFetch);
+
+        // The FetchBuffer queue should be empty as the CompletedFetch was added to the next-in-line.
+        // CompletedFetch, not the queue.
+        assertTrue(fetchBuffer.isEmpty());
+
+        // Ensure that the partition for the next-in-line CompletedFetch is still 'paused'.
+        assertTrue(subscriptions.isPaused(completedFetch.partition));
+
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+
+        // There should be no records in the Fetch as the partition being fetched is 'paused'.
+        assertEquals(0, fetch.numRecords());
+
+        // The FetchBuffer queue should not be empty; the CompletedFetch is added to the FetchBuffer queue by
+        // the FetchCollector when it detects a 'paused' partition.
+        assertFalse(fetchBuffer.isEmpty());
+
+        // The next-in-line CompletedFetch should be null; the CompletedFetch is added to the FetchBuffer
+        // queue by the FetchCollector when it detects a 'paused' partition.
+        assertNull(fetchBuffer.nextInLineFetch());
+    }
+
+    @ParameterizedTest
+    @MethodSource("testFetchWithMetadataRefreshErrorsSource")
+    public void testFetchWithMetadataRefreshErrors(final Errors error) {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(error)
+                .build();
+        fetchBuffer.add(completedFetch);
+
+        // Set the preferred read replica and just to be safe, verify it was set.
+        int preferredReadReplicaId = 5;
+        subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
+        assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+        assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+
+        // Fetch the data and validate that we get all the records we want back.
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertTrue(fetch.isEmpty());
+        assertTrue(metadata.updateRequested());
+        assertEquals(Optional.empty(), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchWithOffsetOutOfRange() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        CompletedFetch completedFetch = completedFetchBuilder.build();
+        fetchBuffer.add(completedFetch);
+
+        // Fetch the data and validate that we get our first batch of records back.
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertFalse(fetch.isEmpty());
+        assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
+
+        // Try to fetch more data and validate that we get an empty Fetch back.
+        completedFetch = completedFetchBuilder
+                .fetchOffset(fetch.numRecords())
+                .error(Errors.OFFSET_OUT_OF_RANGE)
+                .build();
+        fetchBuffer.add(completedFetch);
+        fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertTrue(fetch.isEmpty());
+
+        // Try to fetch more data and validate that we get an empty Fetch back.
+        completedFetch = completedFetchBuilder
+                .fetchOffset(fetch.numRecords())
+                .error(Errors.OFFSET_OUT_OF_RANGE)
+                .build();
+        fetchBuffer.add(completedFetch);
+        fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertTrue(fetch.isEmpty());
+    }
+
+    @Test
+    public void testFetchWithOffsetOutOfRangeWithPreferredReadReplica() {
+        int records = 10;
+        buildDependencies(records);
+        assignAndSeek(topicAPartition0);
+
+        // Set the preferred read replica and just to be safe, verify it was set.
+        int preferredReadReplicaId = 67;
+        subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
+        assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+        assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(Errors.OFFSET_OUT_OF_RANGE)
+                .build();
+        fetchBuffer.add(completedFetch);
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+
+        // The Fetch and read replica settings should be empty.
+        assertTrue(fetch.isEmpty());
+        assertEquals(Optional.empty(), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchWithTopicAuthorizationFailed() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // Try to data and validate that we get an empty Fetch back.
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(Errors.TOPIC_AUTHORIZATION_FAILED)
+                .build();
+        fetchBuffer.add(completedFetch);
+        assertThrows(TopicAuthorizationException.class, () -> fetchCollector.collectFetch(fetchBuffer));
+    }
+
+    @Test
+    public void testFetchWithUnknownLeaderEpoch() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // Try to data and validate that we get an empty Fetch back.
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(Errors.UNKNOWN_LEADER_EPOCH)
+                .build();
+        fetchBuffer.add(completedFetch);
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertTrue(fetch.isEmpty());
+    }
+
+    @Test
+    public void testFetchWithUnknownServerError() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // Try to data and validate that we get an empty Fetch back.
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(Errors.UNKNOWN_SERVER_ERROR)
+                .build();
+        fetchBuffer.add(completedFetch);
+        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
+        assertTrue(fetch.isEmpty());
+    }
+
+    @Test
+    public void testFetchWithCorruptMessage() {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        // Try to data and validate that we get an empty Fetch back.
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(Errors.CORRUPT_MESSAGE)
+                .build();
+        fetchBuffer.add(completedFetch);
+        assertThrows(KafkaException.class, () -> fetchCollector.collectFetch(fetchBuffer));
+    }
+
+    @ParameterizedTest
+    @MethodSource("testFetchWithOtherErrorsSource")
+    public void testFetchWithOtherErrors(final Errors error) {
+        buildDependencies();
+        assignAndSeek(topicAPartition0);
+
+        CompletedFetch completedFetch = completedFetchBuilder
+                .error(error)
+                .build();
+        fetchBuffer.add(completedFetch);
+        assertThrows(IllegalStateException.class, () -> fetchCollector.collectFetch(fetchBuffer));
+    }
+
+    /**
+     * This is a handy utility method for returning a set from a varargs array.
+     */
+    private static Set<TopicPartition> partitions(TopicPartition... partitions) {
+        return new HashSet<>(Arrays.asList(partitions));
+    }
+
+    private void buildDependencies() {
+        buildDependencies(DEFAULT_MAX_POLL_RECORDS);
+    }
+
+    private void buildDependencies(int maxPollRecords) {
+        logContext = new LogContext();
+
+        Properties p = new Properties();
+        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords));
+
+        ConsumerConfig config = new ConsumerConfig(p);
+
+        Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
+
+        subscriptions = createSubscriptionState(config, logContext);
+        fetchConfig = createFetchConfig(config, deserializers);
+
+        Metrics metrics = createMetrics(config, time);
+        metricsManager = createFetchMetricsManager(metrics);
+        metadata = new ConsumerMetadata(
+                0,
+                1000,
+                10000,
+                false,
+                false,
+                subscriptions,
+                logContext,
+                new ClusterResourceListeners());
+        fetchCollector = new FetchCollector<>(
+                logContext,
+                metadata,
+                subscriptions,
+                fetchConfig,
+                metricsManager,
+                time);
+        fetchBuffer = new FetchBuffer(logContext);
+        completedFetchBuilder = new CompletedFetchBuilder();
+    }
+
+    private void assign(TopicPartition... partitions) {
+        subscriptions.assignFromUser(partitions(partitions));
+    }
+
+    private void assignAndSeek(TopicPartition tp) {
+        assign(tp);
+        subscriptions.seek(tp, 0);
+    }
+
+    /**
+     * Supplies the {@link Arguments} to {@link #testFetchWithMetadataRefreshErrors(Errors)}.
+     */
+    private static Stream<Arguments> testFetchWithMetadataRefreshErrorsSource() {
+        List<Errors> errors = Arrays.asList(
+                Errors.NOT_LEADER_OR_FOLLOWER,
+                Errors.REPLICA_NOT_AVAILABLE,
+                Errors.KAFKA_STORAGE_ERROR,
+                Errors.FENCED_LEADER_EPOCH,
+                Errors.OFFSET_NOT_AVAILABLE,
+                Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                Errors.UNKNOWN_TOPIC_ID,
+                Errors.INCONSISTENT_TOPIC_ID
+        );
+
+        return errors.stream().map(Arguments::of);
+    }
+
+    /**
+     * Supplies the {@link Arguments} to {@link #testFetchWithOtherErrors(Errors)}.
+     */
+    private static Stream<Arguments> testFetchWithOtherErrorsSource() {
+        List<Errors> errors = new ArrayList<>(Arrays.asList(Errors.values()));
+        errors.removeAll(Arrays.asList(
+                Errors.NONE,
+                Errors.NOT_LEADER_OR_FOLLOWER,
+                Errors.REPLICA_NOT_AVAILABLE,
+                Errors.KAFKA_STORAGE_ERROR,
+                Errors.FENCED_LEADER_EPOCH,
+                Errors.OFFSET_NOT_AVAILABLE,
+                Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                Errors.UNKNOWN_TOPIC_ID,
+                Errors.INCONSISTENT_TOPIC_ID,
+                Errors.OFFSET_OUT_OF_RANGE,
+                Errors.TOPIC_AUTHORIZATION_FAILED,
+                Errors.UNKNOWN_LEADER_EPOCH,
+                Errors.UNKNOWN_SERVER_ERROR,
+                Errors.CORRUPT_MESSAGE
+        ));
+
+        return errors.stream().map(Arguments::of);
+    }
+
+
+    /**
+     * Supplies the {@link Arguments} to {@link #testErrorInInitialize(int, RuntimeException)}.
+     */
+    private static Stream<Arguments> testErrorInInitializeSource() {
+        return Stream.of(
+                Arguments.of(10, new RuntimeException()),
+                Arguments.of(0, new RuntimeException()),
+                Arguments.of(10, new KafkaException()),
+                Arguments.of(0, new KafkaException())
+        );
+    }
+
+    private class CompletedFetchBuilder {
+
+        private long fetchOffset = 0;
+
+        private int recordCount = DEFAULT_RECORD_COUNT;
+
+        private Errors error = null;
+
+        private CompletedFetchBuilder fetchOffset(long fetchOffset) {
+            this.fetchOffset = fetchOffset;
+            return this;
+        }
+
+        private CompletedFetchBuilder recordCount(int recordCount) {
+            this.recordCount = recordCount;
+            return this;
+        }
+
+        private CompletedFetchBuilder error(Errors error) {
+            this.error = error;
+            return this;
+        }
+
+        private CompletedFetch build() {
+            Records records;
+            ByteBuffer allocate = ByteBuffer.allocate(1024);
+
+            try (MemoryRecordsBuilder builder = MemoryRecords.builder(allocate,
+                    CompressionType.NONE,
+                    TimestampType.CREATE_TIME,
+                    0)) {
+                for (int i = 0; i < recordCount; i++)
+                    builder.append(0L, "key".getBytes(), ("value-" + i).getBytes());
+
+                records = builder.build();
+            }
+
+            FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+                    .setPartitionIndex(topicAPartition0.partition())
+                    .setHighWatermark(1000)
+                    .setRecords(records);
+
+            if (error != null)
+                partitionData.setErrorCode(error.code());
+
+            FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(metricsManager, allPartitions);
+            return new CompletedFetch(
+                    logContext,
+                    subscriptions,
+                    BufferSupplier.create(),
+                    topicAPartition0,
+                    partitionData,
+                    metricsAggregator,
+                    fetchOffset,
+                    ApiKeys.FETCH.latestVersion());
+        }
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index da762edf31c..0fec9a9e089 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -101,7 +101,7 @@ public class OffsetsRequestManagerTest {
         apiVersions = mock(ApiVersions.class);
         requestManager = new OffsetsRequestManager(subscriptionState, metadata,
                 DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS,
-                apiVersions, new LogContext());
+                apiVersions, mock(NetworkClientDelegate.class), new LogContext());
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index 0e07b952be4..58635950fb1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -21,7 +21,9 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -44,6 +46,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.MockedConstruction;
+import org.mockito.stubbing.Answer;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -52,7 +55,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singleton;
@@ -95,7 +101,10 @@ public class PrototypeAsyncConsumerTest {
         this.config = new ConsumerConfig(consumerProps);
         this.logContext = new LogContext();
         this.subscriptions = mock(SubscriptionState.class);
-        this.eventHandler = mock(DefaultEventHandler.class);
+        final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
+        final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
+        final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
+        this.eventHandler = spy(new DefaultEventHandler(bt, aq, bq));
         this.metrics = new Metrics(time);
     }
 
@@ -157,34 +166,64 @@ public class PrototypeAsyncConsumerTest {
 
     @Test
     public void testCommitted() {
-        Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
+        Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
-        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
-                 mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-                     when(mock.future()).thenReturn(committedFuture);
-                 })) {
-            committedFuture.complete(mockTopicPartitionOffset());
-            consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-            assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+        committedFuture.complete(offsets);
+
+        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+            this.consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+            assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
             verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
         }
     }
 
     @Test
     public void testCommitted_ExceptionThrown() {
-        Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
+        Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
-        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
-                 mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-                     when(mock.future()).thenReturn(committedFuture);
-                 })) {
-            committedFuture.completeExceptionally(new KafkaException("Test exception"));
-            consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-            assertThrows(KafkaException.class, () -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+        committedFuture.completeExceptionally(new KafkaException("Test exception"));
+
+        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+            this.consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+            assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
             verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
         }
     }
 
+    /**
+     * This is a rather ugly bit of code. Not my choice :(
+     *
+     * <p/>
+     *
+     * Inside the {@link org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we create an
+     * instance of {@link OffsetFetchApplicationEvent} that holds the partitions and internally holds a
+     * {@link CompletableFuture}. We want to test different behaviours of the {@link Future#get()}, such as
+     * returning normally, timing out, throwing an error, etc. By mocking the construction of the event object that
+     * is created, we can affect that behavior.
+     */
+    private static MockedConstruction<OffsetFetchApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        // This "answer" is where we pass the future to be invoked by the ConsumerUtils.getResult() method
+        Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = invocation -> {
+            // This argument captures the actual argument value that was passed to the event's get() method, so we
+            // just "forward" that value to our mocked call
+            Timer timer = invocation.getArgument(0);
+            return ConsumerUtils.getResult(future, timer);
+        };
+
+        MockedConstruction.MockInitializer<OffsetFetchApplicationEvent> mockInitializer = (mock, ctx) -> {
+            // When the event's get() method is invoked, we call the "answer" method just above
+            when(mock.get(any())).thenAnswer(getInvocationAnswer);
+
+            // When the event's type() method is invoked, we have to return the type as it will be null in the mock
+            when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSET);
+
+            // This is needed for the WakeupTrigger code that keeps track of the active task
+            when(mock.future()).thenReturn(future);
+        };
+
+        return mockConstruction(OffsetFetchApplicationEvent.class, mockInitializer);
+    }
+
     @Test
     public void testAssign() {
         this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
@@ -326,7 +365,7 @@ public class PrototypeAsyncConsumerTest {
         assertTrue(wakeupTrigger.getPendingTask() == null);
     }
 
-    private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
+    private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
         HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
@@ -335,7 +374,7 @@ public class PrototypeAsyncConsumerTest {
         return topicPartitionOffsets;
     }
 
-    private HashMap<TopicPartition, OffsetAndTimestamp> mockOffsetAndTimestamp() {
+    private Map<TopicPartition, OffsetAndTimestamp> mockOffsetAndTimestamp() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
         HashMap<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = new HashMap<>();
@@ -344,7 +383,7 @@ public class PrototypeAsyncConsumerTest {
         return offsetAndTimestamp;
     }
 
-    private HashMap<TopicPartition, Long> mockTimestampToSearch() {
+    private Map<TopicPartition, Long> mockTimestampToSearch() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
         HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/IdempotentCloserTest.java b/clients/src/test/java/org/apache/kafka/common/internals/IdempotentCloserTest.java
new file mode 100644
index 00000000000..2c3c60846bd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/internals/IdempotentCloserTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class IdempotentCloserTest {
+
+    private static final Runnable CALLBACK_NO_OP = () -> { };
+
+    private static final Runnable CALLBACK_WITH_RUNTIME_EXCEPTION = () -> {
+        throw new RuntimeException("Simulated error during callback");
+    };
+
+    /**
+     * Tests basic functionality, i.e. that close <em>means</em> closed.
+     */
+    @Test
+    public void testBasicClose() {
+        IdempotentCloser ic = new IdempotentCloser();
+        assertFalse(ic.isClosed());
+        ic.close();
+        assertTrue(ic.isClosed());
+    }
+
+    /**
+     * Tests that the onClose callback is only invoked once.
+     */
+    @Test
+    public void testCountCloses() {
+        AtomicInteger onCloseCounter = new AtomicInteger();
+        IdempotentCloser ic = new IdempotentCloser();
+
+        // Verify initial invariants.
+        assertFalse(ic.isClosed());
+        assertEquals(0, onCloseCounter.get());
+
+        // Close with our onClose callback to increment our counter.
+        ic.close(onCloseCounter::getAndIncrement);
+        assertTrue(ic.isClosed());
+        assertEquals(1, onCloseCounter.get());
+
+        // Close with our onClose callback again, but verify it wasn't invoked as it was previously closed.
+        ic.close(onCloseCounter::getAndIncrement);
+        assertTrue(ic.isClosed());
+        assertEquals(1, onCloseCounter.get());
+    }
+
+    /**
+     * Tests that the onClose callback is only invoked once, while the onPreviousClose callback can be invoked
+     * a variable number of times.
+     */
+    @Test
+    public void testEnsureIdempotentClose() {
+        AtomicInteger onCloseCounter = new AtomicInteger();
+        AtomicInteger onPreviousCloseCounter = new AtomicInteger();
+
+        IdempotentCloser ic = new IdempotentCloser();
+
+        // Verify initial invariants.
+        assertFalse(ic.isClosed());
+        assertEquals(0, onCloseCounter.get());
+        assertEquals(0, onPreviousCloseCounter.get());
+
+        // Our first close passes in both callbacks. As a result, our onClose callback should be run but our
+        // onPreviousClose callback should not be invoked.
+        ic.close(onCloseCounter::getAndIncrement, onPreviousCloseCounter::getAndIncrement);
+        assertTrue(ic.isClosed());
+        assertEquals(1, onCloseCounter.get());
+        assertEquals(0, onPreviousCloseCounter.get());
+
+        // Our second close again passes in both callbacks. As this is the second close, our onClose callback
+        // should not be run but our onPreviousClose callback should be executed.
+        ic.close(onCloseCounter::getAndIncrement, onPreviousCloseCounter::getAndIncrement);
+        assertTrue(ic.isClosed());
+        assertEquals(1, onCloseCounter.get());
+        assertEquals(1, onPreviousCloseCounter.get());
+
+        // Our third close yet again passes in both callbacks. As before, our onClose callback should not be run
+        // but our onPreviousClose callback should be run again.
+        ic.close(onCloseCounter::getAndIncrement, onPreviousCloseCounter::getAndIncrement);
+        assertTrue(ic.isClosed());
+        assertEquals(1, onCloseCounter.get());
+        assertEquals(2, onPreviousCloseCounter.get());
+    }
+
+    /**
+     * Tests that the {@link IdempotentCloser#assertOpen(String)} method will not throw an
+     * exception if the closer is in the "open" state, but if invoked after it's in the "closed" state, it will
+     * throw the exception.
+     */
+    @Test
+    public void testCloseBeforeThrows() {
+        IdempotentCloser ic = new IdempotentCloser();
+
+        // Verify initial invariants.
+        assertFalse(ic.isClosed());
+
+        // maybeThrowIllegalStateException doesn't throw anything since the closer is still in its "open" state.
+        assertDoesNotThrow(() -> ic.assertOpen(() -> "test"));
+
+        // Post-close, our call to maybeThrowIllegalStateException will, in fact, throw said exception.
+        ic.close();
+        assertTrue(ic.isClosed());
+        assertThrows(IllegalStateException.class, () -> ic.assertOpen(() -> "test"));
+    }
+
+    /**
+     * Tests that if the invoked onClose callback throws an exception, that:
+     *
+     * <ol>
+     *     <li>The exception does not prevent the {@link IdempotentCloser} from being updated to the closed state</li>
+     *     <li>The exception is bubbled up to the user</li>
+     * </ol>
+     */
+    @Test
+    public void testErrorsInOnCloseCallbacksAreNotSwallowed() {
+        IdempotentCloser ic = new IdempotentCloser();
+
+        // Verify initial invariants.
+        assertFalse(ic.isClosed());
+
+        // Upon close, our onClose callback will throw an error. First ensure that it is thrown at the user.
+        assertThrows(RuntimeException.class, () -> ic.close(CALLBACK_WITH_RUNTIME_EXCEPTION));
+
+        // Make sure the IdempotentCloser is still closed, though.
+        assertTrue(ic.isClosed());
+    }
+
+    /**
+     * Tests that if the invoked onSubsequentClose callback throws an exception, that it is thrown from
+     * {@link IdempotentCloser#close(Runnable, Runnable)} so the user can handle it.
+     */
+    @Test
+    public void testErrorsInOnPreviousCloseCallbacksAreNotSwallowed() {
+        IdempotentCloser ic = new IdempotentCloser();
+
+        // Verify initial invariants.
+        assertFalse(ic.isClosed());
+
+        // Perform the initial close. No errors here.
+        ic.close(CALLBACK_NO_OP);
+        assertTrue(ic.isClosed());
+
+        // Perform the subsequent close and verify that the exception is bubbled up to the user.
+        assertThrows(RuntimeException.class, () -> ic.close(CALLBACK_NO_OP, CALLBACK_WITH_RUNTIME_EXCEPTION));
+        assertTrue(ic.isClosed());
+    }
+
+    /**
+     * Tests that if the {@link IdempotentCloser} is created with its initial state as closed, the various APIs
+     * will behave as expected.
+     */
+    @Test
+    public void testCreatedClosed() {
+        IdempotentCloser ic = new IdempotentCloser(true);
+        assertTrue(ic.isClosed());
+        assertThrows(IllegalStateException.class, () -> ic.assertOpen(() -> "test"));
+        assertDoesNotThrow(() -> ic.close(CALLBACK_WITH_RUNTIME_EXCEPTION));
+    }
+}
\ No newline at end of file