You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/12 06:26:59 UTC
kafka git commit: KAFKA-4881: add internal.leave.group.config to
consumer
Repository: kafka
Updated Branches:
refs/heads/0.10.2 9eb0cdb54 -> 29214d336
KAFKA-4881: add internal.leave.group.config to consumer
Backport from https://github.com/apache/kafka/pull/2650
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3025 from dguy/kafka-4881-bp
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29214d33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29214d33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29214d33
Branch: refs/heads/0.10.2
Commit: 29214d33634ef613618ef7ad32ad08eae9f83a40
Parents: 9eb0cdb
Author: Damian Guy <da...@gmail.com>
Authored: Thu May 11 23:26:52 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 11 23:26:52 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 18 ++++-
.../kafka/clients/consumer/KafkaConsumer.java | 31 ++++-----
.../consumer/internals/AbstractCoordinator.java | 9 ++-
.../consumer/internals/ConsumerCoordinator.java | 20 +++---
.../apache/kafka/common/config/ConfigDef.java | 35 ++++++++--
.../clients/consumer/KafkaConsumerTest.java | 3 +-
.../internals/AbstractCoordinatorTest.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 69 +++++++++++---------
.../kafka/common/config/ConfigDefTest.java | 22 +++++++
.../runtime/distributed/WorkerCoordinator.java | 17 ++---
10 files changed, 153 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1b33517..4d66f21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -218,7 +218,18 @@ public class ConsumerConfig extends AbstractConfig {
private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
+ "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
-
+
+ /**
+ * <code>internal.leave.group.on.close</code>
+ * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
+ * won't occur until <code>session.timeout.ms</code> expires.
+ *
+ * <p>
+ * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
+ *
+ */
+ static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -386,7 +397,10 @@ public class ConsumerConfig extends AbstractConfig {
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
-
+ .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 64d64ba..e54c5a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -674,21 +674,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
this.coordinator = new ConsumerCoordinator(this.client,
- config.getString(ConsumerConfig.GROUP_ID_CONFIG),
- config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
- config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
- config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
- assignors,
- this.metadata,
- this.subscriptions,
- metrics,
- metricGrpPrefix,
- this.time,
- retryBackoffMs,
- config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
- config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors,
- config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
+ config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+ config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+ config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+ config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+ assignors,
+ this.metadata,
+ this.subscriptions,
+ metrics,
+ metricGrpPrefix,
+ this.time,
+ retryBackoffMs,
+ config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
+ config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+ this.interceptors,
+ config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+ config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
this.fetcher = new Fetcher<>(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b72769e..d24bde4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -98,6 +98,7 @@ public abstract class AbstractCoordinator implements Closeable {
protected final int rebalanceTimeoutMs;
private final int sessionTimeoutMs;
+ private final boolean leaveGroupOnClose;
private final GroupCoordinatorMetrics sensors;
private final Heartbeat heartbeat;
protected final String groupId;
@@ -126,12 +127,14 @@ public abstract class AbstractCoordinator implements Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time,
- long retryBackoffMs) {
+ long retryBackoffMs,
+ boolean leaveGroupOnClose) {
this.client = client;
this.time = time;
this.groupId = groupId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.sessionTimeoutMs = sessionTimeoutMs;
+ this.leaveGroupOnClose = leaveGroupOnClose;
this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
@@ -673,7 +676,9 @@ public abstract class AbstractCoordinator implements Closeable {
// Synchronize after closing the heartbeat thread since heartbeat thread
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
- maybeLeaveGroup();
+ if (leaveGroupOnClose) {
+ maybeLeaveGroup();
+ }
// At this point, there may be pending commits (async commits or sync commits that were
// interrupted using wakeup) and the leave group request which have been queued, but not
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2e37636..7f62489 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -104,16 +104,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
boolean autoCommitEnabled,
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
- boolean excludeInternalTopics) {
+ boolean excludeInternalTopics,
+ final boolean leaveGroupOnClose) {
super(client,
- groupId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- metrics,
- metricGrpPrefix,
- time,
- retryBackoffMs);
+ groupId,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ heartbeatIntervalMs,
+ metrics,
+ metricGrpPrefix,
+ time,
+ retryBackoffMs,
+ leaveGroupOnClose);
this.metadata = metadata;
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
this.subscriptions = subscriptions;
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 5257f6e..312205f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -124,7 +124,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
- return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender));
+ return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false));
}
/**
@@ -374,6 +374,19 @@ public class ConfigDef {
}
/**
+ * Define a new internal configuration. Internal configuration won't show up in the docs and aren't
+ * intended for general use.
+ * @param name The name of the config parameter
+ * @param type The type of the config
+ * @param defaultValue The default value to use if this config isn't present
+ * @param importance
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) {
+ return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.<String>emptyList(), null, true));
+ }
+
+ /**
* Get the configuration keys
* @return a map containing all configuration keys
*/
@@ -890,11 +903,13 @@ public class ConfigDef {
public final String displayName;
public final List<String> dependents;
public final Recommender recommender;
+ public final boolean internalConfig;
public ConfigKey(String name, Type type, Object defaultValue, Validator validator,
Importance importance, String documentation, String group,
int orderInGroup, Width width, String displayName,
- List<String> dependents, Recommender recommender) {
+ List<String> dependents, Recommender recommender,
+ boolean internalConfig) {
this.name = name;
this.type = type;
this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
@@ -909,6 +924,7 @@ public class ConfigDef {
this.width = width;
this.displayName = displayName;
this.recommender = recommender;
+ this.internalConfig = internalConfig;
}
public boolean hasDefault() {
@@ -961,6 +977,10 @@ public class ConfigDef {
}
b.append("</tr>\n");
for (ConfigKey def : configs) {
+ if (def.internalConfig) {
+ continue;
+ }
+
b.append("<tr>\n");
// print column values
for (String headerName : headers()) {
@@ -981,6 +1001,9 @@ public class ConfigDef {
public String toRst() {
StringBuilder b = new StringBuilder();
for (ConfigKey def : sortedConfigs()) {
+ if (def.internalConfig) {
+ continue;
+ }
getConfigKeyRst(def, b);
b.append("\n");
}
@@ -996,10 +1019,12 @@ public class ConfigDef {
String lastKeyGroupName = "";
for (ConfigKey def : sortedConfigs()) {
+ if (def.internalConfig) {
+ continue;
+ }
if (def.group != null) {
if (!lastKeyGroupName.equalsIgnoreCase(def.group)) {
b.append(def.group).append("\n");
-
char[] underLine = new char[def.group.length()];
Arrays.fill(underLine, '^');
b.append(new String(underLine)).append("\n\n");
@@ -1104,8 +1129,8 @@ public class ConfigDef {
key.width,
key.displayName,
embeddedDependents(keyPrefix, key.dependents),
- embeddedRecommender(keyPrefix, key.recommender)
- ));
+ embeddedRecommender(keyPrefix, key.recommender),
+ key.internalConfig));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f2905a9..369cb30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1508,7 +1508,8 @@ public class KafkaConsumerTest {
autoCommitEnabled,
autoCommitIntervalMs,
interceptors,
- excludeInternalTopics);
+ excludeInternalTopics,
+ true);
Fetcher<String, String> fetcher = new Fetcher<>(
consumerClient,
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 8846b5e..e6165cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -492,7 +492,7 @@ public class AbstractCoordinatorTest {
Metrics metrics,
Time time) {
super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics,
- METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS);
+ METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 66fe76d..501db2d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -129,7 +129,7 @@ public class ConsumerCoordinatorTest {
this.partitionAssignor.clear();
client.setNode(node);
- this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled);
+ this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
}
@After
@@ -861,7 +861,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testIncludeInternalTopicsConfigOption() {
- coordinator = buildCoordinator(new Metrics(), assignors, false, false);
+ coordinator = buildCoordinator(new Metrics(), assignors, false, false, true);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
@@ -955,7 +955,7 @@ public class ConsumerCoordinatorTest {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -980,7 +980,7 @@ public class ConsumerCoordinatorTest {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -1007,7 +1007,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignment() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -1025,7 +1025,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -1376,7 +1376,7 @@ public class ConsumerCoordinatorTest {
try (Metrics metrics = new Metrics(time)) {
ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -1385,7 +1385,7 @@ public class ConsumerCoordinatorTest {
try (Metrics metrics = new Metrics(time)) {
ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(range.name(), metadata.get(0).name());
@@ -1395,19 +1395,25 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseDynamicAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
gracefulCloseTest(coordinator, true);
}
@Test
public void testCloseManualAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
+ gracefulCloseTest(coordinator, false);
+ }
+
+ @Test
+ public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception {
+ final ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, false);
gracefulCloseTest(coordinator, false);
}
@Test
public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
@@ -1415,14 +1421,14 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
}
@Test
public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
@@ -1430,14 +1436,14 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
}
@Test
public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
@@ -1445,7 +1451,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
@@ -1453,20 +1459,20 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseNoResponseForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
}
@Test
public void testCloseNoResponseForLeaveGroup() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
}
@Test
public void testCloseNoWait() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 0, 60000, 0, 0);
}
@@ -1474,7 +1480,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testHeartbeatThreadClose() throws Exception {
groupId = "testCloseTimeoutWithHeartbeatThread";
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
coordinator.ensureActiveGroup();
time.sleep(heartbeatIntervalMs + 100);
Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
@@ -1485,10 +1491,12 @@ public class ConsumerCoordinatorTest {
assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
}
- private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit) {
+ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
+ final boolean autoCommit,
+ final boolean leaveGroup) {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit);
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit, leaveGroup);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
if (useGroupManagement) {
@@ -1547,7 +1555,7 @@ public class ConsumerCoordinatorTest {
}
}
- private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean dynamicAssignment) throws Exception {
+ private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) throws Exception {
final AtomicBoolean commitRequested = new AtomicBoolean();
final AtomicBoolean leaveGroupRequested = new AtomicBoolean();
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1569,14 +1577,14 @@ public class ConsumerCoordinatorTest {
coordinator.close();
assertTrue("Commit not requested", commitRequested.get());
- if (dynamicAssignment)
- assertTrue("Leave group not requested", leaveGroupRequested.get());
+ assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get());
}
- private ConsumerCoordinator buildCoordinator(Metrics metrics,
- List<PartitionAssignor> assignors,
- boolean excludeInternalTopics,
- boolean autoCommitEnabled) {
+ private ConsumerCoordinator buildCoordinator(final Metrics metrics,
+ final List<PartitionAssignor> assignors,
+ final boolean excludeInternalTopics,
+ final boolean autoCommitEnabled,
+ final boolean leaveGroup) {
return new ConsumerCoordinator(
consumerClient,
groupId,
@@ -1593,7 +1601,8 @@ public class ConsumerCoordinatorTest {
autoCommitEnabled,
autoCommitIntervalMs,
null,
- excludeInternalTopics);
+ excludeInternalTopics,
+ leaveGroup);
}
private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 5a6339e..4305779 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -31,6 +31,7 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class ConfigDefTest {
@@ -319,6 +320,27 @@ public class ConfigDefTest {
}
}
+ @Test
+ public void testCanAddInternalConfig() throws Exception {
+ final String configName = "internal.config";
+ final ConfigDef configDef = new ConfigDef().defineInternal(configName, Type.STRING, "", Importance.LOW);
+ final HashMap<String, String> properties = new HashMap<>();
+ properties.put(configName, "value");
+ final List<ConfigValue> results = configDef.validate(properties);
+ final ConfigValue configValue = results.get(0);
+ assertEquals("value", configValue.value());
+ assertEquals(configName, configValue.name());
+ }
+
+ @Test
+ public void testInternalConfigDoesntShowUpInDocs() throws Exception {
+ final String name = "my.config";
+ final ConfigDef configDef = new ConfigDef().defineInternal(name, Type.STRING, "", Importance.LOW);
+ assertFalse(configDef.toHtmlTable().contains("my.config"));
+ assertFalse(configDef.toEnrichedRst().contains("my.config"));
+ assertFalse(configDef.toRst().contains("my.config"));
+ }
+
private static class IntegerRecommender implements ConfigDef.Recommender {
private boolean hasParent;
http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 58525c5..9146925 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -74,14 +74,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
ConfigBackingStore configStorage,
WorkerRebalanceListener listener) {
super(client,
- groupId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- metrics,
- metricGrpPrefix,
- time,
- retryBackoffMs);
+ groupId,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ heartbeatIntervalMs,
+ metrics,
+ metricGrpPrefix,
+ time,
+ retryBackoffMs,
+ true);
this.restUrl = restUrl;
this.configStorage = configStorage;
this.assignmentSnapshot = null;