You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/08 00:43:28 UTC

[kafka] branch trunk updated: KAFKA-8284: enable static membership on KStream (#6673)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0e82a6  KAFKA-8284: enable static membership on KStream (#6673)
b0e82a6 is described below

commit b0e82a68b34512d23a5a60a9b5da0db86eb880b2
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Tue May 7 17:43:13 2019 -0700

    KAFKA-8284: enable static membership on KStream (#6673)
    
    Part of KIP-345 effort. The strategy is to extract user passed in group.instance.id config and pass it in with given thread-id (because consumer is currently per-thread level).
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |  15 ---
 .../kafka/clients/consumer/KafkaConsumer.java      |   3 +-
 .../consumer/internals/AbstractCoordinator.java    |  14 +--
 .../consumer/internals/ConsumerCoordinator.java    |   6 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   3 +-
 .../internals/AbstractCoordinatorTest.java         |   2 +-
 .../internals/ConsumerCoordinatorTest.java         |   3 +-
 .../runtime/distributed/WorkerCoordinator.java     |   3 +-
 .../apache/kafka/streams/KafkaClientSupplier.java  |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  18 +++-
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   1 -
 .../apache/kafka/streams/StreamsConfigTest.java    | 120 ++++++++++-----------
 .../integration/AbstractJoinIntegrationTest.java   |   1 -
 .../integration/AbstractResetIntegrationTest.java  |   1 -
 .../streams/integration/EosIntegrationTest.java    |   1 -
 .../FineGrainedAutoResetIntegrationTest.java       |   1 -
 .../GlobalKTableEOSIntegrationTest.java            |   1 -
 .../integration/GlobalKTableIntegrationTest.java   |   1 -
 .../integration/GlobalThreadShutDownOrderTest.java |   1 -
 .../integration/InternalTopicIntegrationTest.java  |   1 -
 .../KStreamAggregationDedupIntegrationTest.java    |   1 -
 .../KStreamAggregationIntegrationTest.java         |   1 -
 .../KTableSourceTopicRestartIntegrationTest.java   |   1 -
 .../PurgeRepartitionTopicIntegrationTest.java      |   1 -
 .../integration/QueryableStateIntegrationTest.java |   2 -
 .../integration/RegexSourceIntegrationTest.java    |   1 -
 .../RepartitionOptimizingIntegrationTest.java      |   1 -
 ...artitionWithMergeOptimizingIntegrationTest.java |   1 -
 .../integration/RestoreIntegrationTest.java        |   1 -
 .../integration/utils/IntegrationTestUtils.java    |   1 -
 31 files changed, 84 insertions(+), 130 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index bd1c984..e285ade 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -249,17 +249,6 @@ public class ConsumerConfig extends AbstractConfig {
             "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
     public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
 
-    /**
-     * <code>internal.leave.group.on.close</code>
-     * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
-     * won't occur until <code>session.timeout.ms</code> expires.
-     *
-     * <p>
-     * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
-     *
-     */
-    static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
-
     /** <code>isolation.level</code> */
     public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
     public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" +
@@ -469,10 +458,6 @@ public class ConsumerConfig extends AbstractConfig {
                                         DEFAULT_EXCLUDE_INTERNAL_TOPICS,
                                         Importance.MEDIUM,
                                         EXCLUDE_INTERNAL_TOPICS_DOC)
-                                .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
-                                        Type.BOOLEAN,
-                                        true,
-                                        Importance.LOW)
                                 .define(ISOLATION_LEVEL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_ISOLATION_LEVEL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9030308..5c8c1dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -788,8 +788,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         retryBackoffMs,
                         enableAutoCommit,
                         config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                        this.interceptors,
-                        config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+                        this.interceptors);
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2cf3910..69d4928 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -110,7 +110,6 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private final Logger log;
     private final int sessionTimeoutMs;
-    private final boolean leaveGroupOnClose;
     private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
     protected final int rebalanceTimeoutMs;
@@ -144,8 +143,7 @@ public abstract class AbstractCoordinator implements Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               long retryBackoffMs,
-                               boolean leaveGroupOnClose) {
+                               long retryBackoffMs) {
         this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
@@ -154,7 +152,6 @@ public abstract class AbstractCoordinator implements Closeable {
         this.groupInstanceId = groupInstanceId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
-        this.leaveGroupOnClose = leaveGroupOnClose;
         this.heartbeat = heartbeat;
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
@@ -170,11 +167,10 @@ public abstract class AbstractCoordinator implements Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               long retryBackoffMs,
-                               boolean leaveGroupOnClose) {
+                               long retryBackoffMs) {
         this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
                 new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
-                metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
+                metrics, metricGrpPrefix, time, retryBackoffMs);
     }
 
     /**
@@ -814,9 +810,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                if (leaveGroupOnClose) {
-                    maybeLeaveGroup();
-                }
+                maybeLeaveGroup();
 
                 // At this point, there may be pending commits (async commits or sync commits that were
                 // interrupted using wakeup) and the leave group request which have been queued, but not
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index deed257..829d9dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -134,8 +134,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                long retryBackoffMs,
                                boolean autoCommitEnabled,
                                int autoCommitIntervalMs,
-                               ConsumerInterceptors<?, ?> interceptors,
-                               final boolean leaveGroupOnClose) {
+                               ConsumerInterceptors<?, ?> interceptors) {
         super(logContext,
               client,
               groupId,
@@ -146,8 +145,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
               metrics,
               metricGrpPrefix,
               time,
-              retryBackoffMs,
-              leaveGroupOnClose);
+              retryBackoffMs);
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ccd9e94..524ee25 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1894,8 +1894,7 @@ public class KafkaConsumerTest {
                 retryBackoffMs,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
-                interceptors,
-                true);
+                interceptors);
 
         Fetcher<String, String> fetcher = new Fetcher<>(
                 loggerFactory,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 5aaf476..93c074b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -809,7 +809,7 @@ public class AbstractCoordinatorTest {
                                 int retryBackoffMs,
                                 Optional<String> groupInstanceId) {
             super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
-                    SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
+                    SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs);
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a60316f..a83df5e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2178,8 +2178,7 @@ public class ConsumerCoordinatorTest {
                 retryBackoffMs,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
-                null,
-                true
+                null
         );
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index a5882e8..968855a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -86,8 +86,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
               metrics,
               metricGrpPrefix,
               time,
-              retryBackoffMs,
-                true);
+              retryBackoffMs);
         this.log = logContext.logger(WorkerCoordinator.class);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 888edf3..4ed2770 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -50,7 +50,7 @@ public interface KafkaClientSupplier {
     /**
      * Create a {@link Consumer} which is used to read records of source topics.
      *
-     * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String) consumer config} which is
+     * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String, int) consumer config} which is
      *               supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance
      * @return an instance of Kafka consumer
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f3e6386..5024c28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -139,6 +139,8 @@ public class StreamsConfig extends AbstractConfig {
     private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
     private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
 
+    public final static int DUMMY_THREAD_INDEX = 1;
+
     /**
      * Prefix used to provide default topic configs to be applied when creating internal topics.
      * These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
@@ -718,7 +720,6 @@ public class StreamsConfig extends AbstractConfig {
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
         CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
@@ -943,12 +944,12 @@ public class StreamsConfig extends AbstractConfig {
      * @param groupId      consumer groupId
      * @param clientId     clientId
      * @return Map of the consumer configuration.
-     * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)}
+     * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String, int)}
      */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public Map<String, Object> getConsumerConfigs(final String groupId, final String clientId) {
-        return getMainConsumerConfigs(groupId, clientId);
+        return getMainConsumerConfigs(groupId, clientId, DUMMY_THREAD_INDEX);
     }
 
     /**
@@ -963,10 +964,11 @@ public class StreamsConfig extends AbstractConfig {
      *
      * @param groupId      consumer groupId
      * @param clientId     clientId
+     * @param threadIdx    stream thread index
      * @return Map of the consumer configuration.
      */
     @SuppressWarnings("WeakerAccess")
-    public Map<String, Object> getMainConsumerConfigs(final String groupId, final String clientId) {
+    public Map<String, Object> getMainConsumerConfigs(final String groupId, final String clientId, final int threadIdx) {
         final Map<String, Object> consumerProps = getCommonConsumerConfigs();
 
         // Get main consumer override configs
@@ -977,9 +979,15 @@ public class StreamsConfig extends AbstractConfig {
 
         // this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting
         consumerProps.put(APPLICATION_ID_CONFIG, groupId);
-        // add client id with stream client id prefix, and group id
+
+        // add group id, client id with stream client id prefix, and group instance id
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
+        final String groupInstanceId = (String) consumerProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+        // Suffix each thread consumer with thread.id to enforce uniqueness of group.instance.id.
+        if (groupInstanceId != null) {
+            consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId + "-" + threadIdx);
+        }
 
         // add configs required for stream partition assignor
         consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 46612e5d..419e181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -661,7 +661,7 @@ public class StreamThread extends Thread {
 
         log.info("Creating consumer client");
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadClientId));
+        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadClientId), threadIdx);
         consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
         final AtomicInteger assignmentErrorCode = new AtomicInteger();
         consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
