You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/11/15 13:00:47 UTC

(kafka) branch trunk updated: KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22f7ffe5e16 KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)
22f7ffe5e16 is described below

commit 22f7ffe5e1623d279096b45ab475768eeb05eee1
Author: Kirk True <ki...@kirktrue.pro>
AuthorDate: Wed Nov 15 05:00:40 2023 -0800

    KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)
    
    The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:
    
    * `KafkaConsumer` (AKA "existing", "legacy" consumer)
    * `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)
    
    The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first:
    
    * `LegacyKafkaConsumer`
    * `AsyncKafkaConsumer`
    
    `LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use.
    
    This task is part of the work to implement support for the new KIP-848 consumer group protocol.
    
    Reviewers: Philip Nee <pn...@confluent.io>, Andrew Schofield <as...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   13 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  971 ++-------------
 ...eAsyncConsumer.java => AsyncKafkaConsumer.java} |  203 ++--
 .../consumer/internals/CommitRequestManager.java   |    3 +-
 .../consumer/internals/ConsumerDelegate.java       |   42 +
 .../internals/ConsumerDelegateCreator.java         |  115 ++
 .../consumer/internals/ConsumerNetworkThread.java  |   10 +-
 .../clients/consumer/internals/ConsumerUtils.java  |    6 +
 .../consumer/internals/LegacyKafkaConsumer.java    | 1257 ++++++++++++++++++++
 .../kafka/clients/consumer/ConsumerConfigTest.java |   31 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 1211 +++++++++++--------
 ...nsumerTest.java => AsyncKafkaConsumerTest.java} |    8 +-
 .../internals/CommitRequestManagerTest.java        |    6 +
 .../consumer/internals/ConsumerTestBuilder.java    |    8 +-
 .../kafka/api/BaseAsyncConsumerTest.scala          |   10 +-
 .../kafka/api/IntegrationTestHarness.scala         |   14 -
 gradle/spotbugs-exclude.xml                        |   10 +
 .../integration/TaskAssignorIntegrationTest.java   |   11 +-
 18 files changed, 2434 insertions(+), 1495 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 43bd2eb1741..213fa3ee52b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -662,6 +662,7 @@ public class ConsumerConfig extends AbstractConfig {
         CommonClientConfigs.warnDisablingExponentialBackoff(this);
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideClientId(refinedConfigs);
+        maybeOverrideEnableAutoCommit(refinedConfigs);
         return refinedConfigs;
     }
 
@@ -695,17 +696,17 @@ public class ConsumerConfig extends AbstractConfig {
         return newConfigs;
     }
 
-    boolean maybeOverrideEnableAutoCommit() {
+    private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
         Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-        boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        Map<String, Object> originals = originals();
+        boolean enableAutoCommit = originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
         if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
-            if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
-                enableAutoCommit = false;
+            if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
+                configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
             } else if (enableAutoCommit) {
-                throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
+                throw new InvalidConfigurationException(ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
             }
         }
-        return enableAutoCommit;
     }
 
     public ConsumerConfig(Properties props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bd795e033ab..82e1f8b93da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,26 +16,12 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.GroupRebalanceConfig;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
-import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
+import org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.Deserializers;
-import org.apache.kafka.clients.consumer.internals.Fetch;
-import org.apache.kafka.clients.consumer.internals.FetchConfig;
-import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
-import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
-import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.consumer.internals.TopicMetadataFetcher;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -43,52 +29,23 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.InvalidGroupIdException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.slf4j.Logger;
-import org.slf4j.event.Level;
 
-import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
-import static org.apache.kafka.common.utils.Utils.closeQuietly;
-import static org.apache.kafka.common.utils.Utils.isBlank;
-import static org.apache.kafka.common.utils.Utils.join;
 import static org.apache.kafka.common.utils.Utils.propsToMap;
-import static org.apache.kafka.common.utils.Utils.swallow;
 
 /**
  * A client that consumes records from a Kafka cluster.
@@ -468,8 +425,7 @@ import static org.apache.kafka.common.utils.Utils.swallow;
  *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
  *
- * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
- * making the call. It is the responsibility of the user to ensure that multi-threaded access
+ * The Kafka consumer is NOT thread-safe. It is the responsibility of the user to ensure that multi-threaded access
  * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
  *
  * <p>
@@ -567,43 +523,9 @@ import static org.apache.kafka.common.utils.Utils.swallow;
  */
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
-    private static final long NO_CURRENT_THREAD = -1L;
-    static final String DEFAULT_REASON = "rebalance enforced by user";
-
-    // Visible for testing
-    final Metrics metrics;
-    final KafkaConsumerMetrics kafkaConsumerMetrics;
-
-    private Logger log;
-    private final String clientId;
-    private final Optional<String> groupId;
-    private final ConsumerCoordinator coordinator;
-    private final Deserializers<K, V> deserializers;
-    private final Fetcher<K, V> fetcher;
-    private final OffsetFetcher offsetFetcher;
-    private final TopicMetadataFetcher topicMetadataFetcher;
-    private final ConsumerInterceptors<K, V> interceptors;
-    private final IsolationLevel isolationLevel;
-
-    private final Time time;
-    private final ConsumerNetworkClient client;
-    private final SubscriptionState subscriptions;
-    private final ConsumerMetadata metadata;
-    private final long retryBackoffMs;
-    private final long retryBackoffMaxMs;
-    private final long requestTimeoutMs;
-    private final int defaultApiTimeoutMs;
-    private volatile boolean closed = false;
-    private final List<ConsumerPartitionAssignor> assignors;
-
-    // currentThread holds the threadId of the current thread accessing KafkaConsumer
-    // and is used to prevent multi-threaded access
-    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
-    // refcount is used to allow reentrant access by the thread who has acquired currentThread
-    private final AtomicInteger refcount = new AtomicInteger(0);
-
-    // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
-    private boolean cachedSubscriptionHasAllFetchPositions;
+    private final static ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();
+
+    private final ConsumerDelegate<K, V> delegate;
 
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -674,165 +596,30 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 keyDeserializer, valueDeserializer);
     }
 
-    @SuppressWarnings("unchecked")
     KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
-        try {
-            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
-                    GroupRebalanceConfig.ProtocolType.CONSUMER);
-
-            this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
-            this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-            LogContext logContext = createLogContext(config, groupRebalanceConfig);
-            this.log = logContext.logger(getClass());
-            boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
-            groupId.ifPresent(groupIdStr -> {
-                if (groupIdStr.isEmpty()) {
-                    log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
-                }
-            });
-
-            log.debug("Initializing the Kafka consumer");
-            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            this.time = Time.SYSTEM;
-            this.metrics = createMetrics(config, time);
-            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-            this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-
-            List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
-            this.interceptors = new ConsumerInterceptors<>(interceptorList);
-            this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
-            this.subscriptions = createSubscriptionState(config, logContext);
-            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
-                    metrics.reporters(),
-                    interceptorList,
-                    Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
-            this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
-            this.metadata.bootstrap(addresses);
-
-            FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
-            FetchConfig fetchConfig = new FetchConfig(config);
-            this.isolationLevel = fetchConfig.isolationLevel;
-
-            ApiVersions apiVersions = new ApiVersions();
-            this.client = createConsumerNetworkClient(config,
-                    metrics,
-                    logContext,
-                    apiVersions,
-                    time,
-                    metadata,
-                    fetchMetricsManager.throttleTimeSensor(),
-                    retryBackoffMs);
-
-            this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-                    config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                    config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
-            );
-
-            // no coordinator will be constructed for the default (null) group id
-            if (!groupId.isPresent()) {
-                config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-                config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
-                this.coordinator = null;
-            } else {
-                this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
-                        logContext,
-                        this.client,
-                        assignors,
-                        this.metadata,
-                        this.subscriptions,
-                        metrics,
-                        CONSUMER_METRIC_GROUP_PREFIX,
-                        this.time,
-                        enableAutoCommit,
-                        config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                        this.interceptors,
-                        config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
-                        config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
-            }
-            this.fetcher = new Fetcher<>(
-                    logContext,
-                    this.client,
-                    this.metadata,
-                    this.subscriptions,
-                    fetchConfig,
-                    this.deserializers,
-                    fetchMetricsManager,
-                    this.time,
-                    apiVersions);
-            this.offsetFetcher = new OffsetFetcher(logContext,
-                    client,
-                    metadata,
-                    subscriptions,
-                    time,
-                    retryBackoffMs,
-                    requestTimeoutMs,
-                    isolationLevel,
-                    apiVersions);
-            this.topicMetadataFetcher = new TopicMetadataFetcher(logContext,
-                    client,
-                    retryBackoffMs,
-                    retryBackoffMaxMs);
-
-            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-
-            config.logUnused();
-            AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
-            log.debug("Kafka consumer initialized");
-        } catch (Throwable t) {
-            // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
-            // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
-            if (this.log != null) {
-                close(Duration.ZERO, true);
-            }
-            // now propagate the exception
-            throw new KafkaException("Failed to construct kafka consumer", t);
-        }
+        delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
     }
 
