You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/27 17:30:45 UTC

kafka git commit: KAFKA-4881: add internal leave.group.on.close config to consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk d27e09e60 -> 1abed91bd


KAFKA-4881: add internal leave.group.on.close config to consumer

Author: Damian Guy <da...@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #2650 from dguy/consumer-leave-group-config


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1abed91b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1abed91b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1abed91b

Branch: refs/heads/trunk
Commit: 1abed91bd2915226cf6320395e1a5877ea0705d7
Parents: d27e09e
Author: Damian Guy <da...@gmail.com>
Authored: Mon Mar 27 10:30:38 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 27 10:30:38 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 18 ++++-
 .../kafka/clients/consumer/KafkaConsumer.java   | 31 ++++-----
 .../consumer/internals/AbstractCoordinator.java |  9 ++-
 .../consumer/internals/ConsumerCoordinator.java | 20 +++---
 .../apache/kafka/common/config/ConfigDef.java   | 33 ++++++++--
 .../clients/consumer/KafkaConsumerTest.java     |  3 +-
 .../internals/AbstractCoordinatorTest.java      |  2 +-
 .../internals/ConsumerCoordinatorTest.java      | 69 +++++++++++---------
 .../kafka/common/config/ConfigDefTest.java      | 22 +++++++
 .../runtime/distributed/WorkerCoordinator.java  | 17 ++---
 10 files changed, 152 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 26a7d5d..abb8255 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -222,7 +222,18 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
                                                             + "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";
     public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
-    
+
+    /**
+     * <code>internal.leave.group.on.close</code>
+     * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
+     * won't occur until <code>session.timeout.ms</code> expires.
+     *
+     * <p>
+     * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
+     *
+     */
+    static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -390,7 +401,10 @@ public class ConsumerConfig extends AbstractConfig {
                                         DEFAULT_EXCLUDE_INTERNAL_TOPICS,
                                         Importance.MEDIUM,
                                         EXCLUDE_INTERNAL_TOPICS_DOC)
-
+                                .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
+                                                Type.BOOLEAN,
+                                                true,
+                                                Importance.LOW)
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3894b5f..01d9463 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -680,21 +680,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
             this.coordinator = new ConsumerCoordinator(this.client,
-                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
-                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
-                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
-                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
-                    assignors,
-                    this.metadata,
-                    this.subscriptions,
-                    metrics,
-                    metricGrpPrefix,
-                    this.time,
-                    retryBackoffMs,
-                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                    this.interceptors,
-                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
+                                                       config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+                                                       config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+                                                       config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                                                       config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                                                       assignors,
+                                                       this.metadata,
+                                                       this.subscriptions,
+                                                       metrics,
+                                                       metricGrpPrefix,
+                                                       this.time,
+                                                       retryBackoffMs,
+                                                       config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
+                                                       config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                                                       this.interceptors,
+                                                       config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                                                       config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
             this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index db665b6..1ebce76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -102,6 +102,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
     protected final int rebalanceTimeoutMs;
     private final int sessionTimeoutMs;
+    private final boolean leaveGroupOnClose;
     private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
     protected final String groupId;
@@ -130,12 +131,14 @@ public abstract class AbstractCoordinator implements Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               long retryBackoffMs) {
+                               long retryBackoffMs,
+                               boolean leaveGroupOnClose) {
         this.client = client;
         this.time = time;
         this.groupId = groupId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
+        this.leaveGroupOnClose = leaveGroupOnClose;
         this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
@@ -677,7 +680,9 @@ public abstract class AbstractCoordinator implements Closeable {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                maybeLeaveGroup();
+                if (leaveGroupOnClose) {
+                    maybeLeaveGroup();
+                }
 
                 // At this point, there may be pending commits (async commits or sync commits that were
                 // interrupted using wakeup) and the leave group request which have been queued, but not

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ba19146..fa2e31a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -108,16 +108,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                boolean autoCommitEnabled,
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
-                               boolean excludeInternalTopics) {
+                               boolean excludeInternalTopics,
+                               final boolean leaveGroupOnClose) {
         super(client,
-                groupId,
-                rebalanceTimeoutMs,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                metrics,
-                metricGrpPrefix,
-                time,
-                retryBackoffMs);
+              groupId,
+              rebalanceTimeoutMs,
+              sessionTimeoutMs,
+              heartbeatIntervalMs,
+              metrics,
+              metricGrpPrefix,
+              time,
+              retryBackoffMs,
+              leaveGroupOnClose);
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
         this.subscriptions = subscriptions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3396f63..c4eac78 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -132,7 +132,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
                             String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
-        return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender));
+        return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false));
     }
 
     /**
@@ -382,6 +382,19 @@ public class ConfigDef {
     }
 
     /**
+     * Define a new internal configuration. Internal configuration won't show up in the docs and aren't
+     * intended for general use.
+     * @param name              The name of the config parameter
+     * @param type              The type of the config
+     * @param defaultValue      The default value to use if this config isn't present
+     * @param importance
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) {
+        return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.<String>emptyList(), null, true));
+    }
+
+    /**
      * Get the configuration keys
      * @return a map containing all configuration keys
      */