@@ -721,7 +721,8 @@ public class StreamThread extends Thread {
         this.assignmentErrorCode = assignmentErrorCode;
 
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
-        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId"))
+        final int dummyThreadIdx = 1;
+        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx))
                 .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 3e55f29..2cd31ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -106,7 +106,6 @@ public class KafkaStreamsTest {
         props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
-        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         globalStreams = new KafkaStreams(builder.build(), props);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 2c9a97b..c202c93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -62,6 +61,10 @@ public class StreamsConfigTest {
     private final Properties props = new Properties();
     private StreamsConfig streamsConfig;
 
+    private final String groupId = "example-application";
+    private final String clientId = "client";
+    private final int threadIdx = 1;
+
     @Before
     public void setUp() {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
@@ -87,7 +90,6 @@ public class StreamsConfigTest {
 
     @Test
     public void testGetProducerConfigs() {
-        final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
         assertThat(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), equalTo(clientId));
         assertThat(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), equalTo("100"));
@@ -95,12 +97,19 @@ public class StreamsConfigTest {
 
     @Test
     public void testGetConsumerConfigs() {
-        final String groupId = "example-application";
-        final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertThat(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), equalTo(clientId));
         assertThat(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), equalTo(groupId));
         assertThat(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), equalTo("1000"));
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
+    }
+
+    @Test
+    public void testGetGroupInstanceIdConfigs() {
+        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
+        assertThat(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), equalTo("group-instance-id-1"));
     }
 
     @Test
