Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/17 17:58:57 UTC

[kafka] branch trunk updated: KAFKA-7853: Refactor coordinator config (#6854)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b9e107  KAFKA-7853: Refactor coordinator config (#6854)
1b9e107 is described below

commit 1b9e1073885951697f34950a1ea706c93826e871
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jun 17 10:58:43 2019 -0700

    KAFKA-7853: Refactor coordinator config (#6854)
    
    An attempt to refactor the current coordinator logic.
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Konstantine Karantasis <ko...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |  38 +++++++-
 .../apache/kafka/clients/GroupRebalanceConfig.java | 100 +++++++++++++++++++
 .../kafka/clients/consumer/ConsumerConfig.java     |  37 ++-----
 .../kafka/clients/consumer/KafkaConsumer.java      |  29 ++----
 .../consumer/internals/AbstractCoordinator.java    | 106 ++++++++-------------
 .../consumer/internals/ConsumerCoordinator.java    |  51 ++++------
 .../clients/consumer/internals/Heartbeat.java      |  37 +++----
 .../kafka/clients/CommonClientConfigsTest.java     |   2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  42 ++++----
 .../internals/AbstractCoordinatorTest.java         |  34 +++++--
 .../internals/ConsumerCoordinatorTest.java         |  96 ++++++++++++-------
 .../clients/consumer/internals/HeartbeatTest.java  |  21 +++-
 .../runtime/distributed/DistributedConfig.java     |  13 +--
 .../runtime/distributed/WorkerCoordinator.java     |  22 ++---
 .../runtime/distributed/WorkerGroupMember.java     |   9 +-
 .../WorkerCoordinatorIncrementalTest.java          |  37 +++----
 .../runtime/distributed/WorkerCoordinatorTest.java |  38 ++++----
 17 files changed, 409 insertions(+), 303 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 49465dc..d8428a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Some configurations shared by both producer and consumer
+ * Configurations shared by Kafka client applications: producer, consumer, connect, etc.
  */
 public class CommonClientConfigs {
     private static final Logger log = LoggerFactory.getLogger(CommonClientConfigs.class);
@@ -101,6 +101,42 @@ public class CommonClientConfigs {
                                                          + "elapses the client will resend the request if necessary or fail the request if "
                                                          + "retries are exhausted.";
 
+    public static final String GROUP_ID_CONFIG = "group.id";
+    public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
+
+    public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
+    public static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " +
+            "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " +
+            "which means that only one instance with this ID is allowed in the consumer group at any time. " +
+            "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " +
+            "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";
+
+    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
+    public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
+            "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
+            "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
+            "is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
+
+    public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
+    public static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " +
+            "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " +
+            "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " +
+            "from the group, which will cause offset commit failures.";
+
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using " +
+            "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness " +
+            "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
+            "then the broker will remove this client from the group and initiate a rebalance. Note that the value " +
+            "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
+            "and <code>group.max.session.timeout.ms</code>.";
+
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
+            "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
+            "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
+            "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
+            "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
 
     /**
      * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
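
The constants above move the group-management configuration keys into CommonClientConfigs so that the consumer and Connect clients can share them. Below is a minimal sketch of client properties that satisfy the documented constraints; all values and the wrapper class name are illustrative, not part of this commit. Note that heartbeat.interval.ms stays below session.timeout.ms, and session.timeout.ms must fall inside the broker's group.min.session.timeout.ms / group.max.session.timeout.ms range.

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;

    public class RebalanceConfigKeysExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Required for group management or Kafka-based offset commits.
            props.put(CommonClientConfigs.GROUP_ID_CONFIG, "example-group");
            // Optional: a non-empty group.instance.id marks the member as static.
            props.put(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG, "member-1");
            // Must lie within the broker's configured min/max session timeout range.
            props.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, "30000");
            // Must be lower than session.timeout.ms, typically no more than a third of it.
            props.put(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
            // Upper bound on idle time between poll() calls before the member is failed.
            props.put(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
            System.out.println(props);
        }
    }
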
diff --git a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
new file mode 100644
index 0000000..006800a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * Class to extract group rebalance related configs.
+ */
+public class GroupRebalanceConfig {
+
+    public enum ProtocolType {
+        CONSUMER,
+        CONNECT;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public final int sessionTimeoutMs;
+    public final int rebalanceTimeoutMs;
+    public final int heartbeatIntervalMs;
+    public final String groupId;
+    public final Optional<String> groupInstanceId;
+    public final long retryBackoffMs;
+    public final boolean leaveGroupOnClose;
+
+    public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
+        this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG);
+
+        // Consumer and Connect use different config names for defining rebalance timeout
+        if (protocolType == ProtocolType.CONSUMER) {
+            this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        } else {
+            this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG);
+        }
+
+        this.heartbeatIntervalMs = config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.groupId = config.getString(CommonClientConfigs.GROUP_ID_CONFIG);
+
+        // Static membership is only introduced in consumer API.
+        if (protocolType == ProtocolType.CONSUMER) {
+            String groupInstanceId = config.getString(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG);
+            if (groupInstanceId != null) {
+                JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
+                this.groupInstanceId = Optional.of(groupInstanceId);
+            } else {
+                this.groupInstanceId = Optional.empty();
+            }
+        } else {
+            this.groupInstanceId = Optional.empty();
+        }
+
+        this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+
+        // Internal leave group config is only defined in Consumer.
+        if (protocolType == ProtocolType.CONSUMER) {
+            this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close");
+        } else {
+            this.leaveGroupOnClose = true;
+        }
+    }
+
+    // For testing purpose.
+    public GroupRebalanceConfig(final int sessionTimeoutMs,
+                                final int rebalanceTimeoutMs,
+                                final int heartbeatIntervalMs,
+                                String groupId,
+                                Optional<String> groupInstanceId,
+                                long retryBackoffMs,
+                                boolean leaveGroupOnClose) {
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.heartbeatIntervalMs = heartbeatIntervalMs;
+        this.groupId = groupId;
+        this.groupInstanceId = groupInstanceId;
+        this.retryBackoffMs = retryBackoffMs;
+        this.leaveGroupOnClose = leaveGroupOnClose;
+    }
+}
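
GroupRebalanceConfig exposes its values as public final fields rather than getters, and the second constructor above exists for tests that do not want to build a full AbstractConfig. A small sketch of that path follows; the numeric values and wrapper class name are illustrative only.

    import java.util.Optional;

    import org.apache.kafka.clients.GroupRebalanceConfig;

    public class GroupRebalanceConfigSketch {
        public static void main(String[] args) {
            // Uses the test-oriented constructor defined above.
            GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
                    10000,                      // sessionTimeoutMs
                    300000,                     // rebalanceTimeoutMs
                    3000,                       // heartbeatIntervalMs
                    "example-group",            // groupId
                    Optional.of("instance-1"),  // groupInstanceId (static member)
                    100L,                       // retryBackoffMs
                    true);                      // leaveGroupOnClose
            System.out.println(rebalanceConfig.groupId + " session.timeout.ms="
                    + rebalanceConfig.sessionTimeoutMs);
        }
    }
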
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 010fff8..7eb34d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -50,50 +50,33 @@ public class ConsumerConfig extends AbstractConfig {
     /**
      * <code>group.id</code>
      */
-    public static final String GROUP_ID_CONFIG = "group.id";
-    private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
+    public static final String GROUP_ID_CONFIG = CommonClientConfigs.GROUP_ID_CONFIG;
+    private static final String GROUP_ID_DOC = CommonClientConfigs.GROUP_ID_DOC;
 
     /**
      * <code>group.instance.id</code>
      */
-    public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
-    private static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " +
-            "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " +
-            "which means that only one instance with this ID is allowed in the consumer group at any time. " +
-            "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " +
-            "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";
+    public static final String GROUP_INSTANCE_ID_CONFIG = CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG;
+    private static final String GROUP_INSTANCE_ID_DOC = CommonClientConfigs.GROUP_INSTANCE_ID_DOC;
 
     /** <code>max.poll.records</code> */
     public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
 
     /** <code>max.poll.interval.ms</code> */
-    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
-    private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
-            "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
-            "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
-            "is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
-
+    public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+    private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
     /**
      * <code>session.timeout.ms</code>
      */
-    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect consumer failures when using " +
-            "Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness " +
-            "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
-            "then the broker will remove this consumer from the group and initiate a rebalance. Note that the value " +
-            "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
-            "and <code>group.max.session.timeout.ms</code>.";
+    public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
+    private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
 
     /**
      * <code>heartbeat.interval.ms</code>
      */
-    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
-    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
-            "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
-            "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
-            "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
-            "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
     /**
      * <code>bootstrap.servers</code>
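
Because the consumer constants are now plain aliases of the shared keys, existing code that references ConsumerConfig keeps compiling and resolves to the same strings. A quick illustrative check (the class name is made up for the example):

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class SharedKeyCheck {
        public static void main(String[] args) {
            // Both constants now point at the same key, "session.timeout.ms".
            System.out.println(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
                    .equals(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG)); // true
        }
    }
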
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5be065d..79afafa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -27,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
-import org.apache.kafka.clients.consumer.internals.Heartbeat;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -50,7 +50,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.IsolationLevel;
-import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -568,7 +567,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Logger log;
     private final String clientId;
     private String groupId;
-    private Optional<String> groupInstanceId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -674,15 +672,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.clientId = clientId;
             this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
 
+            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
+                                                                                 GroupRebalanceConfig.ProtocolType.CONSUMER);
             LogContext logContext;
             // If group.instance.id is set, we will append it to the log context.
-            String groupInstanceId = config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
-            if (groupInstanceId != null) {
-                JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
-                this.groupInstanceId = Optional.of(groupInstanceId);
-                logContext = new LogContext("[Consumer instanceId=" + groupInstanceId + ", clientId=" + clientId + ", groupId=" + groupId + "] ");
+            if (groupRebalanceConfig.groupInstanceId.isPresent()) {
+                logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
+                        ", clientId=" + clientId + ", groupId=" + groupId + "] ");
             } else {
-                this.groupInstanceId = Optional.empty();
                 logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
             }
 
@@ -773,28 +770,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
 
-            int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
-            int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             // no coordinator will be constructed for the default (null) group id
             this.coordinator = groupId == null ? null :
-                new ConsumerCoordinator(logContext,
+                new ConsumerCoordinator(groupRebalanceConfig,
+                        logContext,
                         this.client,
-                        groupId,
-                        this.groupInstanceId,
-                        maxPollIntervalMs,
-                        sessionTimeoutMs,
-                        new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
                         assignors,
                         this.metadata,
                         this.subscriptions,
                         metrics,
                         metricGrpPrefix,
                         this.time,
-                        retryBackoffMs,
                         enableAutoCommit,
                         config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                        this.interceptors,
-                        config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+                        this.interceptors);
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
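
The KafkaConsumer change above reads all rebalance-related settings once into a GroupRebalanceConfig and hands that single object to ConsumerCoordinator, instead of threading individual timeouts and a Heartbeat through the constructor. A standalone sketch of the same construction step is below; the broker address, group name and wrapper class are illustrative, and building the config does not contact a broker.

    import java.util.Properties;

    import org.apache.kafka.clients.GroupRebalanceConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ConsumerRebalanceConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // Mirrors the constructor change above: rebalance settings are
            // extracted once and passed to the coordinator as one object.
            GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
                    new ConsumerConfig(props), GroupRebalanceConfig.ProtocolType.CONSUMER);
            System.out.println(rebalanceConfig.groupId + " " + rebalanceConfig.groupInstanceId);
        }
    }
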
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 30277b3..efe6cb7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -71,7 +72,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -114,15 +114,11 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private final Logger log;
-    private final int sessionTimeoutMs;
     private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
-    protected final int rebalanceTimeoutMs;
-    protected final String groupId;
-    protected final Optional<String> groupInstanceId;
+    private final GroupRebalanceConfig rebalanceConfig;
     protected final ConsumerNetworkClient client;
     protected final Time time;
-    protected final long retryBackoffMs;
 
     private HeartbeatThread heartbeatThread = null;
     private boolean rejoinNeeded = true;
@@ -133,52 +129,24 @@ public abstract class AbstractCoordinator implements Closeable {
     private Generation generation = Generation.NO_GENERATION;
 
     private RequestFuture<Void> findCoordinatorFuture = null;
-    private final boolean leaveGroupOnClose;
 
     /**
      * Initialize the coordination manager.
      */
-    public AbstractCoordinator(LogContext logContext,
+    public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
+                               LogContext logContext,
                                ConsumerNetworkClient client,
-                               String groupId,
-                               Optional<String> groupInstanceId,
-                               int rebalanceTimeoutMs,
-                               int sessionTimeoutMs,
-                               Heartbeat heartbeat,
                                Metrics metrics,
                                String metricGrpPrefix,
-                               Time time,
-                               long retryBackoffMs,
-                               boolean leaveGroupOnClose) {
+                               Time time) {
+        Objects.requireNonNull(rebalanceConfig.groupId,
+                               "Expected a non-null group id for coordinator construction");
+        this.rebalanceConfig = rebalanceConfig;
         this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
-        this.groupId = Objects.requireNonNull(groupId,
-                "Expected a non-null group id for coordinator construction");
-        this.groupInstanceId = groupInstanceId;
-        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.heartbeat = heartbeat;
+        this.heartbeat = new Heartbeat(rebalanceConfig, time);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
-        this.retryBackoffMs = retryBackoffMs;
-        this.leaveGroupOnClose = leaveGroupOnClose;
-    }
-
-    public AbstractCoordinator(LogContext logContext,
-                               ConsumerNetworkClient client,
-                               String groupId,
-                               Optional<String> groupInstanceId,
-                               int rebalanceTimeoutMs,
-                               int sessionTimeoutMs,
-                               int heartbeatIntervalMs,
-                               Metrics metrics,
-                               String metricGrpPrefix,
-                               Time time,
-                               long retryBackoffMs,
-                               boolean leaveGroupOnClose) {
-        this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
-                new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
-                metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
     }
 
     /**
@@ -263,7 +231,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
-                timer.sleep(retryBackoffMs);
+                timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         } while (coordinatorUnknown() && timer.notExpired());
 
@@ -438,7 +406,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 else if (!future.isRetriable())
                     throw exception;
 
-                timer.sleep(retryBackoffMs);
+                timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         }
         return true;
@@ -505,13 +473,13 @@ public abstract class AbstractCoordinator implements Closeable {
         log.info("(Re-)joining group");
         JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                 new JoinGroupRequestData()
-                        .setGroupId(groupId)
-                        .setSessionTimeoutMs(this.sessionTimeoutMs)
+                        .setGroupId(rebalanceConfig.groupId)
+                        .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
                         .setMemberId(this.generation.memberId)
-                        .setGroupInstanceId(this.groupInstanceId.orElse(null))
+                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                         .setProtocolType(protocolType())
                         .setProtocols(metadata())
-                        .setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
+                        .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
         );
 
         log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
@@ -519,7 +487,7 @@ public abstract class AbstractCoordinator implements Closeable {
         // Note that we override the request timeout using the rebalance timeout since that is the
         // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
 
-        int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
+        int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
         return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                 .compose(new JoinGroupResponseHandler());
     }
@@ -573,9 +541,9 @@ public abstract class AbstractCoordinator implements Closeable {
                 // log the error and re-throw the exception
                 log.error("Attempt to join group failed due to fatal error: {}", error.message());
                 if (error == Errors.GROUP_MAX_SIZE_REACHED) {
-                    future.raise(new GroupMaxSizeReachedException(groupId));
+                    future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    future.raise(new GroupAuthorizationException(groupId));
+                    future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
                 } else {
                     future.raise(error);
                 }
@@ -606,9 +574,9 @@ public abstract class AbstractCoordinator implements Closeable {
         SyncGroupRequest.Builder requestBuilder =
                 new SyncGroupRequest.Builder(
                         new SyncGroupRequestData()
-                                .setGroupId(groupId)
+                                .setGroupId(rebalanceConfig.groupId)
                                 .setMemberId(generation.memberId)
-                                .setGroupInstanceId(this.groupInstanceId.orElse(null))
+                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                 .setGenerationId(generation.generationId)
                                 .setAssignments(Collections.emptyList())
                 );
@@ -633,9 +601,9 @@ public abstract class AbstractCoordinator implements Closeable {
             SyncGroupRequest.Builder requestBuilder =
                     new SyncGroupRequest.Builder(
                             new SyncGroupRequestData()
-                                    .setGroupId(groupId)
+                                    .setGroupId(rebalanceConfig.groupId)
                                     .setMemberId(generation.memberId)
-                                    .setGroupInstanceId(this.groupInstanceId.orElse(null))
+                                    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                     .setGenerationId(generation.generationId)
                                     .setAssignments(groupAssignmentList)
                     );
@@ -665,7 +633,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 requestRejoin();
 
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    future.raise(new GroupAuthorizationException(groupId));
+                    future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.debug("SyncGroup failed because the group began another rebalance");
                     future.raise(error);
@@ -701,7 +669,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 new FindCoordinatorRequest.Builder(
                         new FindCoordinatorRequestData()
                             .setKeyType(CoordinatorType.GROUP.id())
-                            .setKey(this.groupId));
+                            .setKey(this.rebalanceConfig.groupId));
         return client.send(node, requestBuilder)
                 .compose(new FindCoordinatorResponseHandler());
     }
@@ -731,7 +699,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 }
                 future.complete(null);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                future.raise(new GroupAuthorizationException(groupId));
+                future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
             } else {
                 log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
                 future.raise(error);
@@ -854,7 +822,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                if (leaveGroupOnClose) {
+                if (rebalanceConfig.leaveGroupOnClose) {
                     maybeLeaveGroup();
                 }
 
@@ -883,7 +851,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // attempt any resending if the request fails or times out.
             log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
-                    .setGroupId(groupId).setMemberId(generation.memberId));
+                    .setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId));
             client.send(coordinator, request)
                     .compose(new LeaveGroupResponseHandler());
             client.pollNoWakeup();
@@ -893,7 +861,7 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     protected boolean isDynamicMember() {
-        return !groupInstanceId.isPresent();
+        return !rebalanceConfig.groupInstanceId.isPresent();
     }
 
     private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@@ -915,9 +883,9 @@ public abstract class AbstractCoordinator implements Closeable {
         log.debug("Sending Heartbeat request to coordinator {}", coordinator);
         HeartbeatRequest.Builder requestBuilder =
                 new HeartbeatRequest.Builder(new HeartbeatRequestData()
-                        .setGroupId(groupId)
+                        .setGroupId(rebalanceConfig.groupId)
                         .setMemberId(this.generation.memberId)
-                        .setGroupInstanceId(this.groupInstanceId.orElse(null))
+                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                         .setGenerationId(this.generation.generationId));
         return client.send(coordinator, requestBuilder)
                 .compose(new HeartbeatResponseHandler());
@@ -953,7 +921,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 resetGeneration();
                 future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                future.raise(new GroupAuthorizationException(groupId));
+                future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
             } else {
                 future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
             }
@@ -1051,7 +1019,7 @@ public abstract class AbstractCoordinator implements Closeable {
         private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
 
         private HeartbeatThread() {
-            super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true);
+            super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true);
         }
 
         public void enable() {
@@ -1113,7 +1081,7 @@ public abstract class AbstractCoordinator implements Closeable {
                             if (findCoordinatorFuture != null || lookupCoordinator().failed())
                                 // the immediate future check ensures that we backoff properly in the case that no
                                 // brokers are available to connect to.
-                                AbstractCoordinator.this.wait(retryBackoffMs);
+                                AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
                         } else if (heartbeat.sessionTimeoutExpired(now)) {
                             // the session timeout has expired without seeing a successful heartbeat, so we should
                             // probably make sure the coordinator is still healthy.
@@ -1131,7 +1099,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         } else if (!heartbeat.shouldHeartbeat(now)) {
                             // poll again after waiting for the retry backoff in case the heartbeat failed or the
                             // coordinator disconnected
-                            AbstractCoordinator.this.wait(retryBackoffMs);
+                            AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
                         } else {
                             heartbeat.sentHeartbeat(now);
 
@@ -1153,7 +1121,7 @@ public abstract class AbstractCoordinator implements Closeable {
                                             // however, then the session timeout may expire before we can rejoin.
                                             heartbeat.receiveHeartbeat();
                                         } else if (e instanceof FencedInstanceIdException) {
-                                            log.error("Caught fenced group.instance.id {} error in heartbeat thread", groupInstanceId);
+                                            log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
                                             heartbeatThread.failed.set(e);
                                             heartbeatThread.disable();
                                         } else {
@@ -1243,4 +1211,8 @@ public abstract class AbstractCoordinator implements Closeable {
 
     }
 
+    // For testing only
+    public Heartbeat heartbeat() {
+        return heartbeat;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fbf5db4..a590a1e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -63,7 +64,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +74,7 @@ import java.util.stream.Collectors;
  * This class manages the coordination process with the consumer coordinator.
  */
 public final class ConsumerCoordinator extends AbstractCoordinator {
+    private final GroupRebalanceConfig rebalanceConfig;
     private final Logger log;
     private final List<PartitionAssignor> assignors;
     private final ConsumerMetadata metadata;
@@ -120,36 +121,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     /**
      * Initialize the coordination manager.
      */
-    public ConsumerCoordinator(LogContext logContext,
+    public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
+                               LogContext logContext,
                                ConsumerNetworkClient client,
-                               String groupId,
-                               Optional<String> groupInstanceId,
-                               int rebalanceTimeoutMs,
-                               int sessionTimeoutMs,
-                               Heartbeat heartbeat,
                                List<PartitionAssignor> assignors,
                                ConsumerMetadata metadata,
                                SubscriptionState subscriptions,
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               long retryBackoffMs,
                                boolean autoCommitEnabled,
                                int autoCommitIntervalMs,
-                               ConsumerInterceptors<?, ?> interceptors,
-                               boolean leaveGroupOnClose) {
-        super(logContext,
+                               ConsumerInterceptors<?, ?> interceptors) {
+        super(rebalanceConfig,
+              logContext,
               client,
-              groupId,
-              groupInstanceId,
-              rebalanceTimeoutMs,
-              sessionTimeoutMs,
-              heartbeat,
               metrics,
               metricGrpPrefix,
-              time,
-              retryBackoffMs,
-              leaveGroupOnClose);
+              time);
+        this.rebalanceConfig = rebalanceConfig;
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
@@ -459,7 +449,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
         // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceTimeoutMs));
+        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
 
         // execute the user's callback before rebalance
         ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
@@ -558,7 +548,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 } else if (!future.isRetriable()) {
                     throw future.exception();
                 } else {
-                    timer.sleep(retryBackoffMs);
+                    timer.sleep(rebalanceConfig.retryBackoffMs);
                 }
             } else {
                 return null;
@@ -585,8 +575,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     // visible for testing
     void invokeCompletedOffsetCommitCallbacks() {
         if (asyncCommitFenced.get()) {
-            throw new FencedInstanceIdException("Get fenced exception for group.instance.id: " +
-                    groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId());
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
+                + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
+                + ", current member.id is " + memberId());
         }
         while (true) {
             OffsetCommitCompletion completion = completedOffsetCommits.poll();
@@ -698,7 +689,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             if (future.failed() && !future.isRetriable())
                 throw future.exception();
 
-            timer.sleep(retryBackoffMs);
+            timer.sleep(rebalanceConfig.retryBackoffMs);
         } while (timer.notExpired());
 
         return false;
@@ -723,7 +714,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 if (exception instanceof RetriableException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
                         exception);
-                    nextAutoCommitTimer.updateAndReset(retryBackoffMs);
+                    nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
                 } else {
                     log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                 }
@@ -813,10 +804,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                 new OffsetCommitRequestData()
-                        .setGroupId(this.groupId)
+                        .setGroupId(this.rebalanceConfig.groupId)
                         .setGenerationId(generation.generationId)
                         .setMemberId(generation.memberId)
-                        .setGroupInstanceId(groupInstanceId.orElse(null))
+                        .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
                         .setTopics(new ArrayList<>(requestTopicDataMap.values()))
         );
 
@@ -857,7 +848,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         }
 
                         if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                            future.raise(new GroupAuthorizationException(groupId));
+                            future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
                             return;
                         } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                             unauthorizedTopics.add(tp.topic());
@@ -929,7 +920,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         log.debug("Fetching committed offsets for partitions: {}", partitions);
         // construct the request
-        OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.groupId,
+        OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId,
                 new ArrayList<>(partitions));
 
         // send the request with a callback
@@ -952,7 +943,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     markCoordinatorUnknown();
                     future.raise(error);
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    future.raise(new GroupAuthorizationException(groupId));
+                    future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
                 } else {
                     future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 8a67f31..3bf8c92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
@@ -23,10 +24,8 @@ import org.apache.kafka.common.utils.Timer;
  * A helper class for managing the heartbeat to the coordinator
  */
 public final class Heartbeat {
-    private final int sessionTimeoutMs;
-    private final int heartbeatIntervalMs;
     private final int maxPollIntervalMs;
-    private final long retryBackoffMs;
+    private final GroupRebalanceConfig rebalanceConfig;
     private final Time time;
     private final Timer heartbeatTimer;
     private final Timer sessionTimer;
@@ -34,21 +33,15 @@ public final class Heartbeat {
 
     private volatile long lastHeartbeatSend;
 
-    public Heartbeat(Time time,
-                     int sessionTimeoutMs,
-                     int heartbeatIntervalMs,
-                     int maxPollIntervalMs,
-                     long retryBackoffMs) {
-        if (heartbeatIntervalMs >= sessionTimeoutMs)
+    public Heartbeat(GroupRebalanceConfig config,
+                     Time time) {
+        if (config.heartbeatIntervalMs >= config.sessionTimeoutMs)
             throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
-
+        this.rebalanceConfig = config;
         this.time = time;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.heartbeatIntervalMs = heartbeatIntervalMs;
-        this.maxPollIntervalMs = maxPollIntervalMs;
-        this.retryBackoffMs = retryBackoffMs;
-        this.heartbeatTimer = time.timer(heartbeatIntervalMs);
-        this.sessionTimer = time.timer(sessionTimeoutMs);
+        this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);
+        this.sessionTimer = time.timer(config.sessionTimeoutMs);
+        this.maxPollIntervalMs = config.rebalanceTimeoutMs;
         this.pollTimer = time.timer(maxPollIntervalMs);
     }
 
@@ -66,17 +59,17 @@ public final class Heartbeat {
     public void sentHeartbeat(long now) {
         this.lastHeartbeatSend = now;
         update(now);
-        heartbeatTimer.reset(heartbeatIntervalMs);
+        heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
     }
 
     public void failHeartbeat() {
         update(time.milliseconds());
-        heartbeatTimer.reset(retryBackoffMs);
+        heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);
     }
 
     public void receiveHeartbeat() {
         update(time.milliseconds());
-        sessionTimer.reset(sessionTimeoutMs);
+        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
     }
 
     public boolean shouldHeartbeat(long now) {
@@ -100,14 +93,14 @@ public final class Heartbeat {
 
     public void resetTimeouts() {
         update(time.milliseconds());
-        sessionTimer.reset(sessionTimeoutMs);
+        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
         pollTimer.reset(maxPollIntervalMs);
-        heartbeatTimer.reset(heartbeatIntervalMs);
+        heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
     }
 
     public void resetSessionTimeout() {
         update(time.milliseconds());
-        sessionTimer.reset(sessionTimeoutMs);
+        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
     }
 
     public boolean pollTimeoutExpired(long now) {
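
Heartbeat now derives its timers from the shared GroupRebalanceConfig instead of taking each timeout separately, and the constructor still rejects a heartbeat interval that is not strictly below the session timeout. A minimal sketch using the test constructor of GroupRebalanceConfig (values and class name are illustrative):

    import java.util.Optional;

    import org.apache.kafka.clients.GroupRebalanceConfig;
    import org.apache.kafka.clients.consumer.internals.Heartbeat;
    import org.apache.kafka.common.utils.Time;

    public class HeartbeatSketch {
        public static void main(String[] args) {
            Time time = Time.SYSTEM;
            // heartbeatIntervalMs (3000) must stay below sessionTimeoutMs (10000),
            // otherwise the constructor above throws IllegalArgumentException.
            GroupRebalanceConfig config = new GroupRebalanceConfig(
                    10000, 300000, 3000, "example-group", Optional.empty(), 100L, true);
            Heartbeat heartbeat = new Heartbeat(config, time);

            long now = time.milliseconds();
            heartbeat.sentHeartbeat(now);                        // starts the heartbeat interval timer
            System.out.println(heartbeat.shouldHeartbeat(now));  // false until the interval elapses
            heartbeat.receiveHeartbeat();                        // resets the session timer
        }
    }
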
diff --git a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
index 63a9312..d65e6d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
@@ -58,7 +58,7 @@ public class CommonClientConfigsTest {
     }
 
     @Test
-    public void testExponentialBackoffDefaults() throws Exception {
+    public void testExponentialBackoffDefaults() {
         TestConfig defaultConf = new TestConfig(Collections.emptyMap());
         assertEquals(Long.valueOf(50L),
                 defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 405ec68..c1adf19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -27,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.Heartbeat;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -1895,27 +1895,25 @@ public class KafkaConsumerTest {
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
                 retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
 
-        Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
-        ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
-                loggerFactory,
-                consumerClient,
-                groupId,
-                groupInstanceId,
-                rebalanceTimeoutMs,
-                sessionTimeoutMs,
-                heartbeat,
-                assignors,
-                metadata,
-                subscription,
-                metrics,
-                metricGroupPrefix,
-                time,
-                retryBackoffMs,
-                autoCommitEnabled,
-                autoCommitIntervalMs,
-                interceptors,
-                true);
-
+        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+                                                                        rebalanceTimeoutMs,
+                                                                        heartbeatIntervalMs,
+                                                                        groupId,
+                                                                        groupInstanceId,
+                                                                        retryBackoffMs,
+                                                                        true);
+        ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
+                                                                          loggerFactory,
+                                                                          consumerClient,
+                                                                          assignors,
+                                                                          metadata,
+                                                                          subscription,
+                                                                          metrics,
+                                                                          metricGroupPrefix,
+                                                                          time,
+                                                                          autoCommitEnabled,
+                                                                          autoCommitIntervalMs,
+                                                                          interceptors);
         Fetcher<String, String> fetcher = new Fetcher<>(
                 loggerFactory,
                 consumerClient,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 0fc5f62..659ef5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
@@ -104,14 +105,30 @@ public class AbstractCoordinatorTest {
                 logContext, new ClusterResourceListeners());
 
         this.mockClient = new MockClient(mockTime, metadata);
-        this.consumerClient = new ConsumerNetworkClient(logContext, mockClient, metadata, mockTime,
-                retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
+        this.consumerClient = new ConsumerNetworkClient(logContext,
+                                                        mockClient,
+                                                        metadata,
+                                                        mockTime,
+                                                        retryBackoffMs,
+                                                        REQUEST_TIMEOUT_MS,
+                                                        HEARTBEAT_INTERVAL_MS);
         Metrics metrics = new Metrics();
 
         mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
         this.node = metadata.fetch().nodes().get(0);
         this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-        this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs, groupInstanceId);
+
+        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(SESSION_TIMEOUT_MS,
+                                                                        rebalanceTimeoutMs,
+                                                                        HEARTBEAT_INTERVAL_MS,
+                                                                        GROUP_ID,
+                                                                        groupInstanceId,
+                                                                        retryBackoffMs,
+                                                                        !groupInstanceId.isPresent());
+        this.coordinator = new DummyCoordinator(rebalanceConfig,
+                                                consumerClient,
+                                                metrics,
+                                                mockTime);
     }
 
     @Test
@@ -850,14 +867,11 @@ public class AbstractCoordinatorTest {
         private int onJoinCompleteInvokes = 0;
         private boolean wakeupOnJoinComplete = false;
 
-        public DummyCoordinator(ConsumerNetworkClient client,
+        public DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
+                                ConsumerNetworkClient client,
                                 Metrics metrics,
-                                Time time,
-                                int rebalanceTimeoutMs,
-                                int retryBackoffMs,
-                                Optional<String> groupInstanceId) {
-            super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
-                    SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
+                                Time time) {
+            super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time);
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 4b377ea..c54540e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -118,8 +119,7 @@ public class ConsumerCoordinatorTest {
     private final int autoCommitIntervalMs = 2000;
     private final int requestTimeoutMs = 30000;
     private final MockTime time = new MockTime();
-    private final Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs,
-            rebalanceTimeoutMs, retryBackoffMs);
+    private GroupRebalanceConfig rebalanceConfig;
 
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
@@ -153,8 +153,21 @@ public class ConsumerCoordinatorTest {
         this.rebalanceListener = new MockRebalanceListener();
         this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
+        this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
+        this.coordinator = buildCoordinator(rebalanceConfig,
+                                            metrics,
+                                            assignors,
+                                            false);
+    }
 
-        this.coordinator = buildCoordinator(metrics, assignors, false, Optional.empty());
+    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) {
+        return new GroupRebalanceConfig(sessionTimeoutMs,
+                                        rebalanceTimeoutMs,
+                                        heartbeatIntervalMs,
+                                        groupId,
+                                        groupInstanceId,
+                                        retryBackoffMs,
+                                        !groupInstanceId.isPresent());
     }
 
     @After
@@ -741,10 +754,10 @@ public class ConsumerCoordinatorTest {
 
         assertTrue(coordinator.coordinatorUnknown());
         assertFalse(coordinator.poll(time.timer(0)));
-        assertEquals(time.milliseconds(), heartbeat.lastPollTime());
+        assertEquals(time.milliseconds(), coordinator.heartbeat().lastPollTime());
 
         time.sleep(rebalanceTimeoutMs - 1);
-        assertFalse(heartbeat.pollTimeoutExpired(time.milliseconds()));
+        assertFalse(coordinator.heartbeat().pollTimeoutExpired(time.milliseconds()));
     }
 
     @Test
@@ -1044,9 +1057,6 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testWakeupFromAssignmentCallback() {
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                false, Optional.empty());
-
         final String topic = "topic1";
         TopicPartition partition = new TopicPartition(topic, 0);
         final String consumerId = "follower";
@@ -1162,7 +1172,10 @@ public class ConsumerCoordinatorTest {
         metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
                 false, subscriptions, new LogContext(), new ClusterResourceListeners());
         client = new MockClient(time, metadata);
-        coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());
+        coordinator = buildCoordinator(rebalanceConfig,
+                                       new Metrics(),
+                                       assignors,
+                                       false);
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
@@ -1290,8 +1303,10 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitDynamicAssignment() {
         final String consumerId = "consumer";
 
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1306,8 +1321,10 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitRetryBackoff() {
         final String consumerId = "consumer";
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
@@ -1340,7 +1357,10 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitAwaitsInterval() {
         final String consumerId = "consumer";
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
         joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
@@ -1378,8 +1398,10 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitDynamicAssignmentRebalance() {
         final String consumerId = "consumer";
 
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -1404,8 +1426,10 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitManualAssignment() {
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1421,8 +1445,10 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -2066,8 +2092,10 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitAfterCoordinatorBackToService() {
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                true, groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           true);
 
         subscriptions.assignFromUser(Collections.singleton(t1p));
         subscriptions.seek(t1p, 100L);
@@ -2125,8 +2153,11 @@ public class ConsumerCoordinatorTest {
                                                                final boolean autoCommit,
                                                                final Optional<String> groupInstanceId) {
         final String consumerId = "consumer";
-        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                autoCommit, groupInstanceId);
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+                                                           new Metrics(),
+                                                           assignors,
+                                                           autoCommit);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
         if (useGroupManagement) {
@@ -2216,30 +2247,23 @@ public class ConsumerCoordinatorTest {
         assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get());
     }
 
-    private ConsumerCoordinator buildCoordinator(final Metrics metrics,
+    private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
+                                                 final Metrics metrics,
                                                  final List<PartitionAssignor> assignors,
-                                                 final boolean autoCommitEnabled,
-                                                 final Optional<String> groupInstanceId) {
+                                                 final boolean autoCommitEnabled) {
         return new ConsumerCoordinator(
+                rebalanceConfig,
                 new LogContext(),
                 consumerClient,
-                groupId,
-                groupInstanceId,
-                rebalanceTimeoutMs,
-                sessionTimeoutMs,
-                heartbeat,
                 assignors,
                 metadata,
                 subscriptions,
                 metrics,
                 "consumer" + groupId,
                 time,
-                retryBackoffMs,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
-                null,
-                !groupInstanceId.isPresent()
-        );
+                null);
     }
 
     private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index c382de6..b014bec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -16,23 +16,38 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.utils.MockTime;
 
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Optional;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class HeartbeatTest {
-
     private int sessionTimeoutMs = 300;
     private int heartbeatIntervalMs = 100;
     private int maxPollIntervalMs = 900;
     private long retryBackoffMs = 10L;
     private MockTime time = new MockTime();
-    private Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs,
-            maxPollIntervalMs, retryBackoffMs);
+
+    private Heartbeat heartbeat;
+
+    @Before
+    public void setUp() {
+        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+                                                                        maxPollIntervalMs,
+                                                                        heartbeatIntervalMs,
+                                                                        "group_id",
+                                                                        Optional.empty(),
+                                                                        retryBackoffMs,
+                                                                        true);
+        heartbeat = new Heartbeat(rebalanceConfig, time);
+    }
 
     @Test
     public void testShouldHeartbeat() {
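
The Heartbeat helper is now built from the same GroupRebalanceConfig (new Heartbeat(rebalanceConfig, time) above) instead of five separate timing arguments. The timers it tracks follow the usual consumer rules: send a heartbeat every heartbeat.interval.ms, expire the session after session.timeout.ms without a successful heartbeat, and expire the poll timer after max.poll.interval.ms without a poll. The sketch below is a hedged illustration of that arithmetic, not the actual Heartbeat implementation; the method names are chosen for the example.

    // Illustrative sketch of the three timers a Heartbeat-style tracker maintains.
    public final class HeartbeatTimersSketch {
        private final long heartbeatIntervalMs;
        private final long sessionTimeoutMs;
        private final long maxPollIntervalMs;
        private long lastHeartbeatSend;
        private long lastHeartbeatReceive;
        private long lastPoll;

        public HeartbeatTimersSketch(long heartbeatIntervalMs, long sessionTimeoutMs,
                                     long maxPollIntervalMs, long now) {
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.maxPollIntervalMs = maxPollIntervalMs;
            this.lastHeartbeatSend = now;
            this.lastHeartbeatReceive = now;
            this.lastPoll = now;
        }

        public void sentHeartbeat(long now)     { lastHeartbeatSend = now; }
        public void receivedHeartbeat(long now) { lastHeartbeatReceive = now; }
        public void poll(long now)              { lastPoll = now; }

        // Send the next heartbeat once heartbeat.interval.ms has elapsed since the last send.
        public boolean shouldHeartbeat(long now)    { return now - lastHeartbeatSend >= heartbeatIntervalMs; }

        // Group membership is lost if no heartbeat succeeds within session.timeout.ms.
        public boolean sessionExpired(long now)     { return now - lastHeartbeatReceive > sessionTimeoutMs; }

        // The member is treated as stalled if poll() is not called within max.poll.interval.ms.
        public boolean pollTimeoutExpired(long now) { return now - lastPoll > maxPollIntervalMs; }
    }

For consumers, the rebalanceTimeoutMs slot of GroupRebalanceConfig plays the max.poll.interval.ms role, which is why the test setup above passes a field named maxPollIntervalMs in that position.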
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index c7ff6d8..bc3bc19 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -39,13 +39,13 @@ public class DistributedConfig extends WorkerConfig {
     /**
      * <code>group.id</code>
      */
-    public static final String GROUP_ID_CONFIG = "group.id";
+    public static final String GROUP_ID_CONFIG = CommonClientConfigs.GROUP_ID_CONFIG;
     private static final String GROUP_ID_DOC = "A unique string that identifies the Connect cluster group this worker belongs to.";
 
     /**
      * <code>session.timeout.ms</code>
      */
-    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
     private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect worker failures. " +
             "The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are " +
             "received by the broker before the expiration of this session timeout, then the broker will remove the " +
@@ -56,7 +56,7 @@ public class DistributedConfig extends WorkerConfig {
     /**
      * <code>heartbeat.interval.ms</code>
      */
-    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
     private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group " +
             "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
             "worker's session stays active and to facilitate rebalancing when new members join or leave the group. " +
@@ -66,11 +66,8 @@ public class DistributedConfig extends WorkerConfig {
     /**
      * <code>rebalance.timeout.ms</code>
      */
-    public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
-    private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " +
-            "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " +
-            "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " +
-            "from the group, which will cause offset commit failures.";
+    public static final String REBALANCE_TIMEOUT_MS_CONFIG = CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG;
+    private static final String REBALANCE_TIMEOUT_MS_DOC = CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC;
 
     /**
      * <code>worker.sync.timeout.ms</code>
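
The net effect of these DistributedConfig changes is that the Connect-specific constants now alias the shared CommonClientConfigs keys, so nothing changes for existing worker property files; only the place where the key strings (and, for rebalance.timeout.ms, the doc text) are defined moves. A quick illustrative check, assuming both the clients and connect-runtime artifacts are on the classpath:

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.connect.runtime.distributed.DistributedConfig;

    public class SharedGroupConfigKeys {
        public static void main(String[] args) {
            // Key strings seen by users are unchanged; they are simply defined once in CommonClientConfigs.
            System.out.println(DistributedConfig.GROUP_ID_CONFIG);              // group.id
            System.out.println(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG);    // session.timeout.ms
            System.out.println(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG); // heartbeat.interval.ms
            System.out.println(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);  // rebalance.timeout.ms
            System.out.println(DistributedConfig.GROUP_ID_CONFIG.equals(CommonClientConfigs.GROUP_ID_CONFIG)); // true
        }
    }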
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 230a272..0b855a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -39,7 +40,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 
 import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
 import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
@@ -70,33 +70,23 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
     /**
      * Initialize the coordination manager.
      */
-    public WorkerCoordinator(LogContext logContext,
+    public WorkerCoordinator(GroupRebalanceConfig config,
+                             LogContext logContext,
                              ConsumerNetworkClient client,
-                             String groupId,
-                             int rebalanceTimeoutMs,
-                             int sessionTimeoutMs,
-                             int heartbeatIntervalMs,
                              Metrics metrics,
                              String metricGrpPrefix,
                              Time time,
-                             long retryBackoffMs,
                              String restUrl,
                              ConfigBackingStore configStorage,
                              WorkerRebalanceListener listener,
                              ConnectProtocolCompatibility protocolCompatibility,
                              int maxDelay) {
-        super(logContext,
+        super(config,
+              logContext,
               client,
-              groupId,
-              Optional.empty(),
-              rebalanceTimeoutMs,
-              sessionTimeoutMs,
-              heartbeatIntervalMs,
               metrics,
               metricGrpPrefix,
-              time,
-              retryBackoffMs,
-              true);
+              time);
         this.log = logContext.logger(WorkerCoordinator.class);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 99ea3a4..94cf97d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -79,8 +80,6 @@ public class WorkerGroupMember {
             this.clientId = clientId;
             this.log = logContext.logger(WorkerGroupMember.class);
 
-            String groupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
-
             Map<String, String> metricsTags = new LinkedHashMap<>();
             metricsTags.put("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
@@ -124,16 +123,12 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
                     Integer.MAX_VALUE);
             this.coordinator = new WorkerCoordinator(
+                    new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONNECT),
                     logContext,
                     this.client,
-                    groupId,
-                    config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
-                    config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
-                    config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     metrics,
                     metricGrpPrefix,
                     this.time,
-                    retryBackoffMs,
                     restUrl,
                     configStorage,
                     listener,
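
As the hunk above shows, the worker no longer unpacks group.id and the individual timeouts from its config; it hands the whole DistributedConfig to GroupRebalanceConfig together with a protocol type. A hedged sketch of that call site follows (the helper name rebalanceConfigFor is invented for the example, and obtaining a fully populated DistributedConfig requires the usual worker properties, which are elided here):

    import org.apache.kafka.clients.GroupRebalanceConfig;
    import org.apache.kafka.connect.runtime.distributed.DistributedConfig;

    public class ConnectRebalanceConfigExample {
        // Illustrative helper: production code does this inline when constructing the WorkerCoordinator.
        static GroupRebalanceConfig rebalanceConfigFor(DistributedConfig config) {
            // ProtocolType.CONNECT selects the Connect flavor of the shared settings; the value object
            // reads group.id, session.timeout.ms, heartbeat.interval.ms, rebalance.timeout.ms and
            // retry.backoff.ms from the supplied config, which the old code read out individually.
            return new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONNECT);
        }
    }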
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index f06976a..b1d3ec6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -93,6 +95,7 @@ public class WorkerCoordinatorIncrementalTest {
     private MockRebalanceListener rebalanceListener;
     @Mock
     private KafkaConfigBackingStore configStorage;
+    private GroupRebalanceConfig rebalanceConfig;
     private WorkerCoordinator coordinator;
     private int rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
 
@@ -150,22 +153,24 @@ public class WorkerCoordinatorIncrementalTest {
 
         this.configStorageCalls = 0;
 
-        this.coordinator = new WorkerCoordinator(
-                loggerFactory,
-                consumerClient,
-                groupId,
-                rebalanceTimeoutMs,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                metrics,
-                "worker" + groupId,
-                time,
-                retryBackoffMs,
-                expectedUrl(leaderId),
-                configStorage,
-                rebalanceListener,
-                compatibility,
-                rebalanceDelay);
+        this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+                                                        rebalanceTimeoutMs,
+                                                        heartbeatIntervalMs,
+                                                        groupId,
+                                                        Optional.empty(),
+                                                        retryBackoffMs,
+                                                        true);
+        this.coordinator = new WorkerCoordinator(rebalanceConfig,
+                                                 loggerFactory,
+                                                 consumerClient,
+                                                 metrics,
+                                                 "worker" + groupId,
+                                                 time,
+                                                 expectedUrl(leaderId),
+                                                 configStorage,
+                                                 rebalanceListener,
+                                                 compatibility,
+                                                 rebalanceDelay);
 
         configState1 = clusterConfigState(offset, 2, 4);
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 182d6bd..eac13d1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -56,6 +57,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -90,6 +92,7 @@ public class WorkerCoordinatorTest {
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
     @Mock private KafkaConfigBackingStore configStorage;
+    private GroupRebalanceConfig rebalanceConfig;
     private WorkerCoordinator coordinator;
 
     private ClusterConfigState configState1;
@@ -125,23 +128,24 @@ public class WorkerCoordinatorTest {
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
-
-        this.coordinator = new WorkerCoordinator(
-                logContext,
-                consumerClient,
-                groupId,
-                rebalanceTimeoutMs,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                metrics,
-                "consumer" + groupId,
-                time,
-                retryBackoffMs,
-                LEADER_URL,
-                configStorage,
-                rebalanceListener,
-                compatibility,
-                0);
+        this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+                                                        rebalanceTimeoutMs,
+                                                        heartbeatIntervalMs,
+                                                        groupId,
+                                                        Optional.empty(),
+                                                        retryBackoffMs,
+                                                        true);
+        this.coordinator = new WorkerCoordinator(rebalanceConfig,
+                                                 logContext,
+                                                 consumerClient,
+                                                 metrics,
+                                                 "consumer" + groupId,
+                                                 time,
+                                                 LEADER_URL,
+                                                 configStorage,
+                                                 rebalanceListener,
+                                                 compatibility,
+                                                 0);
 
         configState1 = new ClusterConfigState(
                 1L,