@@ -900,11 +913,13 @@ public class ConfigDef {
         public final String displayName;
         public final List<String> dependents;
         public final Recommender recommender;
+        public final boolean internalConfig;
 
         public ConfigKey(String name, Type type, Object defaultValue, Validator validator,
                          Importance importance, String documentation, String group,
                          int orderInGroup, Width width, String displayName,
-                         List<String> dependents, Recommender recommender) {
+                         List<String> dependents, Recommender recommender,
+                         boolean internalConfig) {
             this.name = name;
             this.type = type;
             this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
@@ -919,6 +934,7 @@ public class ConfigDef {
             this.width = width;
             this.displayName = displayName;
             this.recommender = recommender;
+            this.internalConfig = internalConfig;
         }
 
         public boolean hasDefault() {
@@ -971,6 +987,9 @@ public class ConfigDef {
         }
         b.append("</tr>\n");
         for (ConfigKey key : configs) {
+            if (key.internalConfig) {
+                continue;
+            }
             b.append("<tr>\n");
             // print column values
             for (String headerName : headers()) {
@@ -991,6 +1010,9 @@ public class ConfigDef {
     public String toRst() {
         StringBuilder b = new StringBuilder();
         for (ConfigKey key : sortedConfigs()) {
+            if (key.internalConfig) {
+                continue;
+            }
             getConfigKeyRst(key, b);
             b.append("\n");
         }
@@ -1006,6 +1028,9 @@ public class ConfigDef {
 
         String lastKeyGroupName = "";
         for (ConfigKey key : sortedConfigs()) {
+            if (key.internalConfig) {
+                continue;
+            }
             if (key.group != null) {
                 if (!lastKeyGroupName.equalsIgnoreCase(key.group)) {
                     b.append(key.group).append("\n");
@@ -1114,8 +1139,8 @@ public class ConfigDef {
                     key.width,
                     key.displayName,
                     embeddedDependents(keyPrefix, key.dependents),
-                    embeddedRecommender(keyPrefix, key.recommender)
-            ));
+                    embeddedRecommender(keyPrefix, key.recommender),
+                    key.internalConfig));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7fb3552..ec60209 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1516,7 +1516,8 @@ public class KafkaConsumerTest {
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 interceptors,
-                excludeInternalTopics);
+                excludeInternalTopics,
+                true);
 
         Fetcher<String, String> fetcher = new Fetcher<>(
                 consumerClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 45ee29a..62801d0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -492,7 +492,7 @@ public class AbstractCoordinatorTest {
                                 Metrics metrics,
                                 Time time) {
             super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics,
-                    METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS);
+                  METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8bf8a63..f0b289f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -129,7 +129,7 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled);
+        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
     }
 
     @After
@@ -861,7 +861,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testIncludeInternalTopicsConfigOption() {
-        coordinator = buildCoordinator(new Metrics(), assignors, false, false);
+        coordinator = buildCoordinator(new Metrics(), assignors, false, false, true);
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
         metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
@@ -955,7 +955,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -980,7 +980,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -1007,7 +1007,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignment() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1025,7 +1025,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+                                                           ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
@@ -1376,7 +1376,7 @@ public class ConsumerCoordinatorTest {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
-                    ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
+                                                               ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -1385,7 +1385,7 @@ public class ConsumerCoordinatorTest {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
-                    ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
+                                                               ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(range.name(), metadata.get(0).name());
@@ -1395,19 +1395,25 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseDynamicAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         gracefulCloseTest(coordinator, true);
     }
 
     @Test
     public void testCloseManualAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
+        gracefulCloseTest(coordinator, false);
+    }
+
+    @Test
+    public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception {
+        final ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, false);
         gracefulCloseTest(coordinator, false);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
@@ -1415,14 +1421,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
         closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
@@ -1430,14 +1436,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
         makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
         closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
     }
 
     @Test
     public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
@@ -1445,7 +1451,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
@@ -1453,20 +1459,20 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseNoResponseForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
     }
 
     @Test
     public void testCloseNoResponseForLeaveGroup() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
     }
 
     @Test
     public void testCloseNoWait() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 0, 60000, 0, 0);
     }
@@ -1474,7 +1480,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testHeartbeatThreadClose() throws Exception {
         groupId = "testCloseTimeoutWithHeartbeatThread";
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         coordinator.ensureActiveGroup();
         time.sleep(heartbeatIntervalMs + 100);
         Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
@@ -1485,10 +1491,12 @@ public class ConsumerCoordinatorTest {
             assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
     }
 
-    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit) {
+    private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
+                                                               final boolean autoCommit,
+                                                               final boolean leaveGroup) {
         final String consumerId = "consumer";
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit, leaveGroup);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
         if (useGroupManagement) {
@@ -1547,7 +1555,7 @@ public class ConsumerCoordinatorTest {
         }
     }
 
-    private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean dynamicAssignment) throws Exception {
+    private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) throws Exception {
         final AtomicBoolean commitRequested = new AtomicBoolean();
         final AtomicBoolean leaveGroupRequested = new AtomicBoolean();
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1569,14 +1577,14 @@ public class ConsumerCoordinatorTest {
 
         coordinator.close();
         assertTrue("Commit not requested", commitRequested.get());
-        if (dynamicAssignment)
-            assertTrue("Leave group not requested", leaveGroupRequested.get());
+        assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get());
     }
 
-    private ConsumerCoordinator buildCoordinator(Metrics metrics,
-                                                 List<PartitionAssignor> assignors,
-                                                 boolean excludeInternalTopics,
-                                                 boolean autoCommitEnabled) {
+    private ConsumerCoordinator buildCoordinator(final Metrics metrics,
+                                                 final List<PartitionAssignor> assignors,
+                                                 final boolean excludeInternalTopics,
+                                                 final boolean autoCommitEnabled,
+                                                 final boolean leaveGroup) {
         return new ConsumerCoordinator(
                 consumerClient,
                 groupId,
@@ -1593,7 +1601,8 @@ public class ConsumerCoordinatorTest {
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 null,
-                excludeInternalTopics);
+                excludeInternalTopics,
+                leaveGroup);
     }
 
     private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 10271ca..11e5803 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -35,6 +35,7 @@ import java.util.Properties;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 public class ConfigDefTest {
@@ -323,6 +324,27 @@ public class ConfigDefTest {
         }
     }
 
+    @Test
+    public void testCanAddInternalConfig() throws Exception {
+        final String configName = "internal.config";
+        final ConfigDef configDef = new ConfigDef().defineInternal(configName, Type.STRING, "", Importance.LOW);
+        final HashMap<String, String> properties = new HashMap<>();
+        properties.put(configName, "value");
+        final List<ConfigValue> results = configDef.validate(properties);
+        final ConfigValue configValue = results.get(0);
+        assertEquals("value", configValue.value());
+        assertEquals(configName, configValue.name());
+    }
+
+    @Test
+    public void testInternalConfigDoesntShowUpInDocs() throws Exception {
+        final String name = "my.config";
+        final ConfigDef configDef = new ConfigDef().defineInternal(name, Type.STRING, "", Importance.LOW);
+        assertFalse(configDef.toHtmlTable().contains("my.config"));
+        assertFalse(configDef.toEnrichedRst().contains("my.config"));
+        assertFalse(configDef.toRst().contains("my.config"));
+    }
+
     private static class IntegerRecommender implements ConfigDef.Recommender {
 
         private boolean hasParent;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 0dae54b..0252ae9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -74,14 +74,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              ConfigBackingStore configStorage,
                              WorkerRebalanceListener listener) {
         super(client,
-                groupId,
-                rebalanceTimeoutMs,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                metrics,
-                metricGrpPrefix,
-                time,
-                retryBackoffMs);
+              groupId,
+              rebalanceTimeoutMs,
+              sessionTimeoutMs,
+              heartbeatIntervalMs,
+              metrics,
+              metricGrpPrefix,
+              time,
+              retryBackoffMs,
+              true);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;