@@ -113,10 +122,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
         props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-
-        final String groupId = "example-application";
-        final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
 
         assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
         assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
@@ -135,10 +141,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
         props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-
-        final String groupId = "example-application";
-        final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
 
         assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
         assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
@@ -148,15 +151,12 @@ public class StreamsConfigTest {
     public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
         props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
-        final String groupId = "example-application";
-        final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
 
     @Test
     public void testGetRestoreConsumerConfigs() {
-        final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId);
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -199,7 +199,7 @@ public class StreamsConfigTest {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -208,7 +208,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedRestoreConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
@@ -216,7 +216,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -224,7 +224,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -232,7 +232,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(producerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
         assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -241,7 +241,7 @@ public class StreamsConfigTest {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> configs = streamsConfig.getProducerConfigs(clientId);
         assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -251,7 +251,7 @@ public class StreamsConfigTest {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -260,7 +260,7 @@ public class StreamsConfigTest {
     public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(groupId);
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
@@ -269,7 +269,7 @@ public class StreamsConfigTest {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> configs = streamsConfig.getProducerConfigs(clientId);
         assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -278,10 +278,10 @@ public class StreamsConfigTest {
     public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put("custom.property.host", "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
+        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
+        final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs(clientId);
         assertEquals("host", consumerConfigs.get("custom.property.host"));
         assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
         assertEquals("host", producerConfigs.get("custom.property.host"));
@@ -295,10 +295,10 @@ public class StreamsConfigTest {
         props.put(consumerPrefix("custom.property.host"), "host1");
         props.put(producerPrefix("custom.property.host"), "host2");
         props.put(adminClientPrefix("custom.property.host"), "host3");
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
+        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
+        final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs(clientId);
         assertEquals("host1", consumerConfigs.get("custom.property.host"));
         assertEquals("host1", restoreConsumerConfigs.get("custom.property.host"));
         assertEquals("host2", producerConfigs.get("custom.property.host"));
@@ -309,7 +309,7 @@ public class StreamsConfigTest {
     public void shouldSupportNonPrefixedAdminConfigs() {
         props.put(AdminClientConfig.RETRIES_CONFIG, 10);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> configs = streamsConfig.getAdminConfigs("clientId");
+        final Map<String, Object> configs = streamsConfig.getAdminConfigs(clientId);
         assertEquals(10, configs.get(AdminClientConfig.RETRIES_CONFIG));
     }
 
@@ -332,7 +332,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
@@ -341,7 +341,7 @@ public class StreamsConfigTest {
     public void shouldOverrideStreamsDefaultProducerConfigs() {
         props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
         assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
     }
 
@@ -349,7 +349,7 @@ public class StreamsConfigTest {
     public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
 
@@ -357,7 +357,7 @@ public class StreamsConfigTest {
     public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b", threadIdx);
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -365,7 +365,7 @@ public class StreamsConfigTest {
     public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -373,13 +373,12 @@ public class StreamsConfigTest {
     public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
         props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
-        final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
 
     @Test
     public void testGetGlobalConsumerConfigs() {
-        final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -389,7 +388,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedGlobalConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
@@ -397,7 +396,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -405,7 +404,7 @@ public class StreamsConfigTest {
     public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(groupId);
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
@@ -413,7 +412,7 @@ public class StreamsConfigTest {
     public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -421,18 +420,11 @@ public class StreamsConfigTest {
     public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
         props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
-        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId");
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
         assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
 
     @Test
-    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false));
-    }
-
-    @Test
     public void shouldAcceptAtLeastOnce() {
         // don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once");
@@ -457,7 +449,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -465,7 +457,7 @@ public class StreamsConfigTest {
     public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
         assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -475,7 +467,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
         assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
     }
 
@@ -483,7 +475,7 @@ public class StreamsConfigTest {
     public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
         assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
     }
 
@@ -492,8 +484,8 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
-        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
 
         assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
         assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
@@ -508,7 +500,7 @@ public class StreamsConfigTest {
         props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
 
         assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
     }
@@ -589,7 +581,7 @@ public class StreamsConfigTest {
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         try {
-            streamsConfig.getProducerConfigs("clientId");
+            streamsConfig.getProducerConfigs(clientId);
             fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
         } catch (final ConfigException e) {
             assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage());
@@ -601,7 +593,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
 
-        new StreamsConfig(props).getProducerConfigs("clientId");
+        new StreamsConfig(props).getProducerConfigs(clientId);
     }
 
     @Test
@@ -610,7 +602,7 @@ public class StreamsConfigTest {
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number");
 
         try {
-            new StreamsConfig(props).getProducerConfigs("clientId");
+            new StreamsConfig(props).getProducerConfigs(clientId);
             fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
         } catch (final ConfigException e) {
             assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index badaa36..7fa108d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -139,7 +139,6 @@ public abstract class AbstractJoinIntegrationTest {
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index aeb581e..c9ae1bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -156,7 +156,6 @@ public abstract class AbstractResetIntegrationTest {
         streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
-        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfig.putAll(commonClientConfig);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index f43b396..4f5455b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -160,7 +160,6 @@ public class EosIntegrationTest {
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
             final Properties config = StreamsTestUtils.getStreamsConfig(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 6a724ec..87d6a16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -143,7 +143,6 @@ public class FineGrainedAutoResetIntegrationTest {
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
                 "testAutoOffsetId",
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 787cb29..1aa99f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -95,7 +95,6 @@ public class GlobalKTableEOSIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
         globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 5007fa9..6617512 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -89,7 +89,6 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
                                           Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 7c16927..7c2b009 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -88,7 +88,6 @@ public class GlobalThreadShutDownOrderTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 345b581..0ee0278 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -93,7 +93,6 @@ public class InternalTopicIntegrationTest {
         streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 7493b06..e1c9b5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -92,7 +92,6 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 8e05d9e..a13408e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -129,7 +129,6 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 32d77c0..3ec239f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -78,7 +78,6 @@ public class KTableSourceTopicRestartIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 4c7859b..0cfa97a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -160,7 +160,6 @@ public class PurgeRepartitionTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
         streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2);    // we cannot allow batch size larger than segment size
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(INPUT_TOPIC)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6889525..14d9e5d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -188,8 +188,6 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        // override this to make the rebalances happen quickly
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value);
         stringLongComparator = Comparator.comparing((KeyValue<String, Long> o) -> o.key).thenComparingLong(o -> o.value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1cb9e0c..f74487b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -122,7 +122,6 @@ public class RegexSourceIntegrationTest {
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
                                                                  CLUSTER.bootstrapServers(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
index 6fa1bff..2d996b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -91,7 +91,6 @@ public class RepartitionOptimizingIntegrationTest {
         final Properties props = new Properties();
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
-        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
             "maybe-optimized-test-app",
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index bf65264..473a626 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -79,7 +79,6 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
         final Properties props = new Properties();
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
-        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
             "maybe-optimized-with-merge-test-app",
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index f21dbfc..2d88ff3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -104,7 +104,6 @@ public class RestoreIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         return streamsConfiguration;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 74cac06..e786a44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -74,7 +74,6 @@ import static org.hamcrest.Matchers.equalTo;
 public class IntegrationTestUtils {
 
     public static final long DEFAULT_TIMEOUT = 60 * 1000L;
-    public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
     /*
      * Records state transition for StreamThread