-    // visible for testing
     KafkaConsumer(LogContext logContext,
-                  String clientId,
-                  ConsumerCoordinator coordinator,
+                  Time time,
+                  ConsumerConfig config,
                   Deserializer<K> keyDeserializer,
                   Deserializer<V> valueDeserializer,
-                  Fetcher<K, V> fetcher,
-                  OffsetFetcher offsetFetcher,
-                  TopicMetadataFetcher topicMetadataFetcher,
-                  ConsumerInterceptors<K, V> interceptors,
-                  Time time,
-                  ConsumerNetworkClient client,
-                  Metrics metrics,
+                  KafkaClient client,
                   SubscriptionState subscriptions,
                   ConsumerMetadata metadata,
-                  long retryBackoffMs,
-                  long retryBackoffMaxMs,
-                  long requestTimeoutMs,
-                  int defaultApiTimeoutMs,
-                  List<ConsumerPartitionAssignor> assignors,
-                  String groupId) {
-        this.log = logContext.logger(getClass());
-        this.clientId = clientId;
-        this.coordinator = coordinator;
-        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
-        this.fetcher = fetcher;
-        this.offsetFetcher = offsetFetcher;
-        this.topicMetadataFetcher = topicMetadataFetcher;
-        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        this.interceptors = Objects.requireNonNull(interceptors);
-        this.time = time;
-        this.client = client;
-        this.metrics = metrics;
-        this.subscriptions = subscriptions;
-        this.metadata = metadata;
-        this.retryBackoffMs = retryBackoffMs;
-        this.retryBackoffMaxMs = retryBackoffMaxMs;
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
-        this.assignors = assignors;
-        this.groupId = Optional.ofNullable(groupId);
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+                  List<ConsumerPartitionAssignor> assignors) {
+        delegate = CREATOR.create(
+            logContext,
+            time,
+            config,
+            keyDeserializer,
+            valueDeserializer,
+            client,
+            subscriptions,
+            metadata,
+            assignors
+        );
     }
 
     /**
@@ -844,12 +631,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The set of partitions currently assigned to this consumer
      */
     public Set<TopicPartition> assignment() {
-        acquireAndEnsureOpen();
-        try {
-            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
-        } finally {
-            release();
-        }
+        return delegate.assignment();
     }
 
     /**
@@ -858,12 +640,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The set of topics currently subscribed to
      */
     public Set<String> subscription() {
-        acquireAndEnsureOpen();
-        try {
-            return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
-        } finally {
-            release();
-        }
+        return delegate.subscription();
     }
 
     /**
@@ -903,10 +680,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
-        if (listener == null)
-            throw new IllegalArgumentException("RebalanceListener cannot be null");
-
-        subscribe(topics, Optional.of(listener));
+        delegate.subscribe(topics, listener);
     }
 
     /**
@@ -932,63 +706,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Collection<String> topics) {
-        subscribe(topics, Optional.empty());
-    }
-
-    /**
-     * Internal helper method for {@link #subscribe(Collection)} and
-     * {@link #subscribe(Collection, ConsumerRebalanceListener)}
-     * <p>
-     * Subscribe to the given list of topics to get dynamically assigned partitions.
-     * <b>Topic subscriptions are not incremental. This list will replace the current
-     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
-     * with manual partition assignment through {@link #assign(Collection)}.
-     *
-     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
-     *
-     * <p>
-     * @param topics The list of topics to subscribe to
-     * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
-     *                 for the subscribed topics
-     * @throws IllegalArgumentException If topics is null or contains null or empty elements
-     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
-     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
-     *                               configured at-least one partition assignment strategy
-     */
-    private void subscribe(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
-        acquireAndEnsureOpen();
-        try {
-            maybeThrowInvalidGroupIdException();
-            if (topics == null)
-                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
-            if (topics.isEmpty()) {
-                // treat subscribing to empty topic list as the same as unsubscribing
-                this.unsubscribe();
-            } else {
-                for (String topic : topics) {
-                    if (isBlank(topic))
-                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
-                }
-
-                throwIfNoAssignorsConfigured();
-
-                // Clear the buffered data which are not a part of newly assigned topics
-                final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
-
-                for (TopicPartition tp : subscriptions.assignedPartitions()) {
-                    if (topics.contains(tp.topic()))
-                        currentTopicPartitions.add(tp);
-                }
-
-                fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
-
-                log.info("Subscribed to topic(s): {}", join(topics, ", "));
-                if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
-                    metadata.requestUpdateForNewTopics();
-            }
-        } finally {
-            release();
-        }
+        delegate.subscribe(topics);
     }
 
     /**
@@ -1012,10 +730,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
-        if (listener == null)
-            throw new IllegalArgumentException("RebalanceListener cannot be null");
-
-        subscribe(pattern, Optional.of(listener));
+        delegate.subscribe(pattern, listener);
     }
 
     /**
@@ -1036,47 +751,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Pattern pattern) {
-        subscribe(pattern, Optional.empty());
-    }
-
-    /**
-     * Internal helper method for {@link #subscribe(Pattern)} and
-     * {@link #subscribe(Pattern, ConsumerRebalanceListener)}
-     * <p>
-     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
-     * The pattern matching will be done periodically against all topics existing at the time of check.
-     * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
-     * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
-     * <p>
-     * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
-     * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
-     * is a change to the topics matching the provided pattern and when consumer group membership changes.
-     * Group rebalances only take place during an active call to {@link #poll(Duration)}.
-     *
-     * @param pattern Pattern to subscribe to
-     * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
-     *                 for the subscribed topics
-     * @throws IllegalArgumentException If pattern or listener is null
-     * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
-     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
-     *                               configured at-least one partition assignment strategy
-     */
-    private void subscribe(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
-        maybeThrowInvalidGroupIdException();
-        if (pattern == null || pattern.toString().equals(""))
-            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
-                    "null" : "empty"));
-
-        acquireAndEnsureOpen();
-        try {
-            throwIfNoAssignorsConfigured();
-            log.info("Subscribed to pattern: '{}'", pattern);
-            this.subscriptions.subscribe(pattern, listener);
-            this.coordinator.updatePatternSubscription(metadata.fetch());
-            this.metadata.requestUpdateForNewTopics();
-        } finally {
-            release();
-        }
+        delegate.subscribe(pattern);
     }
 
     /**
@@ -1086,18 +761,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
      */
     public void unsubscribe() {
-        acquireAndEnsureOpen();
-        try {
-            fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
-            if (this.coordinator != null) {
-                this.coordinator.onLeavePrepare();
-                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
-            }
-            this.subscriptions.unsubscribe();
-            log.info("Unsubscribed all topics or patterns and assigned partitions");
-        } finally {
-            release();
-        }
+        delegate.unsubscribe();
     }
 
     /**
@@ -1121,32 +785,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void assign(Collection<TopicPartition> partitions) {
-        acquireAndEnsureOpen();
-        try {
-            if (partitions == null) {
-                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
-            } else if (partitions.isEmpty()) {
-                this.unsubscribe();
-            } else {
-                for (TopicPartition tp : partitions) {
-                    String topic = (tp != null) ? tp.topic() : null;
-                    if (isBlank(topic))
-                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
-                }
-                fetcher.clearBufferedDataForUnassignedPartitions(partitions);
-
-                // make sure the offsets of topic partitions the consumer is unsubscribing from
-                // are committed since there will be no following rebalance
-                if (coordinator != null)
-                    this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
-
-                log.info("Assigned to partition(s): {}", join(partitions, ", "));
-                if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
-                    metadata.requestUpdateForNewTopics();
-            }
-        } finally {
-            release();
-        }
+        delegate.assign(partitions);
     }
 
     /**
@@ -1185,7 +824,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Deprecated
     @Override
     public ConsumerRecords<K, V> poll(final long timeoutMs) {
-        return poll(time.timer(timeoutMs), false);
+        return delegate.poll(timeoutMs);
     }
 
     /**
@@ -1232,110 +871,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
-        return poll(time.timer(timeout), true);
-    }
-
-    /**
-     * @throws KafkaException if the rebalance callback throws exception
-     */
-    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
-        acquireAndEnsureOpen();
-        try {
-            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
-
-            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
-                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
-            }
-
-            do {
-                client.maybeTriggerWakeup();
-
-                if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer for join group
-                    updateAssignmentMetadataIfNeeded(timer, false);
-                } else {
-                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
-                        log.warn("Still waiting for metadata");
-                    }
-                }
-
-                final Fetch<K, V> fetch = pollForFetches(timer);
-                if (!fetch.isEmpty()) {
-                    // before returning the fetched records, we can send off the next round of fetches
-                    // and avoid block waiting for their responses to enable pipelining while the user
-                    // is handling the fetched records.
-                    //
-                    // NOTE: since the consumed position has already been updated, we must not allow
-                    // wakeups or any other errors to be triggered prior to returning the fetched records.
-                    if (sendFetches() > 0 || client.hasPendingRequests()) {
-                        client.transmitSends();
-                    }
-
-                    if (fetch.records().isEmpty()) {
-                        log.trace("Returning empty records from `poll()` "
-                                + "since the consumer's position has advanced for at least one topic partition");
-                    }
-
-                    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
-                }
-            } while (timer.notExpired());
-
-            return ConsumerRecords.empty();
-        } finally {
-            release();
-            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
-        }
-    }
-
-    private int sendFetches() {
-        offsetFetcher.validatePositionsOnMetadataChange();
-        return fetcher.sendFetches();
-    }
-
-    boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
-        if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
-            return false;
-        }
-
-        return updateFetchPositions(timer);
-    }
-
-    /**
-     * @throws KafkaException if the rebalance callback throws exception
-     */
-    private Fetch<K, V> pollForFetches(Timer timer) {
-        long pollTimeout = coordinator == null ? timer.remainingMs() :
-                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
-
-        // if data is available already, return it immediately
-        final Fetch<K, V> fetch = fetcher.collectFetch();
-        if (!fetch.isEmpty()) {
-            return fetch;
-        }
-
-        // send any new fetches (won't resend pending fetches)
-        sendFetches();
-
-        // We do not want to be stuck blocking in poll if we are missing some positions
-        // since the offset lookup may be backing off after a failure
-
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
-            pollTimeout = retryBackoffMs;
-        }
-
-        log.trace("Polling for fetches with timeout {}", pollTimeout);
-
-        Timer pollTimer = time.timer(pollTimeout);
-        client.poll(pollTimer, () -> {
-            // since a fetch might be completed by the background thread, we need this poll condition
-            // to ensure that we do not block unnecessarily in poll()
-            return !fetcher.hasAvailableFetches();
-        });
-        timer.update(pollTimer.currentTimeMs());
-
-        return fetcher.collectFetch();
+        return delegate.poll(timeout);
     }
 
     /**
@@ -1379,7 +915,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync() {
-        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+        delegate.commitSync();
     }
 
     /**
@@ -1422,7 +958,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync(Duration timeout) {
-        commitSync(subscriptions.allConsumed(), timeout);
+        delegate.commitSync(timeout);
     }
 
     /**
@@ -1470,7 +1006,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+        delegate.commitSync(offsets);
     }
 
     /**
@@ -1518,19 +1054,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
-        acquireAndEnsureOpen();
-        long commitStart = time.nanoseconds();
-        try {
-            maybeThrowInvalidGroupIdException();
-            offsets.forEach(this::updateLastSeenEpochIfNewer);
-            if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
-                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
-                        "committing offsets " + offsets);
-            }
-        } finally {
-            kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
-            release();
-        }
+        delegate.commitSync(offsets, timeout);
     }
 
     /**
@@ -1540,7 +1064,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitAsync() {
-        commitAsync(null);
+        delegate.commitAsync();
     }
 
     /**
@@ -1563,7 +1087,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
-        commitAsync(subscriptions.allConsumed(), callback);
+        delegate.commitAsync(callback);
     }
 
     /**
@@ -1590,15 +1114,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        acquireAndEnsureOpen();
-        try {
-            maybeThrowInvalidGroupIdException();
-            log.debug("Committing offsets: {}", offsets);
-            offsets.forEach(this::updateLastSeenEpochIfNewer);
-            coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
-        } finally {
-            release();
-        }
+        delegate.commitAsync(offsets, callback);
     }
 
     /**
@@ -1632,20 +1148,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
-        if (offset < 0)
-            throw new IllegalArgumentException("seek offset must not be a negative number");
-
-        acquireAndEnsureOpen();
-        try {
-            log.info("Seeking to offset {} for partition {}", offset, partition);
-            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
-                    offset,
-                    Optional.empty(), // This will ensure we skip validation
-                    this.metadata.currentLeader(partition));
-            this.subscriptions.seekUnvalidated(partition, newPosition);
-        } finally {
-            release();
-        }
+        delegate.seek(partition, offset);
     }
 
     /**
@@ -1659,29 +1162,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
-        long offset = offsetAndMetadata.offset();
-        if (offset < 0) {
-            throw new IllegalArgumentException("seek offset must not be a negative number");
-        }
-
-        acquireAndEnsureOpen();
-        try {
-            if (offsetAndMetadata.leaderEpoch().isPresent()) {
-                log.info("Seeking to offset {} for partition {} with epoch {}",
-                        offset, partition, offsetAndMetadata.leaderEpoch().get());
-            } else {
-                log.info("Seeking to offset {} for partition {}", offset, partition);
-            }
-            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeader(partition);
-            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
-                    offsetAndMetadata.offset(),
-                    offsetAndMetadata.leaderEpoch(),
-                    currentLeaderAndEpoch);
-            this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
-            this.subscriptions.seekUnvalidated(partition, newPosition);
-        } finally {
-            release();
-        }
+        delegate.seek(partition, offsetAndMetadata);
     }
 
     /**
@@ -1694,16 +1175,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seekToBeginning(Collection<TopicPartition> partitions) {
-        if (partitions == null)
-            throw new IllegalArgumentException("Partitions collection cannot be null");
-
-        acquireAndEnsureOpen();
-        try {
-            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
-            subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
-        } finally {
-            release();
-        }
+        delegate.seekToBeginning(partitions);
     }
 
     /**
@@ -1719,16 +1191,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seekToEnd(Collection<TopicPartition> partitions) {
-        if (partitions == null)
-            throw new IllegalArgumentException("Partitions collection cannot be null");
-
-        acquireAndEnsureOpen();
-        try {
-            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
-            subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
-        } finally {
-            release();
-        }
+        delegate.seekToEnd(partitions);
     }
 
     /**
@@ -1759,7 +1222,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public long position(TopicPartition partition) {
-        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.position(partition);
     }
 
     /**
@@ -1789,26 +1252,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public long position(TopicPartition partition, final Duration timeout) {
-        acquireAndEnsureOpen();
-        try {
-            if (!this.subscriptions.isAssigned(partition))
-                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
-
-            Timer timer = time.timer(timeout);
-            do {
-                SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
-                if (position != null)
-                    return position.offset;
-
-                updateFetchPositions(timer);
-                client.poll(timer);
-            } while (timer.notExpired());
-
-            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
-                    "for partition " + partition + " could be determined");
-        } finally {
-            release();
-        }
+        return delegate.position(partition, timeout);
     }
 
     /**
@@ -1838,7 +1282,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Deprecated
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
-        return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.committed(partition);
     }
 
     /**
@@ -1867,7 +1311,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Deprecated
     @Override
     public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
-        return committed(Collections.singleton(partition), timeout).get(partition);
+        return delegate.committed(partition, timeout);
     }
 
     /**
@@ -1899,7 +1343,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
-        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.committed(partitions);
     }
 
     /**
@@ -1927,24 +1371,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
-        acquireAndEnsureOpen();
-        long start = time.nanoseconds();
-        try {
-            maybeThrowInvalidGroupIdException();
-            final Map<TopicPartition, OffsetAndMetadata> offsets;
-            offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
-            if (offsets == null) {
-                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
-                        "committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
-                        "larger to relax the threshold.");
-            } else {
-                offsets.forEach(this::updateLastSeenEpochIfNewer);
-                return offsets;
-            }
-        } finally {
-            kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
-            release();
-        }
+        return delegate.committed(partitions, timeout);
     }
 
     /**
@@ -1974,7 +1401,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Uuid clientInstanceId(Duration timeout) {
-        throw new UnsupportedOperationException();
+        return delegate.clientInstanceId(timeout);
     }
 
   /**
@@ -1982,7 +1409,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(this.metrics.metrics());
+        return delegate.metrics();
     }
 
     /**
@@ -2004,7 +1431,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.partitionsFor(topic);
     }
 
     /**
@@ -2028,19 +1455,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
-        acquireAndEnsureOpen();
-        try {
-            Cluster cluster = this.metadata.fetch();
-            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
-            if (!parts.isEmpty())
-                return parts;
-
-            Timer timer = time.timer(timeout);
-            List<PartitionInfo> topicMetadata = topicMetadataFetcher.getTopicMetadata(topic, metadata.allowAutoTopicCreation(), timer);
-            return topicMetadata != null ? topicMetadata : Collections.emptyList();
-        } finally {
-            release();
-        }
+        return delegate.partitionsFor(topic, timeout);
     }
 
     /**
@@ -2059,7 +1474,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
-        return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.listTopics();
     }
 
     /**
@@ -2079,12 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
-        acquireAndEnsureOpen();
-        try {
-            return topicMetadataFetcher.getAllTopicMetadata(time.timer(timeout));
-        } finally {
-            release();
-        }
+        return delegate.listTopics(timeout);
     }
 
     /**
@@ -2099,15 +1509,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void pause(Collection<TopicPartition> partitions) {
-        acquireAndEnsureOpen();
-        try {
-            log.debug("Pausing partitions {}", partitions);
-            for (TopicPartition partition: partitions) {
-                subscriptions.pause(partition);
-            }
-        } finally {
-            release();
-        }
+        delegate.pause(partitions);
     }
 
     /**
@@ -2119,15 +1521,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void resume(Collection<TopicPartition> partitions) {
-        acquireAndEnsureOpen();
-        try {
-            log.debug("Resuming partitions {}", partitions);
-            for (TopicPartition partition: partitions) {
-                subscriptions.resume(partition);
-            }
-        } finally {
-            release();
-        }
+        delegate.resume(partitions);
     }
 
     /**
@@ -2137,12 +1531,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Set<TopicPartition> paused() {
-        acquireAndEnsureOpen();
-        try {
-            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
-        } finally {
-            release();
-        }
+        return delegate.paused();
     }
 
     /**
@@ -2168,7 +1557,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.offsetsForTimes(timestampsToSearch);
     }
 
     /**
@@ -2195,19 +1584,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
-        acquireAndEnsureOpen();
-        try {
-            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
-                // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
-                // OffsetAndTimestamp is always positive.
-                if (entry.getValue() < 0)
-                    throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
-                            entry.getValue() + ". The target time cannot be negative.");
-            }
-            return offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(timeout));
-        } finally {
-            release();
-        }
+        return delegate.offsetsForTimes(timestampsToSearch, timeout);
     }
 
     /**
@@ -2226,7 +1603,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
-        return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.beginningOffsets(partitions);
     }
 
     /**
@@ -2247,12 +1624,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
-        acquireAndEnsureOpen();
-        try {
-            return offsetFetcher.beginningOffsets(partitions, time.timer(timeout));
-        } finally {
-            release();
-        }
+        return delegate.beginningOffsets(partitions, timeout);
     }
 
     /**
@@ -2276,7 +1648,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
-        return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+        return delegate.endOffsets(partitions);
     }
 
     /**
@@ -2302,12 +1674,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
-        acquireAndEnsureOpen();
-        try {
-            return offsetFetcher.endOffsets(partitions, time.timer(timeout));
-        } finally {
-            release();
-        }
+        return delegate.endOffsets(partitions, timeout);
     }
 
     /**
@@ -2326,30 +1693,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public OptionalLong currentLag(TopicPartition topicPartition) {
-        acquireAndEnsureOpen();
-        try {
-            final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-
-            // if the log end offset is not known and hence cannot return lag and there is
-            // no in-flight list offset requested yet,
-            // issue a list offset request for that partition so that next time
-            // we may get the answer; we do not need to wait for the return value
-            // since we would not try to poll the network client synchronously
-            if (lag == null) {
-                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
-                        !subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
-                    subscriptions.requestPartitionEndOffset(topicPartition);
-                    offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
-                }
-
-                return OptionalLong.empty();
-            }
-
-            return OptionalLong.of(lag);
-        } finally {
-            release();
-        }
+        return delegate.currentLag(topicPartition);
     }
 
     /**
@@ -2360,13 +1704,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        acquireAndEnsureOpen();
-        try {
-            maybeThrowInvalidGroupIdException();
-            return coordinator.groupMetadata();
-        } finally {
-            release();
-        }
+        return delegate.groupMetadata();
     }
 
     /**
@@ -2393,15 +1731,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void enforceRebalance(final String reason) {
-        acquireAndEnsureOpen();
-        try {
-            if (coordinator == null) {
-                throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
-            }
-            coordinator.requestRejoin(reason == null || reason.isEmpty() ? DEFAULT_REASON : reason);
-        } finally {
-            release();
-        }
+        delegate.enforceRebalance(reason);
     }
 
     /**
@@ -2409,7 +1739,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void enforceRebalance() {
-        enforceRebalance(null);
+        delegate.enforceRebalance();
     }
 
     /**
@@ -2424,7 +1754,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void close() {
-        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+        delegate.close();
     }
 
     /**
@@ -2444,19 +1774,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void close(Duration timeout) {
-        if (timeout.toMillis() < 0)
-            throw new IllegalArgumentException("The timeout cannot be negative.");
-        acquire();
-        try {
-            if (!closed) {
-                // need to close before setting the flag since the close function
-                // itself may trigger rebalance callback that needs the consumer to be open still
-                close(timeout, false);
-            }
-        } finally {
-            closed = true;
-            release();
-        }
+        delegate.close(timeout);
     }
 
     /**
@@ -2466,152 +1784,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void wakeup() {
-        this.client.wakeup();
-    }
-
-    private Timer createTimerForRequest(final Duration timeout) {
-        // this.time could be null if an exception occurs in constructor prior to setting the this.time field
-        final Time localTime = (time == null) ? Time.SYSTEM : time;
-        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
-    }
-
-    private void close(Duration timeout, boolean swallowException) {
-        log.trace("Closing the Kafka consumer");
-        AtomicReference<Throwable> firstException = new AtomicReference<>();
-
-        final Timer closeTimer = createTimerForRequest(timeout);
-        // Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to
-        // the server in the process of closing which may not respect the overall timeout defined for closing the
-        // consumer.
-        if (coordinator != null) {
-            // This is a blocking call bound by the time remaining in closeTimer
-            swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
-        }
-
-        if (fetcher != null) {
-            // the timeout for the session close is at-most the requestTimeoutMs
-            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
-            if (remainingDurationInTimeout > 0) {
-                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
-            }
-
-            closeTimer.reset(remainingDurationInTimeout);
-
-            // This is a blocking call bound by the time remaining in closeTimer
-            swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
-        }
-
-        closeQuietly(interceptors, "consumer interceptors", firstException);
-        closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
-        closeQuietly(metrics, "consumer metrics", firstException);
-        closeQuietly(client, "consumer network client", firstException);
-        closeQuietly(deserializers, "consumer deserializers", firstException);
-        AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
-        log.debug("Kafka consumer has been closed");
-        Throwable exception = firstException.get();
-        if (exception != null && !swallowException) {
-            if (exception instanceof InterruptException) {
-                throw (InterruptException) exception;
-            }
-            throw new KafkaException("Failed to close kafka consumer", exception);
-        }
+        delegate.wakeup();
     }
 
-    /**
-     * Set the fetch position to the committed position (if there is one)
-     * or reset it using the offset reset policy the user has configured.
-     *
-     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *             defined
-     * @return true iff the operation completed without timing out
-     */
-    private boolean updateFetchPositions(final Timer timer) {
-        // If any partitions have been truncated due to a leader change, we need to validate the offsets
-        offsetFetcher.validatePositionsIfNeeded();
-
-        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-        if (cachedSubscriptionHasAllFetchPositions) return true;
-
-        // If there are any partitions which do not have a valid position and are not
-        // awaiting reset, then we need to fetch committed offsets. We will only do a
-        // coordinator lookup if there are partitions which have missing positions, so
-        // a consumer with manually assigned partitions can avoid a coordinator dependence
-        // by always ensuring that assigned partitions have an initial position.
-        if (coordinator != null && !coordinator.initWithCommittedOffsetsIfNeeded(timer)) return false;
-
-        // If there are partitions still needing a position and a reset policy is defined,
-        // request reset using the default policy. If no reset strategy is defined and there
-        // are partitions with a missing position, then we will raise an exception.
-        subscriptions.resetInitializingPositions();
-
-        // Finally send an asynchronous request to look up and update the positions of any
-        // partitions which are awaiting reset.
-        offsetFetcher.resetPositionsIfNeeded();
-
-        return true;
-    }
-
-    /**
-     * Acquire the light lock and ensure that the consumer hasn't been closed.
-     * @throws IllegalStateException If the consumer has been closed
-     */
-    private void acquireAndEnsureOpen() {
-        acquire();
-        if (this.closed) {
-            release();
-            throw new IllegalStateException("This consumer has already been closed.");
-        }
-    }
-
-    /**
-     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
-     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
-     * supported).
-     * @throws ConcurrentModificationException if another thread already has the lock
-     */
-    private void acquire() {
-        final Thread thread = Thread.currentThread();
-        final long threadId = thread.getId();
-        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
-            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
-                    "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
-                    " otherThread(id: " + currentThread.get() + ")"
-            );
-        refcount.incrementAndGet();
-    }
-
-    /**
-     * Release the light lock protecting the consumer from multi-threaded access.
-     */
-    private void release() {
-        if (refcount.decrementAndGet() == 0)
-            currentThread.set(NO_CURRENT_THREAD);
-    }
-
-    private void throwIfNoAssignorsConfigured() {
-        if (assignors.isEmpty())
-            throw new IllegalStateException("Must configure at least one partition assigner class name to " +
-                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
-    }
-
-    private void maybeThrowInvalidGroupIdException() {
-        if (!groupId.isPresent())
-            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
-                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+    // Functions below are for testing only
+    String clientId() {
+        return delegate.clientId();
     }
 
-    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
-        if (offsetAndMetadata != null)
-            offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+    Metrics metricsRegistry() {
+        return delegate.metricsRegistry();
     }
 
-    // Functions below are for testing only
-    String getClientId() {
-        return clientId;
+    KafkaConsumerMetrics kafkaConsumerMetrics() {
+        return delegate.kafkaConsumerMetrics();
     }
 
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        return updateAssignmentMetadataIfNeeded(timer, true);
+        return delegate.updateAssignmentMetadataIfNeeded(timer);
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
similarity index 88%
rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index a90d37597a3..9f8d7206578 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
@@ -53,7 +55,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -79,7 +80,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -91,11 +91,10 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
@@ -105,16 +104,21 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshC
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 import static org.apache.kafka.common.utils.Utils.isBlank;
 import static org.apache.kafka.common.utils.Utils.join;
-import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
+ * {@link ApplicationEvent application events} so that the network I/O can be processed in a dedicated
  * {@link ConsumerNetworkThread network thread}. Visit
  * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design">this document</a>
- * for detail implementation.
+ * for implementation detail.
+ *
+ * <p/>
+ *
+ * <em>Note:</em> this {@link Consumer} implementation is part of the revised consumer group protocol from KIP-848.
+ * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before.
+ * This consumer implements the new consumer group protocol and is intended to be the default in coming releases.
  */
-public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
+public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
@@ -140,7 +144,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
     private final ConsumerMetadata metadata;
     private final Metrics metrics;
     private final long retryBackoffMs;
-    private final long defaultApiTimeoutMs;
+    private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
     private final List<ConsumerPartitionAssignor> assignors;
 
@@ -148,30 +152,9 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
-    public PrototypeAsyncConsumer(final Properties properties,
-                                  final Deserializer<K> keyDeserializer,
-                                  final Deserializer<V> valueDeserializer) {
-        this(propsToMap(properties), keyDeserializer, valueDeserializer);
-    }
-
-    public PrototypeAsyncConsumer(final Map<String, Object> configs,
-                                  final Deserializer<K> keyDeserializer,
-                                  final Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
-                keyDeserializer,
-                valueDeserializer);
-    }
-
-    public PrototypeAsyncConsumer(final ConsumerConfig config,
-                                  final Deserializer<K> keyDeserializer,
-                                  final Deserializer<V> valueDeserializer) {
-        this(Time.SYSTEM, config, keyDeserializer, valueDeserializer);
-    }
-
-    public PrototypeAsyncConsumer(final Time time,
-                                  final ConsumerConfig config,
-                                  final Deserializer<K> keyDeserializer,
-                                  final Deserializer<V> valueDeserializer) {
+    AsyncKafkaConsumer(final ConsumerConfig config,
+                       final Deserializer<K> keyDeserializer,
+                       final Deserializer<V> valueDeserializer) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                     GroupRebalanceConfig.ProtocolType.CONSUMER);
@@ -188,7 +171,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
 
             log.debug("Initializing the Kafka consumer");
             this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            this.time = time;
+            this.time = Time.SYSTEM;
             this.metrics = createMetrics(config, time);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 
@@ -250,7 +233,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
             // no coordinator will be constructed for the default (null) group id
             if (!groupId.isPresent()) {
                 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-                //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+                config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
             }
 
             // The FetchCollector is only used on the application thread.
@@ -278,22 +261,23 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    public PrototypeAsyncConsumer(LogContext logContext,
-                                  String clientId,
-                                  Deserializers<K, V> deserializers,
-                                  FetchBuffer fetchBuffer,
-                                  FetchCollector<K, V> fetchCollector,
-                                  ConsumerInterceptors<K, V> interceptors,
-                                  Time time,
-                                  ApplicationEventHandler applicationEventHandler,
-                                  BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                  Metrics metrics,
-                                  SubscriptionState subscriptions,
-                                  ConsumerMetadata metadata,
-                                  long retryBackoffMs,
-                                  int defaultApiTimeoutMs,
-                                  List<ConsumerPartitionAssignor> assignors,
-                                  String groupId) {
+    // Visible for testing
+    AsyncKafkaConsumer(LogContext logContext,
+                       String clientId,
+                       Deserializers<K, V> deserializers,
+                       FetchBuffer fetchBuffer,
+                       FetchCollector<K, V> fetchCollector,
+                       ConsumerInterceptors<K, V> interceptors,
+                       Time time,
+                       ApplicationEventHandler applicationEventHandler,
+                       BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                       Metrics metrics,
+                       SubscriptionState subscriptions,
+                       ConsumerMetadata metadata,
+                       long retryBackoffMs,
+                       int defaultApiTimeoutMs,
+                       List<ConsumerPartitionAssignor> assignors,
+                       String groupId) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = clientId;
@@ -314,6 +298,84 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
     }
 
+    // Visible for testing
+    AsyncKafkaConsumer(LogContext logContext,
+                       Time time,
+                       ConsumerConfig config,
+                       Deserializer<K> keyDeserializer,
+                       Deserializer<V> valueDeserializer,
+                       KafkaClient client,
+                       SubscriptionState subscriptions,
+                       ConsumerMetadata metadata,
+                       List<ConsumerPartitionAssignor> assignors) {
+        this.log = logContext.logger(getClass());
+        this.subscriptions = subscriptions;
+        this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+        this.fetchBuffer = new FetchBuffer(logContext);
+        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
+        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
+        this.time = time;
+        this.metrics = new Metrics(time);
+        this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
+        this.metadata = metadata;
+        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        this.assignors = assignors;
+
+        ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+        FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
+        this.fetchCollector = new FetchCollector<>(logContext,
+                metadata,
+                subscriptions,
+                new FetchConfig(config),
+                deserializers,
+                fetchMetricsManager,
+                time);
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+
+        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+            config,
+            GroupRebalanceConfig.ProtocolType.CONSUMER
+        );
+
+        BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
+        ApiVersions apiVersions = new ApiVersions();
+        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
+            time,
+            config,
+            logContext,
+            client
+        );
+        Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
+            time,
+            logContext,
+            backgroundEventQueue,
+            metadata,
+            subscriptions,
+            fetchBuffer,
+            config,
+            groupRebalanceConfig,
+            apiVersions,
+            fetchMetricsManager,
+            networkClientDelegateSupplier
+        );
+        Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
+                logContext,
+                metadata,
+                applicationEventQueue,
+                requestManagersSupplier
+        );
+        this.applicationEventHandler = new ApplicationEventHandler(logContext,
+                time,
+                applicationEventQueue,
+                applicationEventProcessorSupplier,
+                networkClientDelegateSupplier,
+                requestManagersSupplier);
+    }
+
     /**
      * poll implementation using {@link ApplicationEventHandler}.
      *  1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
@@ -970,6 +1032,9 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
     private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
         final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
 
+        if (initializingPartitions.isEmpty())
+            return true;
+
         log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
         try {
             final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
@@ -982,23 +1047,6 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    // This is here temporary as we don't have public access to the ConsumerConfig in this module.
-    public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
-                                                                 Deserializer<?> keyDeserializer,
-                                                                 Deserializer<?> valueDeserializer) {
-        // validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
-        Map<String, Object> newConfigs = new HashMap<>(configs);
-        if (keyDeserializer != null)
-            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-        else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-            throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-        if (valueDeserializer != null)
-            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-        else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-            throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-        return newConfigs;
-    }
-
     private void throwIfNoAssignorsConfigured() {
         if (assignors.isEmpty())
             throw new IllegalStateException("Must configure at least one partition assigner class name to " +
@@ -1018,7 +1066,8 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    boolean updateAssignmentMetadataIfNeeded(Timer timer) {
+    @Override
+    public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         backgroundEventProcessor.process();
 
         // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as
@@ -1096,4 +1145,18 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    @Override
+    public String clientId() {
+        return clientId;
+    }
+
+    @Override
+    public Metrics metricsRegistry() {
+        return metrics;
+    }
+
+    @Override
+    public KafkaConsumerMetrics kafkaConsumerMetrics() {
+        return kafkaConsumerMetrics;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index a04d3fa8437..f996f909302 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -56,6 +56,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
 import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRESS;
 import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
@@ -64,8 +65,6 @@ import static org.apache.kafka.common.protocol.Errors.REQUEST_TIMED_OUT;
 
 public class CommitRequestManager implements RequestManager {
 
-    // TODO: current in ConsumerConfig but inaccessible in the internal package.
-    private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
     private final SubscriptionState subscriptions;
     private final LogContext logContext;
     private final Logger log;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java
new file mode 100644
index 00000000000..612827ebe83
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals of the {@link Consumer} for
+ * various tests.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should
+ * not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is
+ * the {@link Consumer} API contract that should serve as the caller's interface.
+ */
+public interface ConsumerDelegate<K, V> extends Consumer<K, V> {
+
+    String clientId();
+
+    Metrics metricsRegistry();
+
+    KafkaConsumerMetrics kafkaConsumerMetrics();
+
+    boolean updateAssignmentMetadataIfNeeded(final Timer timer);
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
new file mode 100644
index 00000000000..bd95e06c864
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if
+ * it is using the new consumer group protocol (KIP-848) or if it should fall back to the existing, legacy group
+ * protocol. This is based on the presence and value of the {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
+ * configuration. If the value is present and equal to &quot;{@code consumer}&quot;, the {@link AsyncKafkaConsumer}
+ * will be returned. Otherwise, the {@link LegacyKafkaConsumer} will be returned.
+ *
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should
+ * not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is
+ * the {@link Consumer} API contract that should serve as the caller's interface.
+ */
+public class ConsumerDelegateCreator {
+
+    public <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,
+                                                Deserializer<K> keyDeserializer,
+                                                Deserializer<V> valueDeserializer) {
+        try {
+            GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
+
+            if (groupProtocol == GroupProtocol.CONSUMER)
+                return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
+            else
+                return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
+        } catch (KafkaException e) {
+            throw e;
+        } catch (Throwable t) {
+            throw new KafkaException("Failed to construct Kafka consumer", t);
+        }
+    }
+
+    public <K, V> ConsumerDelegate<K, V> create(LogContext logContext,
+                                                Time time,
+                                                ConsumerConfig config,
+                                                Deserializer<K> keyDeserializer,
+                                                Deserializer<V> valueDeserializer,
+                                                KafkaClient client,
+                                                SubscriptionState subscriptions,
+                                                ConsumerMetadata metadata,
+                                                List<ConsumerPartitionAssignor> assignors) {
+        try {
+            GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
+
+            if (groupProtocol == GroupProtocol.CONSUMER)
+                return new AsyncKafkaConsumer<>(
+                    logContext,
+                    time,
+                    config,
+                    keyDeserializer,
+                    valueDeserializer,
+                    client,
+                    subscriptions,
+                    metadata,
+                    assignors
+                );
+            else
+                return new LegacyKafkaConsumer<>(
+                    logContext,
+                    time,
+                    config,
+                    keyDeserializer,
+                    valueDeserializer,
+                    client,
+                    subscriptions,
+                    metadata,
+                    assignors
+                );
+        } catch (KafkaException e) {
+            throw e;
+        } catch (Throwable t) {
+            throw new KafkaException("Failed to construct Kafka consumer", t);
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 77a2952d1b2..6519e3ef48a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -247,12 +247,10 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
         closeTimeout = timeout;
         wakeup();
 
-        if (timeoutMs > 0) {
-            try {
-                join(timeoutMs);
-            } catch (InterruptedException e) {
-                log.error("Interrupted while waiting for consumer network thread to complete", e);
-            }
+        try {
+            join();
+        } catch (InterruptedException e) {
+            log.error("Interrupted while waiting for consumer network thread to complete", e);
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index 92b098213b0..e267293f98b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -56,6 +56,12 @@ import java.util.concurrent.TimeUnit;
 
 public final class ConsumerUtils {
 
+    /**
+     * This configuration has only package-level visibility in {@link ConsumerConfig}, so it's inaccessible in the
+     * internals package where most of its uses live. Attempts were made to move things around, but it was deemed
+     * better to leave it as is.
+     */
+    static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
     public static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
     public static final String CONSUMER_JMX_PREFIX = "kafka.consumer";
     public static final String CONSUMER_METRIC_GROUP_PREFIX = "consumer";
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
new file mode 100644
index 00000000000..06495c6a8fc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
@@ -0,0 +1,1257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_RACK_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static org.apache.kafka.common.utils.Utils.join;
+import static org.apache.kafka.common.utils.Utils.swallow;
+
+/**
+ * A client that consumes records from a Kafka cluster using the {@link GroupProtocol#GENERIC generic group protocol}.
+ * In this implementation, all network I/O happens in the thread of the application making the call.
+ *
+ * <p/>
+ *
+ * <em>Note:</em> per its name, this implementation is left for backward compatibility purposes. The updated consumer
+ * group protocol (from KIP-848) introduces allows users continue using the legacy "generic" group protocol.
+ * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before.
+ */
+public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
+
+    private static final long NO_CURRENT_THREAD = -1L;
+    public static final String DEFAULT_REASON = "rebalance enforced by user";
+
+    private final Metrics metrics;
+    private final KafkaConsumerMetrics kafkaConsumerMetrics;
+    private Logger log;
+    private final String clientId;
+    private final Optional<String> groupId;
+    private final ConsumerCoordinator coordinator;
+    private final Deserializers<K, V> deserializers;
+    private final Fetcher<K, V> fetcher;
+    private final OffsetFetcher offsetFetcher;
+    private final TopicMetadataFetcher topicMetadataFetcher;
+    private final ConsumerInterceptors<K, V> interceptors;
+    private final IsolationLevel isolationLevel;
+
+    private final Time time;
+    private final ConsumerNetworkClient client;
+    private final SubscriptionState subscriptions;
+    private final ConsumerMetadata metadata;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final int requestTimeoutMs;
+    private final int defaultApiTimeoutMs;
+    private volatile boolean closed = false;
+    private final List<ConsumerPartitionAssignor> assignors;
+
+    // currentThread holds the threadId of the current thread accessing LegacyKafkaConsumer
+    // and is used to prevent multi-threaded access
+    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
+    // refcount is used to allow reentrant access by the thread who has acquired currentThread
+    private final AtomicInteger refcount = new AtomicInteger(0);
+
+    // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
+    private boolean cachedSubscriptionHasAllFetchPositions;
+
+    LegacyKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+        try {
+            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
+                    GroupRebalanceConfig.ProtocolType.CONSUMER);
+
+            this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
+            this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+            LogContext logContext = createLogContext(config, groupRebalanceConfig);
+            this.log = logContext.logger(getClass());
+            boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
+            groupId.ifPresent(groupIdStr -> {
+                if (groupIdStr.isEmpty()) {
+                    log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
+                }
+            });
+
+            log.debug("Initializing the Kafka consumer");
+            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+            this.time = Time.SYSTEM;
+            this.metrics = createMetrics(config, time);
+            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+
+            List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
+            this.interceptors = new ConsumerInterceptors<>(interceptorList);
+            this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+            this.subscriptions = createSubscriptionState(config, logContext);
+            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
+                    metrics.reporters(),
+                    interceptorList,
+                    Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
+            this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
+            this.metadata.bootstrap(addresses);
+
+            FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
+            FetchConfig fetchConfig = new FetchConfig(config);
+            this.isolationLevel = fetchConfig.isolationLevel;
+
+            ApiVersions apiVersions = new ApiVersions();
+            this.client = createConsumerNetworkClient(config,
+                    metrics,
+                    logContext,
+                    apiVersions,
+                    time,
+                    metadata,
+                    fetchMetricsManager.throttleTimeSensor(),
+                    retryBackoffMs);
+
+            this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
+                    config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                    config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
+            );
+
+            // no coordinator will be constructed for the default (null) group id
+            if (!groupId.isPresent()) {
+                config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+                config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+                this.coordinator = null;
+            } else {
+                this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
+                        logContext,
+                        this.client,
+                        assignors,
+                        this.metadata,
+                        this.subscriptions,
+                        metrics,
+                        CONSUMER_METRIC_GROUP_PREFIX,
+                        this.time,
+                        enableAutoCommit,
+                        config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                        this.interceptors,
+                        config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
+                        config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
+            }
+            this.fetcher = new Fetcher<>(
+                    logContext,
+                    this.client,
+                    this.metadata,
+                    this.subscriptions,
+                    fetchConfig,
+                    this.deserializers,
+                    fetchMetricsManager,
+                    this.time,
+                    apiVersions);
+            this.offsetFetcher = new OffsetFetcher(logContext,
+                    client,
+                    metadata,
+                    subscriptions,
+                    time,
+                    retryBackoffMs,
+                    requestTimeoutMs,
+                    isolationLevel,
+                    apiVersions);
+            this.topicMetadataFetcher = new TopicMetadataFetcher(logContext,
+                    client,
+                    retryBackoffMs,
+                    retryBackoffMaxMs);
+
+            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+
+            config.logUnused();
+            AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
+            log.debug("Kafka consumer initialized");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
+            // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
+            if (this.log != null) {
+                close(Duration.ZERO, true);
+            }
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
+    }
+
+    // visible for testing
+    LegacyKafkaConsumer(LogContext logContext,
+                        Time time,
+                        ConsumerConfig config,
+                        Deserializer<K> keyDeserializer,
+                        Deserializer<V> valueDeserializer,
+                        KafkaClient client,
+                        SubscriptionState subscriptions,
+                        ConsumerMetadata metadata,
+                        List<ConsumerPartitionAssignor> assignors) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.subscriptions = subscriptions;
+        this.metadata = metadata;
+        this.metrics = new Metrics(time);
+        this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+        this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
+        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
+        this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+        this.assignors = assignors;
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
+        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
+        int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
+        int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+        int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+        boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
+        boolean throwOnStableOffsetNotSupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        int autoCommitIntervalMs = config.getInt(AUTO_COMMIT_INTERVAL_MS_CONFIG);
+        String rackId = config.getString(CLIENT_RACK_CONFIG);
+        Optional<String> groupInstanceId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
+
+        this.client = new ConsumerNetworkClient(
+            logContext,
+            client,
+            metadata,
+            time,
+            retryBackoffMs,
+            requestTimeoutMs,
+            heartbeatIntervalMs
+        );
+
+        if (groupId.isPresent()) {
+            GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
+                sessionTimeoutMs,
+                rebalanceTimeoutMs,
+                heartbeatIntervalMs,
+                groupId.get(),
+                groupInstanceId,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                true
+            );
+            this.coordinator = new ConsumerCoordinator(
+                rebalanceConfig,
+                logContext,
+                this.client,
+                assignors,
+                metadata,
+                subscriptions,
+                metrics,
+                CONSUMER_METRIC_GROUP_PREFIX,
+                time,
+                enableAutoCommit,
+                autoCommitIntervalMs,
+                interceptors,
+                throwOnStableOffsetNotSupported,
+                rackId
+            );
+        } else {
+            this.coordinator = null;
+        }
+
+        int maxBytes = config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG);
+        int maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+        int minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
+        int fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+        int maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+        boolean checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
+
+        ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+        FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
+        ApiVersions apiVersions = new ApiVersions();
+        FetchConfig fetchConfig = new FetchConfig(
+                minBytes,
+                maxBytes,
+                maxWaitMs,
+                fetchSize,
+                maxPollRecords,
+                checkCrcs,
+                rackId,
+                isolationLevel
+        );
+        this.fetcher = new Fetcher<>(
+            logContext,
+            this.client,
+            metadata,
+            subscriptions,
+            fetchConfig,
+            deserializers,
+            metricsManager,
+            time,
+            apiVersions
+        );
+        this.offsetFetcher = new OffsetFetcher(
+            logContext,
+            this.client,
+            metadata,
+            subscriptions,
+            time,
+            retryBackoffMs,
+            requestTimeoutMs,
+            isolationLevel,
+            apiVersions
+        );
+        this.topicMetadataFetcher = new TopicMetadataFetcher(
+            logContext,
+            this.client,
+            retryBackoffMs,
+            retryBackoffMaxMs
+        );
+    }
+
+    public Set<TopicPartition> assignment() {
+        acquireAndEnsureOpen();
+        try {
+            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+        } finally {
+            release();
+        }
+    }
+
+    public Set<String> subscription() {
+        acquireAndEnsureOpen();
+        try {
+            return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+        subscribeInternal(topics, Optional.of(listener));
+    }
+
+    @Override
+    public void subscribe(Collection<String> topics) {
+        subscribeInternal(topics, Optional.empty());
+    }
+
+    /**
+     * Internal helper method for {@link #subscribe(Collection)} and
+     * {@link #subscribe(Collection, ConsumerRebalanceListener)}
+     * <p>
+     * Subscribe to the given list of topics to get dynamically assigned partitions.
+     * <b>Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
+     * with manual partition assignment through {@link #assign(Collection)}.
+     *
+     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+     *
+     * <p>
+     * @param topics The list of topics to subscribe to
+     * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
+     *                 for the subscribed topics
+     * @throws IllegalArgumentException If topics is null or contains null or empty elements
+     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
+     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
+     *                               configured at-least one partition assignment strategy
+     */
+    private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (topics == null)
+                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
+            if (topics.isEmpty()) {
+                // treat subscribing to empty topic list as the same as unsubscribing
+                this.unsubscribe();
+            } else {
+                for (String topic : topics) {
+                    if (isBlank(topic))
+                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
+                }
+
+                throwIfNoAssignorsConfigured();
+
+                // Clear the buffered data which are not a part of newly assigned topics
+                final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+                for (TopicPartition tp : subscriptions.assignedPartitions()) {
+                    if (topics.contains(tp.topic()))
+                        currentTopicPartitions.add(tp);
+                }
+
+                fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+
+                log.info("Subscribed to topic(s): {}", join(topics, ", "));
+                if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
+                    metadata.requestUpdateForNewTopics();
+            }
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+        subscribeInternal(pattern, Optional.of(listener));
+    }
+
+    @Override
+    public void subscribe(Pattern pattern) {
+        subscribeInternal(pattern, Optional.empty());
+    }
+
+    /**
+     * Internal helper method for {@link #subscribe(Pattern)} and
+     * {@link #subscribe(Pattern, ConsumerRebalanceListener)}
+     * <p>
+     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against all topics existing at the time of check.
+     * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
+     * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
+     * <p>
+     * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
+     * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
+     * is a change to the topics matching the provided pattern and when consumer group membership changes.
+     * Group rebalances only take place during an active call to {@link #poll(Duration)}.
+     *
+     * @param pattern Pattern to subscribe to
+     * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
+     *                 for the subscribed topics
+     * @throws IllegalArgumentException If pattern or listener is null
+     * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
+     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
+     *                               configured at-least one partition assignment strategy
+     */
+    private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        acquireAndEnsureOpen();
+        try {
+            throwIfNoAssignorsConfigured();
+            log.info("Subscribed to pattern: '{}'", pattern);
+            this.subscriptions.subscribe(pattern, listener);
+            this.coordinator.updatePatternSubscription(metadata.fetch());
+            this.metadata.requestUpdateForNewTopics();
+        } finally {
+            release();
+        }
+    }
+
+    public void unsubscribe() {
+        acquireAndEnsureOpen();
+        try {
+            fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
+            if (this.coordinator != null) {
+                this.coordinator.onLeavePrepare();
+                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
+            }
+            this.subscriptions.unsubscribe();
+            log.info("Unsubscribed all topics or patterns and assigned partitions");
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void assign(Collection<TopicPartition> partitions) {
+        acquireAndEnsureOpen();
+        try {
+            if (partitions == null) {
+                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
+            } else if (partitions.isEmpty()) {
+                this.unsubscribe();
+            } else {
+                for (TopicPartition tp : partitions) {
+                    String topic = (tp != null) ? tp.topic() : null;
+                    if (isBlank(topic))
+                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+                }
+                fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+                // make sure the offsets of topic partitions the consumer is unsubscribing from
+                // are committed since there will be no following rebalance
+                if (coordinator != null)
+                    this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+
+                log.info("Assigned to partition(s): {}", join(partitions, ", "));
+                if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+                    metadata.requestUpdateForNewTopics();
+            }
+        } finally {
+            release();
+        }
+    }
+
+    @Deprecated
+    @Override
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(time.timer(timeoutMs), false);
+    }
+
+    @Override
+    public ConsumerRecords<K, V> poll(final Duration timeout) {
+        return poll(time.timer(timeout), true);
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        acquireAndEnsureOpen();
+        try {
+            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
+
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
+                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
+            }
+
+            do {
+                client.maybeTriggerWakeup();
+
+                if (includeMetadataInTimeout) {
+                    // try to update assignment metadata BUT do not need to block on the timer for join group
+                    updateAssignmentMetadataIfNeeded(timer, false);
+                } else {
+                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
+                        log.warn("Still waiting for metadata");
+                    }
+                }
+
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    // before returning the fetched records, we can send off the next round of fetches
+                    // and avoid block waiting for their responses to enable pipelining while the user
+                    // is handling the fetched records.
+                    //
+                    // NOTE: since the consumed position has already been updated, we must not allow
+                    // wakeups or any other errors to be triggered prior to returning the fetched records.
+                    if (sendFetches() > 0 || client.hasPendingRequests()) {
+                        client.transmitSends();
+                    }
+
+                    if (fetch.records().isEmpty()) {
+                        log.trace("Returning empty records from `poll()` "
+                                + "since the consumer's position has advanced for at least one topic partition");
+                    }
+
+                    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                }
+            } while (timer.notExpired());
+
+            return ConsumerRecords.empty();
+        } finally {
+            release();
+            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+        }
+    }
+
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();
+        return fetcher.sendFetches();
+    }
+
+    boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
+        if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
+            return false;
+        }
+
+        return updateFetchPositions(timer);
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch<K, V> pollForFetches(Timer timer) {
+        long pollTimeout = coordinator == null ? timer.remainingMs() :
+                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
+
+        // if data is available already, return it immediately
+        final Fetch<K, V> fetch = fetcher.collectFetch();
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+        client.poll(pollTimer, () -> {
+            // since a fetch might be completed by the background thread, we need this poll condition
+            // to ensure that we do not block unnecessarily in poll()
+            return !fetcher.hasAvailableFetches();
+        });
+        timer.update(pollTimer.currentTimeMs());
+
+        return fetcher.collectFetch();
+    }
+
+    @Override
+    public void commitSync() {
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Duration timeout) {
+        commitSync(subscriptions.allConsumed(), timeout);
+    }
+
+    @Override
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
+        acquireAndEnsureOpen();
+        long commitStart = time.nanoseconds();
+        try {
+            maybeThrowInvalidGroupIdException();
+            offsets.forEach(this::updateLastSeenEpochIfNewer);
+            if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
+                        "committing offsets " + offsets);
+            }
+        } finally {
+            kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
+            release();
+        }
+    }
+
+    @Override
+    public void commitAsync() {
+        commitAsync(null);
+    }
+
+    @Override
+    public void commitAsync(OffsetCommitCallback callback) {
+        commitAsync(subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            log.debug("Committing offsets: {}", offsets);
+            offsets.forEach(this::updateLastSeenEpochIfNewer);
+            coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        if (offset < 0)
+            throw new IllegalArgumentException("seek offset must not be a negative number");
+
+        acquireAndEnsureOpen();
+        try {
+            log.info("Seeking to offset {} for partition {}", offset, partition);
+            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+                    offset,
+                    Optional.empty(), // This will ensure we skip validation
+                    this.metadata.currentLeader(partition));
+            this.subscriptions.seekUnvalidated(partition, newPosition);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
+        long offset = offsetAndMetadata.offset();
+        if (offset < 0) {
+            throw new IllegalArgumentException("seek offset must not be a negative number");
+        }
+
+        acquireAndEnsureOpen();
+        try {
+            if (offsetAndMetadata.leaderEpoch().isPresent()) {
+                log.info("Seeking to offset {} for partition {} with epoch {}",
+                        offset, partition, offsetAndMetadata.leaderEpoch().get());
+            } else {
+                log.info("Seeking to offset {} for partition {}", offset, partition);
+            }
+            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeader(partition);
+            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+                    offsetAndMetadata.offset(),
+                    offsetAndMetadata.leaderEpoch(),
+                    currentLeaderAndEpoch);
+            this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
+            this.subscriptions.seekUnvalidated(partition, newPosition);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void seekToBeginning(Collection<TopicPartition> partitions) {
+        if (partitions == null)
+            throw new IllegalArgumentException("Partitions collection cannot be null");
+
+        acquireAndEnsureOpen();
+        try {
+            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
+            subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void seekToEnd(Collection<TopicPartition> partitions) {
+        if (partitions == null)
+            throw new IllegalArgumentException("Partitions collection cannot be null");
+
+        acquireAndEnsureOpen();
+        try {
+            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
+            subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public long position(TopicPartition partition) {
+        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public long position(TopicPartition partition, final Duration timeout) {
+        acquireAndEnsureOpen();
+        try {
+            if (!this.subscriptions.isAssigned(partition))
+                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
+
+            Timer timer = time.timer(timeout);
+            do {
+                SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
+                if (position != null)
+                    return position.offset;
+
+                updateFetchPositions(timer);
+                client.poll(timer);
+            } while (timer.notExpired());
+
+            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
+                    "for partition " + partition + " could be determined");
+        } finally {
+            release();
+        }
+    }
+
+    @Deprecated
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition) {
+        return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Deprecated
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
+        return committed(Collections.singleton(partition), timeout).get(partition);
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
+        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
+        acquireAndEnsureOpen();
+        long start = time.nanoseconds();
+        try {
+            maybeThrowInvalidGroupIdException();
+            final Map<TopicPartition, OffsetAndMetadata> offsets;
+            offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            if (offsets == null) {
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
+                        "committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
+                        "larger to relax the threshold.");
+            } else {
+                offsets.forEach(this::updateLastSeenEpochIfNewer);
+                return offsets;
+            }
+        } finally {
+            kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
+            release();
+        }
+    }
+
+    @Override
+    public Uuid clientInstanceId(Duration timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
+        acquireAndEnsureOpen();
+        try {
+            Cluster cluster = this.metadata.fetch();
+            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+            if (!parts.isEmpty())
+                return parts;
+
+            Timer timer = time.timer(timeout);
+            List<PartitionInfo> topicMetadata = topicMetadataFetcher.getTopicMetadata(topic, metadata.allowAutoTopicCreation(), timer);
+            return topicMetadata != null ? topicMetadata : Collections.emptyList();
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics() {
+        return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
+        acquireAndEnsureOpen();
+        try {
+            return topicMetadataFetcher.getAllTopicMetadata(time.timer(timeout));
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void pause(Collection<TopicPartition> partitions) {
+        acquireAndEnsureOpen();
+        try {
+            log.debug("Pausing partitions {}", partitions);
+            for (TopicPartition partition: partitions) {
+                subscriptions.pause(partition);
+            }
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void resume(Collection<TopicPartition> partitions) {
+        acquireAndEnsureOpen();
+        try {
+            log.debug("Resuming partitions {}", partitions);
+            for (TopicPartition partition: partitions) {
+                subscriptions.resume(partition);
+            }
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public Set<TopicPartition> paused() {
+        acquireAndEnsureOpen();
+        try {
+            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+        return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
+        acquireAndEnsureOpen();
+        try {
+            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+                // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
+                // OffsetAndTimestamp is always positive.
+                if (entry.getValue() < 0)
+                    throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+                            entry.getValue() + ". The target time cannot be negative.");
+            }
+            return offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(timeout));
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+        acquireAndEnsureOpen();
+        try {
+            return offsetFetcher.beginningOffsets(partitions, time.timer(timeout));
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+        acquireAndEnsureOpen();
+        try {
+            return offsetFetcher.endOffsets(partitions, time.timer(timeout));
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        acquireAndEnsureOpen();
+        try {
+            final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
+
+            // if the log end offset is not known and hence cannot return lag and there is
+            // no in-flight list offset requested yet,
+            // issue a list offset request for that partition so that next time
+            // we may get the answer; we do not need to wait for the return value
+            // since we would not try to poll the network client synchronously
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
+                        !subscriptions.partitionEndOffsetRequested(topicPartition)) {
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    subscriptions.requestPartitionEndOffset(topicPartition);
+                    offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
+                }
+
+                return OptionalLong.empty();
+            }
+
+            return OptionalLong.of(lag);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public ConsumerGroupMetadata groupMetadata() {
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            return coordinator.groupMetadata();
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void enforceRebalance(final String reason) {
+        acquireAndEnsureOpen();
+        try {
+            if (coordinator == null) {
+                throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
+            }
+            coordinator.requestRejoin(reason == null || reason.isEmpty() ? DEFAULT_REASON : reason);
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void enforceRebalance() {
+        enforceRebalance(null);
+    }
+
+    @Override
+    public void close() {
+        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+    }
+
+    @Override
+    public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+        acquire();
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close function
+                // itself may trigger rebalance callback that needs the consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+            release();
+        }
+    }
+
+    @Override
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        // Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to
+        // the server in the process of closing which may not respect the overall timeout defined for closing the
+        // consumer.
+        if (coordinator != null) {
+            // This is a blocking call bound by the time remaining in closeTimer
+            swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
+        }
+
+        if (fetcher != null) {
+            // the timeout for the session close is at-most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
+            }
+
+            closeTimer.reset(remainingDurationInTimeout);
+
+            // This is a blocking call bound by the time remaining in closeTimer
+            swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
+        }
+
+        closeQuietly(interceptors, "consumer interceptors", firstException);
+        closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
+        closeQuietly(metrics, "consumer metrics", firstException);
+        closeQuietly(client, "consumer network client", firstException);
+        closeQuietly(deserializers, "consumer deserializers", firstException);
+        AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
+        log.debug("Kafka consumer has been closed");
+        Throwable exception = firstException.get();
+        if (exception != null && !swallowException) {
+            if (exception instanceof InterruptException) {
+                throw (InterruptException) exception;
+            }
+            throw new KafkaException("Failed to close kafka consumer", exception);
+        }
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     * @return true iff the operation completed without timing out
+     */
+    private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        offsetFetcher.validatePositionsIfNeeded();
+
+        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+        if (cachedSubscriptionHasAllFetchPositions) return true;
+
+        // If there are any partitions which do not have a valid position and are not
+        // awaiting reset, then we need to fetch committed offsets. We will only do a
+        // coordinator lookup if there are partitions which have missing positions, so
+        // a consumer with manually assigned partitions can avoid a coordinator dependence
+        // by always ensuring that assigned partitions have an initial position.
+        if (coordinator != null && !coordinator.initWithCommittedOffsetsIfNeeded(timer)) return false;
+
+        // If there are partitions still needing a position and a reset policy is defined,
+        // request reset using the default policy. If no reset strategy is defined and there
+        // are partitions with a missing position, then we will raise an exception.
+        subscriptions.resetInitializingPositions();
+
+        // Finally send an asynchronous request to look up and update the positions of any
+        // partitions which are awaiting reset.
+        offsetFetcher.resetPositionsIfNeeded();
+
+        return true;
+    }
+
+    /**
+     * Acquire the light lock and ensure that the consumer hasn't been closed.
+     * @throws IllegalStateException If the consumer has been closed
+     */
+    private void acquireAndEnsureOpen() {
+        acquire();
+        if (this.closed) {
+            release();
+            throw new IllegalStateException("This consumer has already been closed.");
+        }
+    }
+
+    /**
+     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
+     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
+     * supported).
+     * @throws ConcurrentModificationException if another thread already has the lock
+     */
+    private void acquire() {
+        final Thread thread = Thread.currentThread();
+        final long threadId = thread.getId();
+        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
+            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
+                    "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
+                    " otherThread(id: " + currentThread.get() + ")"
+            );
+        refcount.incrementAndGet();
+    }
+
+    /**
+     * Release the light lock protecting the consumer from multi-threaded access.
+     */
+    private void release() {
+        if (refcount.decrementAndGet() == 0)
+            currentThread.set(NO_CURRENT_THREAD);
+    }
+
+    private void throwIfNoAssignorsConfigured() {
+        if (assignors.isEmpty())
+            throw new IllegalStateException("Must configure at least one partition assigner class name to " +
+                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
+    }
+
+    private void maybeThrowInvalidGroupIdException() {
+        if (!groupId.isPresent())
+            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+    }
+
+    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
+        if (offsetAndMetadata != null)
+            offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+    }
+
+    // Functions below are for testing only
+    @Override
+    public String clientId() {
+        return clientId;
+    }
+
+    @Override
+    public Metrics metricsRegistry() {
+        return metrics;
+    }
+
+    @Override
+    public KafkaConsumerMetrics kafkaConsumerMetrics() {
+        return kafkaConsumerMetrics;
+    }
+
+    @Override
+    public boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
+        return updateAssignmentMetadataIfNeeded(timer, true);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 58181b68535..df3ba98dd0c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -39,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 public class ConsumerConfigTest {
 
@@ -66,18 +65,24 @@ public class ConsumerConfigTest {
 
     @Test
     public void testOverrideEnableAutoCommit() {
-        ConsumerConfig config = new ConsumerConfig(properties);
-        boolean overrideEnableAutoCommit = config.maybeOverrideEnableAutoCommit();
-        assertFalse(overrideEnableAutoCommit);
-
-        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        config = new ConsumerConfig(properties);
-        try {
-            config.maybeOverrideEnableAutoCommit();
-            fail("Should have thrown an exception");
-        } catch (InvalidConfigurationException e) {
-            // expected
-        }
+        // Verify that our default properties (no 'enable.auto.commit' or 'group.id') are valid.
+        assertEquals(false, new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+        // Verify that explicitly disabling 'enable.auto.commit' still works.
+        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
+        assertEquals(false, new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+        // Verify that enabling 'enable.auto.commit' but without 'group.id' fails.
+        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+        assertThrows(InvalidConfigurationException.class, () -> new ConsumerConfig(properties));
+
+        // Verify that then adding 'group.id' to the mix allows it to pass OK.
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+        assertEquals(true, new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+        // Now remove the 'enable.auto.commit' flag and verify that it is set to true (the default).
+        properties.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        assertEquals(true, new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b4c3dba56fb..b07ce529c36 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,27 +16,15 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
-import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.Deserializers;
-import org.apache.kafka.clients.consumer.internals.FetchConfig;
-import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
-import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
-import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.consumer.internals.TopicMetadataFetcher;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -106,7 +94,8 @@ import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -124,6 +113,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -146,8 +136,9 @@ import java.util.stream.Stream;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
-import static org.apache.kafka.clients.consumer.KafkaConsumer.DEFAULT_REASON;
+import static org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.DEFAULT_REASON;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.propsToMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -181,13 +172,11 @@ public class KafkaConsumerTest {
 
     private final int sessionTimeoutMs = 10000;
     private final int defaultApiTimeoutMs = 60000;
-    private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
 
     // Set auto commit interval lower than heartbeat so we don't need to deal with
     // a concurrent heartbeat request
     private final int autoCommitIntervalMs = 500;
-    private final int throttleMs = 10;
 
     private final String groupId = "mock-group";
     private final String memberId = "memberId";
@@ -222,42 +211,51 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testMetricsReporterAutoGeneratedClientId() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
 
-        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().get(0);
 
-        assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
-        assertEquals(2, consumer.metrics.reporters().size());
+        assertEquals(consumer.clientId(), mockMetricsReporter.clientId);
+        assertEquals(2, consumer.metricsRegistry().reporters().size());
     }
 
-    @Test
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
     @SuppressWarnings("deprecation")
-    public void testDisableJmxReporter() {
+    public void testDisableJmxReporter(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
-        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
-        assertTrue(consumer.metrics.reporters().isEmpty());
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        assertTrue(consumer.metricsRegistry().reporters().isEmpty());
     }
 
-    @Test
-    public void testExplicitlyEnableJmxReporter() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testExplicitlyEnableJmxReporter(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
-        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
-        assertEquals(1, consumer.metrics.reporters().size());
+        consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+        assertEquals(1, consumer.metricsRegistry().reporters().size());
     }
 
-    @Test
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("unchecked")
-    public void testPollReturnsRecords() {
-        consumer = setUpConsumerWithRecordsToPoll(tp0, 5);
+    public void testPollReturnsRecords(GroupProtocol groupProtocol) {
+        consumer = setUpConsumerWithRecordsToPoll(groupProtocol, tp0, 5);
 
         ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
 
@@ -266,14 +264,17 @@ public class KafkaConsumerTest {
         assertEquals(records.records(tp0).size(), 5);
     }
 
-    @Test
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("unchecked")
-    public void testSecondPollWithDeserializationErrorThrowsRecordDeserializationException() {
+    public void testSecondPollWithDeserializationErrorThrowsRecordDeserializationException(GroupProtocol groupProtocol) {
         int invalidRecordNumber = 4;
         int invalidRecordOffset = 3;
         StringDeserializer deserializer = mockErrorDeserializer(invalidRecordNumber);
 
-        consumer = setUpConsumerWithRecordsToPoll(tp0, 5, deserializer);
+        consumer = setUpConsumerWithRecordsToPoll(groupProtocol, tp0, 5, deserializer);
         ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
 
         assertEquals(invalidRecordNumber - 1, records.count());
@@ -317,18 +318,23 @@ public class KafkaConsumerTest {
         };
     }
 
-    private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount) {
-        return setUpConsumerWithRecordsToPoll(tp, recordCount, new StringDeserializer());
+    private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(GroupProtocol groupProtocol,
+                                                               TopicPartition tp,
+                                                               int recordCount) {
+        return setUpConsumerWithRecordsToPoll(groupProtocol, tp, recordCount, new StringDeserializer());
     }
 
-    private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount, Deserializer<String> deserializer) {
+    private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(GroupProtocol groupProtocol,
+                                                               TopicPartition tp,
+                                                               int recordCount,
+                                                               Deserializer<String> deserializer) {
         Cluster cluster = TestUtils.singletonCluster(tp.topic(), 1);
         Node node = cluster.nodes().get(0);
 
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
-        consumer = newConsumer(time, client, subscription, metadata, assignor,
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, Optional.of(deserializer), false);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp), null);
@@ -337,9 +343,11 @@ public class KafkaConsumerTest {
         return consumer;
     }
 
-    @Test
-    public void testConstructorClose() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testConstructorClose(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "invalid-23-8409-adsfsdj");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -347,7 +355,7 @@ public class KafkaConsumerTest {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
-            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+            newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             fail("should have caught an exception and returned");
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -356,44 +364,53 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testOsDefaultSocketBufferSizes() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) {
         Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
         config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
-        consumer = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        consumer = newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
-    @Test
-    public void testInvalidSocketSendBufferSize() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testInvalidSocketSendBufferSize(GroupProtocol groupProtocol) {
         Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ConsumerConfig.SEND_BUFFER_CONFIG, -2);
         assertThrows(KafkaException.class,
-            () -> new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+            () -> newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
     }
 
-    @Test
-    public void testInvalidSocketReceiveBufferSize() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testInvalidSocketReceiveBufferSize(GroupProtocol groupProtocol) {
         Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2);
         assertThrows(KafkaException.class,
-            () -> new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+            () -> newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
     }
 
-    @Test
-    public void shouldIgnoreGroupInstanceIdForEmptyGroupId() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void shouldIgnoreGroupInstanceIdForEmptyGroupId(GroupProtocol groupProtocol) {
         Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance_id");
-        consumer = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        consumer = newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
-    @Test
-    public void testSubscription() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscription(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
 
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
@@ -412,93 +429,107 @@ public class KafkaConsumerTest {
         assertTrue(consumer.assignment().isEmpty());
     }
 
-    @Test
-    public void testSubscriptionOnNullTopicCollection() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscriptionOnNullTopicCollection(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((List<String>) null));
     }
 
-    @Test
-    public void testSubscriptionOnNullTopic() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscriptionOnNullTopic(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(null)));
     }
 
-    @Test
-    public void testSubscriptionOnEmptyTopic() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscriptionOnEmptyTopic(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         String emptyTopic = "  ";
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
 
-    @Test
-    public void testSubscriptionOnNullPattern() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscriptionOnNullPattern(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         assertThrows(IllegalArgumentException.class,
             () -> consumer.subscribe((Pattern) null));
     }
 
-    @Test
-    public void testSubscriptionOnEmptyPattern() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         assertThrows(IllegalArgumentException.class,
             () -> consumer.subscribe(Pattern.compile("")));
     }
 
-    @Test
-    public void testSubscriptionWithEmptyPartitionAssignment() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        consumer = newConsumer(props);
+        consumer = newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         assertThrows(IllegalStateException.class,
             () -> consumer.subscribe(singletonList(topic)));
     }
 
-    @Test
-    public void testSeekNegative() {
-        consumer = newConsumer((String) null);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testSeekNegative(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, null);
         consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
         assertThrows(IllegalArgumentException.class,
             () -> consumer.seek(new TopicPartition("nonExistTopic", 0), -1));
     }
 
-    @Test
-    public void testAssignOnNullTopicPartition() {
-        consumer = newConsumer((String) null);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testAssignOnNullTopicPartition(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, null);
         assertThrows(IllegalArgumentException.class, () -> consumer.assign(null));
     }
 
-    @Test
-    public void testAssignOnEmptyTopicPartition() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testAssignOnEmptyTopicPartition(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         consumer.assign(Collections.emptyList());
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
     }
 
-    @Test
-    public void testAssignOnNullTopicInPartition() {
-        consumer = newConsumer((String) null);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testAssignOnNullTopicInPartition(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, null);
         assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0))));
     }
 
-    @Test
-    public void testAssignOnEmptyTopicInPartition() {
-        consumer = newConsumer((String) null);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testAssignOnEmptyTopicInPartition(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, null);
         assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition("  ", 0))));
     }
 
-    @Test
-    public void testInterceptorConstructorClose() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
         try {
             Properties props = new Properties();
             // test with client ID assigned by KafkaConsumer
+            props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
             props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
             props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
 
-            consumer = new KafkaConsumer<>(
+            consumer = newConsumer(
                     props, new StringDeserializer(), new StringDeserializer());
             assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
             assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
@@ -515,12 +546,14 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {
         final int targetInterceptor = 3;
 
         try {
             Properties props = new Properties();
+            props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
             props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
             props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,  MockConsumerInterceptor.class.getName() + ", "
                     + MockConsumerInterceptor.class.getName() + ", "
@@ -528,10 +561,8 @@ public class KafkaConsumerTest {
 
             MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
 
-            assertThrows(KafkaException.class, () -> {
-                new KafkaConsumer<>(
-                        props, new StringDeserializer(), new StringDeserializer());
-            });
+            assertThrows(KafkaException.class, () -> newConsumer(
+                    props, new StringDeserializer(), new StringDeserializer()));
 
             assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
             assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
@@ -541,9 +572,10 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testPause() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testPause(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
 
         consumer.assign(singletonList(tp0));
         assertEquals(singleton(tp0), consumer.assignment());
@@ -559,27 +591,32 @@ public class KafkaConsumerTest {
         assertTrue(consumer.paused().isEmpty());
     }
 
-    @Test
-    public void testConsumerJmxPrefix() throws  Exception {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testConsumerJmxPrefix(GroupProtocol groupProtocol) throws  Exception {
         Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
         config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
         config.put("client.id", "client-1");
-        consumer = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        consumer = newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-        MetricName testMetricName = consumer.metrics.metricName("test-metric",
+        MetricName testMetricName = consumer.metricsRegistry().metricName("test-metric",
                 "grp1", "test metric");
-        consumer.metrics.addMetric(testMetricName, new Avg());
+        consumer.metricsRegistry().addMetric(testMetricName, new Avg());
         assertNotNull(server.getObjectInstance(new ObjectName("kafka.consumer:type=grp1,client-id=client-1")));
     }
 
-    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
-        return newConsumer(groupId, Optional.empty());
+    private KafkaConsumer<byte[], byte[]> newConsumer(GroupProtocol groupProtocol, String groupId) {
+        return newConsumer(groupProtocol, groupId, Optional.empty());
     }
 
-    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, Optional<Boolean> enableAutoCommit) {
+    private KafkaConsumer<byte[], byte[]> newConsumer(GroupProtocol groupProtocol,
+                                                      String groupId,
+                                                      Optional<Boolean> enableAutoCommit) {
         Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -587,22 +624,38 @@ public class KafkaConsumerTest {
             props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         enableAutoCommit.ifPresent(
             autoCommit -> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString()));
-        return newConsumer(props);
+        return newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    private <K, V> KafkaConsumer<K, V> newConsumer(Properties props) {
+        return newConsumer(props, null, null);
+    }
+
+    private <K, V> KafkaConsumer<K, V> newConsumer(Map<String, Object> configs,
+                                                   Deserializer<K> keyDeserializer,
+                                                   Deserializer<V> valueDeserializer) {
+        return new KafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+                keyDeserializer, valueDeserializer);
     }
 
-    private KafkaConsumer<byte[], byte[]> newConsumer(Properties props) {
-        return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    private <K, V> KafkaConsumer<K, V> newConsumer(Properties props,
+                                                   Deserializer<K> keyDeserializer,
+                                                   Deserializer<V> valueDeserializer) {
+        return newConsumer(propsToMap(props), keyDeserializer, valueDeserializer);
     }
 
-    @Test
-    public void verifyHeartbeatSent() throws Exception {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws Exception {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -624,15 +677,18 @@ public class KafkaConsumerTest {
         assertTrue(heartbeatReceived.get());
     }
 
-    @Test
-    public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol groupProtocol) throws Exception {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -654,15 +710,16 @@ public class KafkaConsumerTest {
         assertTrue(heartbeatReceived.get());
     }
 
-    @Test
-    public void verifyPollTimesOutDuringMetadataUpdate() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void verifyPollTimesOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         // Since we would enable the heartbeat thread after received join-response which could
         // send the sync-group on behalf of the consumer if it is enqueued, we may still complete
@@ -677,16 +734,19 @@ public class KafkaConsumerTest {
         assertEquals(0, requests.stream().filter(request -> request.apiKey().equals(ApiKeys.FETCH)).count());
     }
 
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("deprecation")
-    @Test
-    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
+    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -699,15 +759,16 @@ public class KafkaConsumerTest {
         assertEquals(FetchRequest.Builder.class, aClass);
     }
 
-    @Test
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
     @SuppressWarnings("unchecked")
-    public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
+    public void verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, null, groupInstanceId, false);
         consumer.assign(singleton(tp0));
         consumer.seekToBeginning(singleton(tp0));
 
@@ -721,8 +782,11 @@ public class KafkaConsumerTest {
         assertEquals(55L, consumer.position(tp0));
     }
 
-    @Test
-    public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -730,7 +794,7 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
 
         // create a consumer with groupID with manual assignment
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singleton(tp0));
 
         // 1st coordinator error should cause coordinator unknown
@@ -757,8 +821,11 @@ public class KafkaConsumerTest {
         assertEquals(55, consumer.committed(Collections.singleton(tp0), Duration.ZERO).get(tp0).offset());
     }
 
-    @Test
-    public void testFetchProgressWithMissingPartitionPosition() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testFetchProgressWithMissingPartitionPosition(GroupProtocol groupProtocol) {
         // Verifies that we can make progress on one partition while we are awaiting
         // a reset on another partition.
 
@@ -767,7 +834,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
+        consumer = newConsumerNoAutoCommit(groupProtocol, time, client, subscription, metadata);
         consumer.assign(Arrays.asList(tp0, tp1));
         consumer.seekToEnd(singleton(tp0));
         consumer.seekToBeginning(singleton(tp1));
@@ -814,8 +881,11 @@ public class KafkaConsumerTest {
         mockClient.updateMetadata(initialMetadata);
     }
 
-    @Test
-    public void testMissingOffsetNoResetPolicy() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -823,7 +893,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor,
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
 
@@ -835,8 +905,11 @@ public class KafkaConsumerTest {
         assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
     }
 
-    @Test
-    public void testResetToCommittedOffset() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testResetToCommittedOffset(GroupProtocol groupProtocol) {
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -844,7 +917,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
 
@@ -857,8 +930,11 @@ public class KafkaConsumerTest {
         assertEquals(539L, consumer.position(tp0));
     }
 
-    @Test
-    public void testResetUsingAutoResetPolicy() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) {
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -866,7 +942,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor,
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
 
@@ -881,15 +957,16 @@ public class KafkaConsumerTest {
         assertEquals(50L, consumer.position(tp0));
     }
 
-    @Test
-    public void testOffsetIsValidAfterSeek() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) {
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor,
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
                 true, groupId, Optional.empty(), false);
         consumer.assign(singletonList(tp0));
         consumer.seek(tp0, 20L);
@@ -897,8 +974,11 @@ public class KafkaConsumerTest {
         assertEquals(subscription.validPosition(tp0).offset, 20L);
     }
 
-    @Test
-    public void testCommitsFetchedDuringAssign() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCommitsFetchedDuringAssign(GroupProtocol groupProtocol) {
         long offset1 = 10000;
         long offset2 = 20000;
 
@@ -908,7 +988,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
@@ -933,22 +1013,31 @@ public class KafkaConsumerTest {
         assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
     }
 
-    @Test
-    public void testFetchStableOffsetThrowInCommitted() {
-        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer().committed(Collections.singleton(tp0)));
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testFetchStableOffsetThrowInCommitted(GroupProtocol groupProtocol) {
+        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
     }
 
-    @Test
-    public void testFetchStableOffsetThrowInPoll() {
-        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer().poll(Duration.ZERO));
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
+        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
     }
 
-    @Test
-    public void testFetchStableOffsetThrowInPosition() {
-        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer().position(tp0));
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testFetchStableOffsetThrowInPosition(GroupProtocol groupProtocol) {
+        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).position(tp0));
     }
 
-    private KafkaConsumer<?, ?> setupThrowableConsumer() {
+    private KafkaConsumer<?, ?> setupThrowableConsumer(GroupProtocol groupProtocol) {
         long offset1 = 10000;
 
         ConsumerMetadata metadata = createMetadata(subscription);
@@ -960,7 +1049,7 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
 
         consumer = newConsumer(
-            time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
+                groupProtocol, time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -971,8 +1060,11 @@ public class KafkaConsumerTest {
         return consumer;
     }
 
-    @Test
-    public void testNoCommittedOffsets() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
         long offset1 = 10000;
 
         ConsumerMetadata metadata = createMetadata(subscription);
@@ -981,7 +1073,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(Arrays.asList(tp0, tp1));
 
         // lookup coordinator
@@ -996,15 +1088,18 @@ public class KafkaConsumerTest {
         assertNull(committed.get(tp1));
     }
 
-    @Test
-    public void testAutoCommitSentBeforePositionUpdate() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -1027,8 +1122,11 @@ public class KafkaConsumerTest {
         assertTrue(commitReceived.get());
     }
 
-    @Test
-    public void testRegexSubscription() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testRegexSubscription(GroupProtocol groupProtocol) {
         String unmatchedTopic = "unmatched";
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -1040,7 +1138,7 @@ public class KafkaConsumerTest {
         initMetadata(client, partitionCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
 
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -1053,8 +1151,11 @@ public class KafkaConsumerTest {
         assertEquals(singleton(tp0), consumer.assignment());
     }
 
-    @Test
-    public void testChangingRegexSubscription() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
         String otherTopic = "other";
         TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
 
@@ -1068,7 +1169,7 @@ public class KafkaConsumerTest {
         initMetadata(client, partitionCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -1087,15 +1188,18 @@ public class KafkaConsumerTest {
         assertEquals(singleton(otherTopic), consumer.subscription());
     }
 
-    @Test
-    public void testWakeupWithFetchDataAvailable() throws Exception {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws Exception {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -1125,15 +1229,18 @@ public class KafkaConsumerTest {
         exec.awaitTermination(5L, TimeUnit.SECONDS);
     }
 
-    @Test
-    public void testPollThrowsInterruptExceptionIfInterrupted() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -1150,15 +1257,16 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void fetchResponseWithUnexpectedPartitionIsIgnored() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
 
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1182,9 +1290,12 @@ public class KafkaConsumerTest {
      * Upon unsubscribing from subscribed topics the consumer subscription and assignment
      * are both updated right away but its consumed offsets are not auto committed.
      */
-    @Test
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("unchecked")
-    public void testSubscriptionChangesWithAutoCommitEnabled() {
+    public void testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1197,7 +1308,7 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // initial subscription
         consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
@@ -1295,8 +1406,11 @@ public class KafkaConsumerTest {
      * Upon unsubscribing from subscribed topics, the assigned partitions immediately
      * change but if auto-commit is disabled the consumer offsets are not committed.
      */
-    @Test
-    public void testSubscriptionChangesWithAutoCommitDisabled() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1308,7 +1422,7 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));
 
@@ -1349,8 +1463,11 @@ public class KafkaConsumerTest {
         client.requests().clear();
     }
 
-    @Test
-    public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1358,7 +1475,7 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
 
         CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
 
@@ -1372,8 +1489,11 @@ public class KafkaConsumerTest {
         assertEquals(partitionRevoked + singleTopicPartition, unsubscribeException.getCause().getMessage());
     }
 
-    @Test
-    public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProtocol groupProtocol) throws Exception {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1381,7 +1501,7 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
 
         CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1407,9 +1527,12 @@ public class KafkaConsumerTest {
         assertEquals(Collections.emptySet(), consumer.assignment());
     }
 
-    @Test
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("unchecked")
-    public void testManualAssignmentChangeWithAutoCommitEnabled() {
+    public void testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1421,7 +1544,7 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1462,8 +1585,11 @@ public class KafkaConsumerTest {
         client.requests().clear();
     }
 
-    @Test
-    public void testManualAssignmentChangeWithAutoCommitDisabled() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testManualAssignmentChangeWithAutoCommitDisabled(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1475,7 +1601,7 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         // lookup coordinator
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1517,8 +1643,11 @@ public class KafkaConsumerTest {
         client.requests().clear();
     }
 
-    @Test
-    public void testOffsetOfPausedPartitions() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testOffsetOfPausedPartitions(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1527,7 +1656,7 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1567,38 +1696,47 @@ public class KafkaConsumerTest {
         consumer.unsubscribe();
     }
 
-    @Test
-    public void testPollWithNoSubscription() {
-        consumer = newConsumer((String) null);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testPollWithNoSubscription(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, null);
         assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
     }
 
-    @Test
-    public void testPollWithEmptySubscription() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testPollWithEmptySubscription(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         consumer.subscribe(Collections.emptyList());
         assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
     }
 
-    @Test
-    public void testPollWithEmptyUserAssignment() {
-        consumer = newConsumer(groupId);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testPollWithEmptyUserAssignment(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, groupId);
         consumer.assign(Collections.emptySet());
         assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
     }
 
-    @Test
-    public void testGracefulClose() throws Exception {
+    // TODO: this test references RPCs to be sent that are not part of the CONSUMER group protocol.
+    //       We are deferring any attempts at generalizing this test for both group protocols to the future.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testGracefulClose(GroupProtocol groupProtocol) throws Exception {
         Map<TopicPartition, Errors> response = new HashMap<>();
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
         LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
         FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>());
-        consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse, closeResponse), 0, false);
+        consumerCloseTest(groupProtocol, 5000, Arrays.asList(commitResponse, leaveGroupResponse, closeResponse), 0, false);
     }
 
-    @Test
-    public void testCloseTimeoutDueToNoResponseForCloseFetchRequest() throws Exception {
+    // TODO: this test references RPCs to be sent that are not part of the CONSUMER group protocol.
+    //       We are deferring any attempts at generalizing this test for both group protocols to the future.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCloseTimeoutDueToNoResponseForCloseFetchRequest(GroupProtocol groupProtocol) throws Exception {
         Map<TopicPartition, Errors> response = new HashMap<>();
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
@@ -1610,39 +1748,54 @@ public class KafkaConsumerTest {
         // than configured timeout.
         final int closeTimeoutMs = 5000;
         final int waitForCloseCompletionMs = closeTimeoutMs + 1000;
-        consumerCloseTest(closeTimeoutMs, serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
+        consumerCloseTest(groupProtocol, closeTimeoutMs, serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
     }
 
-    @Test
-    public void testCloseTimeout() throws Exception {
-        consumerCloseTest(5000, Collections.emptyList(), 5000, false);
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCloseTimeout(GroupProtocol groupProtocol) throws Exception {
+        consumerCloseTest(groupProtocol, 5000, Collections.emptyList(), 5000, false);
     }
 
-    @Test
-    public void testLeaveGroupTimeout() throws Exception {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws Exception {
         Map<TopicPartition, Errors> response = new HashMap<>();
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
-        consumerCloseTest(5000, singletonList(commitResponse), 5000, false);
+        consumerCloseTest(groupProtocol, 5000, singletonList(commitResponse), 5000, false);
     }
 
-    @Test
-    public void testCloseNoWait() throws Exception {
-        consumerCloseTest(0, Collections.emptyList(), 0, false);
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCloseNoWait(GroupProtocol groupProtocol) throws Exception {
+        consumerCloseTest(groupProtocol, 0, Collections.emptyList(), 0, false);
     }
 
-    @Test
-    public void testCloseInterrupt() throws Exception {
-        consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0, true);
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCloseInterrupt(GroupProtocol groupProtocol) throws Exception {
+        consumerCloseTest(groupProtocol, Long.MAX_VALUE, Collections.emptyList(), 0, true);
     }
 
-    @Test
-    public void testCloseShouldBeIdempotent() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCloseShouldBeIdempotent(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = spy(new MockClient(time, metadata));
         initMetadata(client, singletonMap(topic, 1));
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         consumer.close(Duration.ZERO);
         consumer.close(Duration.ZERO);
@@ -1651,47 +1804,49 @@ public class KafkaConsumerTest {
         verify(client).close();
     }
 
-    @Test
-    public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testOperationsBySubscribingConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
         try {
-            newConsumer(null, Optional.of(Boolean.TRUE));
+            newConsumer(groupProtocol, null, Optional.of(Boolean.TRUE));
             fail("Expected an InvalidConfigurationException");
-        } catch (KafkaException e) {
-            assertEquals(InvalidConfigurationException.class, e.getCause().getClass());
+        } catch (InvalidConfigurationException e) {
+            // OK, expected
         }
 
         try {
-            newConsumer((String) null).subscribe(Collections.singleton(topic));
+            newConsumer(groupProtocol, null).subscribe(Collections.singleton(topic));
             fail("Expected an InvalidGroupIdException");
         } catch (InvalidGroupIdException e) {
             // OK, expected
         }
 
         try {
-            newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0);
+            newConsumer(groupProtocol, null).committed(Collections.singleton(tp0)).get(tp0);
             fail("Expected an InvalidGroupIdException");
         } catch (InvalidGroupIdException e) {
             // OK, expected
         }
 
         try {
-            newConsumer((String) null).commitAsync();
+            newConsumer(groupProtocol, null).commitAsync();
             fail("Expected an InvalidGroupIdException");
         } catch (InvalidGroupIdException e) {
             // OK, expected
         }
 
         try {
-            newConsumer((String) null).commitSync();
+            newConsumer(groupProtocol, null).commitSync();
             fail("Expected an InvalidGroupIdException");
         } catch (InvalidGroupIdException e) {
             // OK, expected
         }
     }
 
-    @Test
-    public void testOperationsByAssigningConsumerWithDefaultGroupId() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testOperationsByAssigningConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null);
         consumer.assign(singleton(tp0));
 
         try {
@@ -1716,30 +1871,35 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testMetricConfigRecordingLevelInfo() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testMetricConfigRecordingLevelInfo(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel());
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        assertEquals(Sensor.RecordingLevel.INFO, consumer.metricsRegistry().config().recordLevel());
         consumer.close(Duration.ZERO);
 
         props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
-        KafkaConsumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        assertEquals(Sensor.RecordingLevel.DEBUG, consumer2.metrics.config().recordLevel());
+        KafkaConsumer<byte[], byte[]> consumer2 = newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        assertEquals(Sensor.RecordingLevel.DEBUG, consumer2.metricsRegistry().config().recordLevel());
         consumer2.close(Duration.ZERO);
     }
 
-    @Test
+    // TODO: this test references RPCs to be sent that are not part of the CONSUMER group protocol.
+    //       We are deferring any attempts at generalizing this test for both group protocols to the future.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("unchecked")
-    public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
+    public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed(GroupProtocol groupProtocol) throws Exception {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -1797,7 +1957,8 @@ public class KafkaConsumerTest {
         assertFalse(records.isEmpty());
     }
 
-    private void consumerCloseTest(final long closeTimeoutMs,
+    private void consumerCloseTest(GroupProtocol groupProtocol,
+                                   final long closeTimeoutMs,
                                    List<? extends AbstractResponse> responses,
                                    long waitMs,
                                    boolean interrupt) throws Exception {
@@ -1807,7 +1968,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
+        final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, Optional.empty());
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
@@ -1893,8 +2054,11 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testPartitionsForNonExistingTopic() {
+    // TODO: this test requires topic metadata logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testPartitionsForNonExistingTopic(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -1907,59 +2071,83 @@ public class KafkaConsumerTest {
             Collections.emptyList());
         client.prepareResponse(updateResponse);
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
     }
 
-    @Test
-    public void testPartitionsForAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test requires topic metadata logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testPartitionsForAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         assertThrows(AuthenticationException.class, () -> consumer.partitionsFor("some other topic"));
     }
 
-    @Test
-    public void testBeginningOffsetsAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testBeginningOffsetsAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         assertThrows(AuthenticationException.class, () -> consumer.beginningOffsets(Collections.singleton(tp0)));
     }
 
-    @Test
-    public void testEndOffsetsAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testEndOffsetsAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         assertThrows(AuthenticationException.class, () -> consumer.endOffsets(Collections.singleton(tp0)));
     }
 
-    @Test
-    public void testPollAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         consumer.subscribe(singleton(topic));
         assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
     }
 
-    @Test
-    public void testOffsetsForTimesAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testOffsetsForTimesAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         assertThrows(AuthenticationException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L)));
     }
 
-    @Test
-    public void testCommitSyncAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCommitSyncAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(tp0, new OffsetAndMetadata(10L));
         assertThrows(AuthenticationException.class, () -> consumer.commitSync(offsets));
     }
 
-    @Test
-    public void testCommittedAuthenticationFailure() {
-        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testCommittedAuthenticationFailure(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
-    @Test
-    public void testMeasureCommitSyncDurationOnFailure() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testMeasureCommitSyncDurationOnFailure(GroupProtocol groupProtocol) {
         final KafkaConsumer<String, String> consumer
-            = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+            = consumerWithPendingError(groupProtocol, new MockTime(Duration.ofSeconds(1).toMillis()));
 
         try {
             consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
@@ -1967,12 +2155,15 @@ public class KafkaConsumerTest {
         }
 
         final Metric metric = consumer.metrics()
-            .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+            .get(consumer.metricsRegistry().metricName("commit-sync-time-ns-total", "consumer-metrics"));
         assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
     }
 
-    @Test
-    public void testMeasureCommitSyncDuration() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testMeasureCommitSyncDuration(GroupProtocol groupProtocol) {
         Time time = new MockTime(Duration.ofSeconds(1).toMillis());
         SubscriptionState subscription = new SubscriptionState(new LogContext(),
             OffsetResetStrategy.EARLIEST);
@@ -1980,7 +2171,7 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
             assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
@@ -1995,14 +2186,17 @@ public class KafkaConsumerTest {
         consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
 
         final Metric metric = consumer.metrics()
-            .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+            .get(consumer.metricsRegistry().metricName("commit-sync-time-ns-total", "consumer-metrics"));
         assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
     }
 
-    @Test
-    public void testMeasureCommittedDurationOnFailure() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testMeasureCommittedDurationOnFailure(GroupProtocol groupProtocol) {
         final KafkaConsumer<String, String> consumer
-            = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+            = consumerWithPendingError(groupProtocol, new MockTime(Duration.ofSeconds(1).toMillis()));
 
         try {
             consumer.committed(Collections.singleton(tp0));
@@ -2010,12 +2204,15 @@ public class KafkaConsumerTest {
         }
 
         final Metric metric = consumer.metrics()
-            .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+            .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
         assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
     }
 
-    @Test
-    public void testMeasureCommittedDuration() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testMeasureCommittedDuration(GroupProtocol groupProtocol) {
         long offset1 = 10000;
         Time time = new MockTime(Duration.ofSeconds(1).toMillis());
         SubscriptionState subscription = new SubscriptionState(new LogContext(),
@@ -2024,7 +2221,7 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
             assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
@@ -2040,19 +2237,22 @@ public class KafkaConsumerTest {
         consumer.committed(Collections.singleton(tp0)).get(tp0).offset();
 
         final Metric metric = consumer.metrics()
-            .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+            .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
         assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
     }
 
-    @Test
-    public void testRebalanceException() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testRebalanceException(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener());
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -2086,13 +2286,16 @@ public class KafkaConsumerTest {
         assertTrue(subscription.assignedPartitions().isEmpty());
     }
 
-    @Test
-    public void testReturnRecordsDuringRebalance() throws InterruptedException {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws InterruptedException {
         Time time = new MockTime(1L);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
 
@@ -2211,15 +2414,18 @@ public class KafkaConsumerTest {
         consumer.close(Duration.ZERO);
     }
 
-    @Test
-    public void testGetGroupMetadata() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testGetGroupMetadata(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         final Node node = metadata.fetch().nodes().get(0);
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata();
         assertEquals(groupId, groupMetadataOnStart.groupId());
@@ -2241,12 +2447,15 @@ public class KafkaConsumerTest {
         assertEquals(groupInstanceId, groupMetadataAfterPoll.groupInstanceId());
     }
 
-    @Test
-    public void testInvalidGroupMetadata() throws InterruptedException {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testInvalidGroupMetadata(GroupProtocol groupProtocol) throws InterruptedException {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
                 new RoundRobinAssignor(), true, groupInstanceId);
         consumer.subscribe(singletonList(topic));
         // concurrent access is illegal
@@ -2268,15 +2477,18 @@ public class KafkaConsumerTest {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
-    @Test
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     @SuppressWarnings("unchecked")
-    public void testCurrentLag() {
+    public void testCurrentLag(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, singletonMap(topic, 1));
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // throws for unassigned partition
         assertThrows(IllegalStateException.class, () -> consumer.currentLag(tp0));
@@ -2321,14 +2533,17 @@ public class KafkaConsumerTest {
         assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
     }
 
-    @Test
-    public void testListOffsetShouldUpdateSubscriptions() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, singletonMap(topic, 1));
 
-        consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         consumer.assign(singleton(tp0));
 
@@ -2344,7 +2559,8 @@ public class KafkaConsumerTest {
         assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
     }
 
-    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) {
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol,
+                                                                                 final Time time) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -2354,15 +2570,15 @@ public class KafkaConsumerTest {
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         client.createPendingAuthenticationError(node, 0);
-        return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        return newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
-    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
-        return consumerWithPendingAuthenticationError(new MockTime());
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol) {
+        return consumerWithPendingAuthenticationError(groupProtocol, new MockTime());
     }
 
-    private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
-        return consumerWithPendingAuthenticationError(time);
+    private KafkaConsumer<String, String> consumerWithPendingError(GroupProtocol groupProtocol, final Time time) {
+        return consumerWithPendingAuthenticationError(groupProtocol, time);
     }
 
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<?, ?> consumer) {
@@ -2512,6 +2728,7 @@ public class KafkaConsumerTest {
             partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(),
                     Optional.empty(), "", error));
         }
+        int throttleMs = 10;
         return new OffsetFetchResponse(
             throttleMs,
             Collections.singletonMap(groupId, Errors.NONE),
@@ -2561,11 +2778,12 @@ public class KafkaConsumerTest {
             if (fetchCount == 0) {
                 records = MemoryRecords.EMPTY;
             } else {
-                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
-                        TimestampType.CREATE_TIME, fetchOffset);
-                for (int i = 0; i < fetchCount; i++)
-                    builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
-                records = builder.build();
+                try (MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                        TimestampType.CREATE_TIME, fetchOffset)) {
+                    for (int i = 0; i < fetchCount; i++)
+                        builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
+                    records = builder.build();
+                }
             }
             tpResponses.put(new TopicIdPartition(topicIds.get(partition.topic()), partition),
                 new FetchResponseData.PartitionData()
@@ -2582,24 +2800,49 @@ public class KafkaConsumerTest {
         return fetchResponse(Collections.singletonMap(partition, fetchInfo));
     }
 
-    private KafkaConsumer<String, String> newConsumer(Time time,
+    private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol,
+                                                      Time time,
                                                       KafkaClient client,
                                                       SubscriptionState subscription,
                                                       ConsumerMetadata metadata,
                                                       ConsumerPartitionAssignor assignor,
                                                       boolean autoCommitEnabled,
                                                       Optional<String> groupInstanceId) {
-        return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId, false);
+        return newConsumer(
+            groupProtocol,
+            time,
+            client,
+            subscription,
+            metadata,
+            assignor,
+            autoCommitEnabled,
+            groupId,
+            groupInstanceId,
+            false
+        );
     }
 
-    private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
+    private KafkaConsumer<String, String> newConsumerNoAutoCommit(GroupProtocol groupProtocol,
+                                                                  Time time,
                                                                   KafkaClient client,
                                                                   SubscriptionState subscription,
                                                                   ConsumerMetadata metadata) {
-        return newConsumer(time, client, subscription, metadata, new RangeAssignor(), false, groupId, groupInstanceId, false);
+        return newConsumer(
+            groupProtocol,
+            time,
+            client,
+            subscription,
+            metadata,
+            new RangeAssignor(),
+            false,
+            groupId,
+            groupInstanceId,
+            false
+        );
     }
 
-    private KafkaConsumer<String, String> newConsumer(Time time,
+    private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol,
+                                                      Time time,
                                                       KafkaClient client,
                                                       SubscriptionState subscription,
                                                       ConsumerMetadata metadata,
@@ -2608,22 +2851,64 @@ public class KafkaConsumerTest {
                                                       String groupId,
                                                       Optional<String> groupInstanceId,
                                                       boolean throwOnStableOffsetNotSupported) {
-        return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId,
-        Optional.of(new StringDeserializer()), throwOnStableOffsetNotSupported);
+        return newConsumer(
+            groupProtocol,
+            time,
+            client,
+            subscription,
+            metadata,
+            assignor,
+            autoCommitEnabled,
+            groupId,
+            groupInstanceId,
+            Optional.of(new StringDeserializer()),
+            throwOnStableOffsetNotSupported
+        );
     }
 
-    private KafkaConsumer<String, String> newConsumer(Time time,
+    private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol,
+                                                      Time time,
                                                       KafkaClient client,
-                                                      SubscriptionState subscription,
+                                                      SubscriptionState subscriptions,
                                                       ConsumerMetadata metadata,
                                                       ConsumerPartitionAssignor assignor,
                                                       boolean autoCommitEnabled,
                                                       String groupId,
                                                       Optional<String> groupInstanceId,
-                                                      Optional<Deserializer<String>> valueDeserializer,
+                                                      Optional<Deserializer<String>> valueDeserializerOpt,
                                                       boolean throwOnStableOffsetNotSupported) {
+        Deserializer<String> keyDeserializer = new StringDeserializer();
+        Deserializer<String> valueDeserializer = valueDeserializerOpt.orElse(new StringDeserializer());
+        LogContext logContext = new LogContext();
+        List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
+        ConsumerConfig config = newConsumerConfig(
+            groupProtocol,
+            autoCommitEnabled,
+            groupId,
+            groupInstanceId,
+            valueDeserializer,
+            throwOnStableOffsetNotSupported
+        );
+        return new KafkaConsumer<>(
+            logContext,
+            time,
+            config,
+            keyDeserializer,
+            valueDeserializer,
+            client,
+            subscriptions,
+            metadata,
+            assignors
+        );
+    }
+
+    private ConsumerConfig newConsumerConfig(GroupProtocol groupProtocol,
+                                             boolean autoCommitEnabled,
+                                             String groupId,
+                                             Optional<String> groupInstanceId,
+                                             Deserializer<String> valueDeserializer,
+                                             boolean throwOnStableOffsetNotSupported) {
         String clientId = "mock-consumer";
-        String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         long retryBackoffMaxMs = 1000;
         int minBytes = 1;
@@ -2633,101 +2918,35 @@ public class KafkaConsumerTest {
         int maxPollRecords = Integer.MAX_VALUE;
         boolean checkCrcs = true;
         int rebalanceTimeoutMs = 60000;
+        int requestTimeoutMs = defaultApiTimeoutMs / 2;
 
-        Deserializer<String> keyDeserializer = new StringDeserializer();
-        Deserializer<String> deserializer = valueDeserializer.orElse(new StringDeserializer());
-
-        List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
-        ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
-
-        Metrics metrics = new Metrics(time);
-        ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
-
-        LogContext loggerFactory = new LogContext();
-        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
-                retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
-
-        ConsumerCoordinator consumerCoordinator = null;
-        if (groupId != null) {
-            GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
-                rebalanceTimeoutMs,
-                heartbeatIntervalMs,
-                groupId,
-                groupInstanceId,
-                retryBackoffMs,
-                retryBackoffMaxMs,
-                true);
-            consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
-                loggerFactory,
-                consumerClient,
-                assignors,
-                metadata,
-                subscription,
-                metrics,
-                metricGroupPrefix,
-                time,
-                autoCommitEnabled,
-                autoCommitIntervalMs,
-                interceptors,
-                throwOnStableOffsetNotSupported,
-                null);
-        }
-        IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
-        FetchConfig fetchConfig = new FetchConfig(
-                minBytes,
-                maxBytes,
-                maxWaitMs,
-                fetchSize,
-                maxPollRecords,
-                checkCrcs,
-                CommonClientConfigs.DEFAULT_CLIENT_RACK,
-                isolationLevel);
-        Fetcher<String, String> fetcher = new Fetcher<>(
-                loggerFactory,
-                consumerClient,
-                metadata,
-                subscription,
-                fetchConfig,
-                new Deserializers<>(keyDeserializer, deserializer),
-                metricsManager,
-                time,
-                new ApiVersions());
-        OffsetFetcher offsetFetcher = new OffsetFetcher(loggerFactory,
-                consumerClient,
-                metadata,
-                subscription,
-                time,
-                retryBackoffMs,
-                requestTimeoutMs,
-                isolationLevel,
-                new ApiVersions());
-        TopicMetadataFetcher topicMetadataFetcher = new TopicMetadataFetcher(loggerFactory,
-                consumerClient,
-                retryBackoffMs,
-                retryBackoffMaxMs);
-
-        return new KafkaConsumer<>(
-                loggerFactory,
-                clientId,
-                consumerCoordinator,
-                keyDeserializer,
-                deserializer,
-                fetcher,
-                offsetFetcher,
-                topicMetadataFetcher,
-                interceptors,
-                time,
-                consumerClient,
-                metrics,
-                subscription,
-                metadata,
-                retryBackoffMs,
-                retryBackoffMaxMs,
-                requestTimeoutMs,
-                defaultApiTimeoutMs,
-                assignors,
-                groupId);
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
+        configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, checkCrcs);
+        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+        configs.put(ConsumerConfig.CLIENT_RACK_CONFIG, CommonClientConfigs.DEFAULT_CLIENT_RACK);
+        configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs);
+        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitEnabled);
+        configs.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxBytes);
+        configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, maxWaitMs);
+        configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minBytes);
+        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+        configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
+        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
+        configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, rebalanceTimeoutMs);
+        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+        configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
+        configs.put(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs);
+        configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
+        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
+        configs.put(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, throwOnStableOffsetNotSupported);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
+        groupInstanceId.ifPresent(gi -> configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gi));
+
+        return new ConsumerConfig(configs);
     }
 
     private static class FetchInfo {
@@ -2748,8 +2967,11 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test
-    public void testSubscriptionOnInvalidTopic() {
+    // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+    //       Once it is implemented, this should use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -2767,22 +2989,23 @@ public class KafkaConsumerTest {
                 topicMetadata);
         client.prepareMetadataUpdate(updateResponse);
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
 
         assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
     }
 
-    @Test
-    public void testPollTimeMetrics() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testPollTimeMetrics(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic));
         // MetricName objects to check
-        Metrics metrics = consumer.metrics;
+        Metrics metrics = consumer.metricsRegistry();
         MetricName lastPollSecondsAgoName = metrics.metricName("last-poll-seconds-ago", "consumer-metrics");
         MetricName timeBetweenPollAvgName = metrics.metricName("time-between-poll-avg", "consumer-metrics");
         MetricName timeBetweenPollMaxName = metrics.metricName("time-between-poll-max", "consumer-metrics");
@@ -2818,32 +3041,33 @@ public class KafkaConsumerTest {
         assertEquals(10 * 1000d, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
     }
 
-    @Test
-    public void testPollIdleRatio() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+public void testPollIdleRatio(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
 
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         // MetricName object to check
-        Metrics metrics = consumer.metrics;
+        Metrics metrics = consumer.metricsRegistry();
         MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", "consumer-metrics");
         // Test default value
         assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue());
 
         // 1st poll
         // Spend 50ms in poll so value = 1.0
-        consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+        consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
         time.sleep(50);
-        consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+        consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
 
         assertEquals(1.0d, consumer.metrics().get(pollIdleRatio).metricValue());
 
         // 2nd poll
         // Spend 50m outside poll and 0ms in poll so value = 0.0
         time.sleep(50);
-        consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
-        consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+        consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
+        consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
 
         // Avg of first two data points
         assertEquals((1.0d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue());
@@ -2851,9 +3075,9 @@ public class KafkaConsumerTest {
         // 3rd poll
         // Spend 25ms outside poll and 25ms in poll so value = 0.5
         time.sleep(25);
-        consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+        consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
         time.sleep(25);
-        consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+        consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
 
         // Avg of three data points
         assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
@@ -2861,16 +3085,17 @@ public class KafkaConsumerTest {
 
     private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) {
         MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap());
-        return consumer.metrics.metrics().containsKey(metricName);
+        return consumer.metricsRegistry().metrics().containsKey(metricName);
     }
 
-    @Test
-    public void testClosingConsumerUnregistersConsumerMetrics() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupProtocol) {
         Time time = new MockTime(1L);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
             new RoundRobinAssignor(), true, groupInstanceId);
         consumer.subscribe(singletonList(topic));
         assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
@@ -2882,19 +3107,23 @@ public class KafkaConsumerTest {
         assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
     }
 
-    @Test
-    public void testEnforceRebalanceWithManualAssignment() {
-        consumer = newConsumer((String) null);
+    // NOTE: this test uses the enforceRebalance API which is not implemented in the CONSUMER group protocol.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testEnforceRebalanceWithManualAssignment(GroupProtocol groupProtocol) {
+        consumer = newConsumer(groupProtocol, null);
         consumer.assign(singleton(new TopicPartition("topic", 0)));
         assertThrows(IllegalStateException.class, consumer::enforceRebalance);
     }
 
-    @Test
-    public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
+    // NOTE: this test uses the enforceRebalance API which is not implemented in the CONSUMER group protocol.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testEnforceRebalanceTriggersRebalanceOnNextPoll(GroupProtocol groupProtocol) {
         Time time = new MockTime(1L);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
         initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
 
@@ -2918,8 +3147,10 @@ public class KafkaConsumerTest {
         assertEquals(countingRebalanceListener.revokedCount, 1);
     }
 
-    @Test
-    public void testEnforceRebalanceReason() {
+    // NOTE: this test uses the enforceRebalance API which is not implemented in the CONSUMER group protocol.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testEnforceRebalanceReason(GroupProtocol groupProtocol) {
         Time time = new MockTime(1L);
 
         ConsumerMetadata metadata = createMetadata(subscription);
@@ -2928,6 +3159,7 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
 
         consumer = newConsumer(
+            groupProtocol,
             time,
             client,
             subscription,
@@ -2978,24 +3210,30 @@ public class KafkaConsumerTest {
         );
     }
 
-    @Test
-    public void configurableObjectsShouldSeeGeneratedClientId() {
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void configurableObjectsShouldSeeGeneratedClientId(GroupProtocol groupProtocol) {
         Properties props = new Properties();
+        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
         props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorForClientId.class.getName());
 
-        consumer = new KafkaConsumer<>(props);
-        assertNotNull(consumer.getClientId());
-        assertNotEquals(0, consumer.getClientId().length());
+        consumer = newConsumer(props);
+        assertNotNull(consumer.clientId());
+        assertNotEquals(0, consumer.clientId().length());
         assertEquals(3, CLIENT_IDS.size());
-        CLIENT_IDS.forEach(id -> assertEquals(id, consumer.getClientId()));
+        CLIENT_IDS.forEach(id -> assertEquals(id, consumer.clientId()));
     }
 
-    @Test
-    public void testUnusedConfigs() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testUnusedConfigs(GroupProtocol groupProtocol) {
         Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
         ConsumerConfig config = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(props, new StringDeserializer(), new StringDeserializer()));
@@ -3006,45 +3244,56 @@ public class KafkaConsumerTest {
         assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
     }
 
-    @Test
-    public void testAssignorNameConflict() {
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testAssignorNameConflict(GroupProtocol groupProtocol) {
         Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
         configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
             Arrays.asList(RangeAssignor.class.getName(), ConsumerPartitionAssignorTest.TestConsumerPartitionAssignor.class.getName()));
 
         assertThrows(KafkaException.class,
-            () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
+            () -> newConsumer(configs, new StringDeserializer(), new StringDeserializer()));
     }
 
-    @Test
-    public void testOffsetsForTimesTimeout() {
-        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testOffsetsForTimesTimeout(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException(groupProtocol);
         assertEquals(
             "Failed to get offsets by times in 60000ms",
             assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
         );
     }
 
-    @Test
-    public void testBeginningOffsetsTimeout() {
-        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testBeginningOffsetsTimeout(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException(groupProtocol);
         assertEquals(
             "Failed to get offsets by times in 60000ms",
             assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
         );
     }
 
-    @Test
-    public void testEndOffsetsTimeout() {
-        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+    //       The bug will be investigated and fixed so this test can use both group protocols.
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+    public void testEndOffsetsTimeout(GroupProtocol groupProtocol) {
+        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException(groupProtocol);
         assertEquals(
             "Failed to get offsets by times in 60000ms",
             assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
         );
     }
 
-    private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
+    private KafkaConsumer<String, String> consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
 
@@ -3052,7 +3301,7 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
-        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+        final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         for (int i = 0; i < 10; i++) {
             client.prepareResponse(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
similarity index 98%
rename from clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
rename to clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index d67f509d905..7f506ec8f9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -69,10 +69,10 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class PrototypeAsyncConsumerTest {
+public class AsyncKafkaConsumerTest {
 
-    private PrototypeAsyncConsumer<?, ?> consumer;
-    private ConsumerTestBuilder.PrototypeAsyncConsumerTestBuilder testBuilder;
+    private AsyncKafkaConsumer<?, ?> consumer;
+    private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
     private ApplicationEventHandler applicationEventHandler;
 
     @BeforeEach
@@ -82,7 +82,7 @@ public class PrototypeAsyncConsumerTest {
     }
 
     private void setup(Optional<ConsumerTestBuilder.GroupInformation> groupInfo) {
-        testBuilder = new ConsumerTestBuilder.PrototypeAsyncConsumerTestBuilder(groupInfo);
+        testBuilder = new ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo);
         applicationEventHandler = testBuilder.applicationEventHandler;
         consumer = testBuilder.consumer;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 4751cdea09d..7e8dcbead7b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -57,6 +58,7 @@ import java.util.stream.Stream;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
@@ -497,6 +499,10 @@ public class CommitRequestManagerTest {
     private CommitRequestManager create(final boolean autoCommitEnabled, final long autoCommitInterval) {
         props.setProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(autoCommitInterval));
         props.setProperty(ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommitEnabled));
+
+        if (autoCommitEnabled)
+            props.setProperty(GROUP_ID_CONFIG, TestUtils.randomString(10));
+
         return spy(new CommitRequestManager(
             this.time,
             this.logContext,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index b36f1958f7d..03ab1893289 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -304,11 +304,11 @@ public class ConsumerTestBuilder implements Closeable {
         }
     }
 
-    public static class PrototypeAsyncConsumerTestBuilder extends ApplicationEventHandlerTestBuilder {
+    public static class AsyncKafkaConsumerTestBuilder extends ApplicationEventHandlerTestBuilder {
 
-        final PrototypeAsyncConsumer<String, String> consumer;
+        final AsyncKafkaConsumer<String, String> consumer;
 
-        public PrototypeAsyncConsumerTestBuilder(Optional<GroupInformation> groupInfo) {
+        public AsyncKafkaConsumerTestBuilder(Optional<GroupInformation> groupInfo) {
             super(groupInfo);
             String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             List<ConsumerPartitionAssignor> assignors = ConsumerPartitionAssignor.getAssignorInstances(
@@ -323,7 +323,7 @@ public class ConsumerTestBuilder implements Closeable {
                     deserializers,
                     metricsManager,
                     time);
-            this.consumer = spy(new PrototypeAsyncConsumer<>(
+            this.consumer = spy(new AsyncKafkaConsumer<>(
                     logContext,
                     clientId,
                     deserializers,
diff --git a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
index 6d09575a463..717f1179753 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
@@ -17,10 +17,12 @@
 package kafka.api
 
 import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.consumer.{ConsumerConfig, GroupProtocol}
 import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.Test
 
 import java.time.Duration
+import java.util.Properties
 import scala.jdk.CollectionConverters._
 
 class BaseAsyncConsumerTest extends AbstractConsumerTest {
@@ -28,7 +30,9 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
 
   @Test
   def testCommitAsync(): Unit = {
-    val consumer = createAsyncConsumer()
+    val props = new Properties();
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
+    val consumer = createConsumer(configOverrides = props)
     val producer = createProducer()
     val numRecords = 10000
     val startingTimestamp = System.currentTimeMillis()
@@ -49,7 +53,9 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
 
   @Test
   def testCommitSync(): Unit = {
-    val consumer = createAsyncConsumer()
+    val props = new Properties();
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
+    val consumer = createConsumer(configOverrides = props)
     val producer = createProducer()
     val numRecords = 10000
     val startingTimestamp = System.currentTimeMillis()
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 1a861b2b2f1..4eaa83f0eb6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.clients.consumer.internals.PrototypeAsyncConsumer
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -171,19 +170,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     producer
   }
 
-  def createAsyncConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
-                                valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
-                                configOverrides: Properties = new Properties,
-                                configsToRemove: List[String] = List()): PrototypeAsyncConsumer[K, V] = {
-    val props = new Properties
-    props ++= consumerConfig
-    props ++= configOverrides
-    configsToRemove.foreach(props.remove(_))
-    val consumer = new PrototypeAsyncConsumer[K, V](props, keyDeserializer, valueDeserializer)
-    consumers += consumer
-    consumer
-  }
-
   def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
                            valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
                            configOverrides: Properties = new Properties,
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 880806c87b2..9d2971a3ceb 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -586,4 +586,14 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
         <Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
     </Match>
 
+    <Match>
+        <!-- Suppress the warnings about the caller not using the return value from Consumer. KafkaConsumer now
+             delegates all API calls to an internal instance of Consumer, which confuses SpotBugs. -->
+        <Or>
+            <Class name="org.apache.kafka.tools.ClientCompatibilityTest"/>
+            <Class name="org.apache.kafka.tools.StreamsResetter"/>
+        </Or>
+        <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
+    </Match>
+
 </FindBugsFilter>
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index a706d74d66e..7f00f9ddd48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -131,9 +133,14 @@ public class TaskAssignorIntegrationTest {
 
             final Field mainConsumer = StreamThread.class.getDeclaredField("mainConsumer");
             mainConsumer.setAccessible(true);
-            final KafkaConsumer<?, ?> consumer = (KafkaConsumer<?, ?>) mainConsumer.get(streamThread);
+            final KafkaConsumer<?, ?> parentConsumer = (KafkaConsumer<?, ?>) mainConsumer.get(streamThread);
 
-            final Field assignors = KafkaConsumer.class.getDeclaredField("assignors");
+            final Field delegate = KafkaConsumer.class.getDeclaredField("delegate");
+            delegate.setAccessible(true);
+            final Consumer<?, ?> consumer = (Consumer<?, ?>)  delegate.get(parentConsumer);
+            assertThat(consumer, instanceOf(LegacyKafkaConsumer.class));
+
+            final Field assignors = LegacyKafkaConsumer.class.getDeclaredField("assignors");
             assignors.setAccessible(true);
             final List<ConsumerPartitionAssignor> consumerPartitionAssignors = (List<ConsumerPartitionAssignor>) assignors.get(consumer);
             final StreamsPartitionAssignor streamsPartitionAssignor = (StreamsPartitionAssignor) consumerPartitionAssignors.get(0);