You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/22 15:03:23 UTC
[kafka] branch trunk updated: KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cafdc1e KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779)
cafdc1e is described below
commit cafdc1e7df7725c92a374ba9528f7974fe592196
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed May 22 08:03:00 2019 -0700
KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779)
As the title states, this brings back the internal.leave.group.on.close config for KStream. We plan to merge this to both trunk and the 2.3 branch if it fixes the Streams system tests globally.
Reference implementation: #6673
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 15 +++++++++++++++
.../org/apache/kafka/clients/consumer/KafkaConsumer.java | 3 ++-
.../clients/consumer/internals/AbstractCoordinator.java | 14 ++++++++++----
.../clients/consumer/internals/ConsumerCoordinator.java | 6 ++++--
.../apache/kafka/clients/consumer/KafkaConsumerTest.java | 3 ++-
.../consumer/internals/AbstractCoordinatorTest.java | 2 +-
.../consumer/internals/ConsumerCoordinatorTest.java | 3 ++-
.../connect/runtime/distributed/WorkerCoordinator.java | 3 ++-
.../main/java/org/apache/kafka/streams/StreamsConfig.java | 1 +
.../java/org/apache/kafka/streams/StreamsConfigTest.java | 8 ++++++++
10 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ff2e5cd..010fff8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -254,6 +254,17 @@ public class ConsumerConfig extends AbstractConfig {
"be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
+ /**
+ * <code>internal.leave.group.on.close</code>
+ * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
+ * won't occur until <code>session.timeout.ms</code> expires.
+ *
+ * <p>
+ * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
+ *
+ */
+ static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
+
/** <code>isolation.level</code> */
public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" +
@@ -476,6 +487,10 @@ public class ConsumerConfig extends AbstractConfig {
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
+ .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW)
.define(ISOLATION_LEVEL_CONFIG,
Type.STRING,
DEFAULT_ISOLATION_LEVEL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ad7ae82..c33a52e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -792,7 +792,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
retryBackoffMs,
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors);
+ this.interceptors,
+ config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
this.fetcher = new Fetcher<>(
logContext,
this.client,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 3af6d05..54678f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -133,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable {
private Generation generation = Generation.NO_GENERATION;
private RequestFuture<Void> findCoordinatorFuture = null;
+ private final boolean leaveGroupOnClose;
/**
* Initialize the coordination manager.
@@ -147,7 +148,8 @@ public abstract class AbstractCoordinator implements Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time,
- long retryBackoffMs) {
+ long retryBackoffMs,
+ boolean leaveGroupOnClose) {
this.log = logContext.logger(AbstractCoordinator.class);
this.client = client;
this.time = time;
@@ -159,6 +161,7 @@ public abstract class AbstractCoordinator implements Closeable {
this.heartbeat = heartbeat;
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
+ this.leaveGroupOnClose = leaveGroupOnClose;
}
public AbstractCoordinator(LogContext logContext,
@@ -171,10 +174,11 @@ public abstract class AbstractCoordinator implements Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time,
- long retryBackoffMs) {
+ long retryBackoffMs,
+ boolean leaveGroupOnClose) {
this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
- metrics, metricGrpPrefix, time, retryBackoffMs);
+ metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
}
/**
@@ -845,7 +849,9 @@ public abstract class AbstractCoordinator implements Closeable {
// Synchronize after closing the heartbeat thread since heartbeat thread
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
- maybeLeaveGroup();
+ if (leaveGroupOnClose) {
+ maybeLeaveGroup();
+ }
// At this point, there may be pending commits (async commits or sync commits that were
// interrupted using wakeup) and the leave group request which have been queued, but not
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3aef0c5..b03af74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -137,7 +137,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
long retryBackoffMs,
boolean autoCommitEnabled,
int autoCommitIntervalMs,
- ConsumerInterceptors<?, ?> interceptors) {
+ ConsumerInterceptors<?, ?> interceptors,
+ boolean leaveGroupOnClose) {
super(logContext,
client,
groupId,
@@ -148,7 +149,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
metrics,
metricGrpPrefix,
time,
- retryBackoffMs);
+ retryBackoffMs,
+ leaveGroupOnClose);
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9012ea2..42cccd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1902,7 +1902,8 @@ public class KafkaConsumerTest {
retryBackoffMs,
autoCommitEnabled,
autoCommitIntervalMs,
- interceptors);
+ interceptors,
+ true);
Fetcher<String, String> fetcher = new Fetcher<>(
loggerFactory,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 31328b3..0fc5f62 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -857,7 +857,7 @@ public class AbstractCoordinatorTest {
int retryBackoffMs,
Optional<String> groupInstanceId) {
super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
- SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs);
+ SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index f0214d2..86032c4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2219,7 +2219,8 @@ public class ConsumerCoordinatorTest {
retryBackoffMs,
autoCommitEnabled,
autoCommitIntervalMs,
- null
+ null,
+ !groupInstanceId.isPresent()
);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 706742a..fd7c7a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -95,7 +95,8 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
metrics,
metricGrpPrefix,
time,
- retryBackoffMs);
+ retryBackoffMs,
+ true);
this.log = logContext.logger(WorkerCoordinator.class);
this.restUrl = restUrl;
this.configStorage = configStorage;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5024c28..6d93b99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -720,6 +720,7 @@ public class StreamsConfig extends AbstractConfig {
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index c202c93..5f053bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
@@ -425,6 +426,13 @@ public class StreamsConfigTest {
}
@Test
+ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
+ assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false));
+ }
+
+ @Test
public void shouldAcceptAtLeastOnce() {
// don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once");