You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/22 15:08:46 UTC

[kafka] branch 2.3 updated: KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 8dee2fe  KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779)
8dee2fe is described below

commit 8dee2fe1f59bb96840e67beb0493c8254521a67f
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed May 22 08:03:00 2019 -0700

    KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779)
    
    As the title states, this brings back the internal.leave.group.on.close config for KStream. We plan to merge this to both trunk and 2.3 if it fixes the Streams system tests globally.
    Reference implementation: #6673
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java | 15 +++++++++++++++
 .../org/apache/kafka/clients/consumer/KafkaConsumer.java  |  3 ++-
 .../clients/consumer/internals/AbstractCoordinator.java   | 14 ++++++++++----
 .../clients/consumer/internals/ConsumerCoordinator.java   |  6 ++++--
 .../apache/kafka/clients/consumer/KafkaConsumerTest.java  |  3 ++-
 .../consumer/internals/AbstractCoordinatorTest.java       |  2 +-
 .../consumer/internals/ConsumerCoordinatorTest.java       |  3 ++-
 .../connect/runtime/distributed/WorkerCoordinator.java    |  3 ++-
 .../main/java/org/apache/kafka/streams/StreamsConfig.java |  1 +
 .../java/org/apache/kafka/streams/StreamsConfigTest.java  |  8 ++++++++
 10 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ff2e5cd..010fff8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -254,6 +254,17 @@ public class ConsumerConfig extends AbstractConfig {
             "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
     public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
 
+    /**
+     * <code>internal.leave.group.on.close</code>
+     * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
+     * won't occur until <code>session.timeout.ms</code> expires.
+     *
+     * <p>
+     * Note: this is an internal configuration and could be changed in the future in a backward-incompatible way.
+     *
+     */
+    static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
+
     /** <code>isolation.level</code> */
     public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
     public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" +
@@ -476,6 +487,10 @@ public class ConsumerConfig extends AbstractConfig {
                                         DEFAULT_EXCLUDE_INTERNAL_TOPICS,
                                         Importance.MEDIUM,
                                         EXCLUDE_INTERNAL_TOPICS_DOC)
+                                .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW)
                                 .define(ISOLATION_LEVEL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_ISOLATION_LEVEL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ad7ae82..c33a52e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -792,7 +792,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         retryBackoffMs,
                         enableAutoCommit,
                         config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                        this.interceptors);
+                        this.interceptors,
+                        config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 3af6d05..54678f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -133,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable {
     private Generation generation = Generation.NO_GENERATION;
 
     private RequestFuture<Void> findCoordinatorFuture = null;
+    private final boolean leaveGroupOnClose;
 
     /**
      * Initialize the coordination manager.
@@ -147,7 +148,8 @@ public abstract class AbstractCoordinator implements Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               long retryBackoffMs) {
+                               long retryBackoffMs,
+                               boolean leaveGroupOnClose) {
         this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
@@ -159,6 +161,7 @@ public abstract class AbstractCoordinator implements Closeable {
         this.heartbeat = heartbeat;
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
+        this.leaveGroupOnClose = leaveGroupOnClose;
     }
 
     public AbstractCoordinator(LogContext logContext,
@@ -171,10 +174,11 @@ public abstract class AbstractCoordinator implements Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               long retryBackoffMs) {
+                               long retryBackoffMs,
+                               boolean leaveGroupOnClose) {
         this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
                 new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
-                metrics, metricGrpPrefix, time, retryBackoffMs);
+                metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
     }
 
     /**
@@ -845,7 +849,9 @@ public abstract class AbstractCoordinator implements Closeable {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                maybeLeaveGroup();
+                if (leaveGroupOnClose) {
+                    maybeLeaveGroup();
+                }
 
                 // At this point, there may be pending commits (async commits or sync commits that were
                 // interrupted using wakeup) and the leave group request which have been queued, but not
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3aef0c5..b03af74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -137,7 +137,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                long retryBackoffMs,
                                boolean autoCommitEnabled,
                                int autoCommitIntervalMs,
-                               ConsumerInterceptors<?, ?> interceptors) {
+                               ConsumerInterceptors<?, ?> interceptors,
+                               boolean leaveGroupOnClose) {
         super(logContext,
               client,
               groupId,
@@ -148,7 +149,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
               metrics,
               metricGrpPrefix,
               time,
-              retryBackoffMs);
+              retryBackoffMs,
+              leaveGroupOnClose);
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9012ea2..42cccd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1902,7 +1902,8 @@ public class KafkaConsumerTest {
                 retryBackoffMs,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
-                interceptors);
+                interceptors,
+                true);
 
         Fetcher<String, String> fetcher = new Fetcher<>(
                 loggerFactory,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 31328b3..0fc5f62 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -857,7 +857,7 @@ public class AbstractCoordinatorTest {
                                 int retryBackoffMs,
                                 Optional<String> groupInstanceId) {
             super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
-                    SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs);
+                    SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index f0214d2..86032c4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2219,7 +2219,8 @@ public class ConsumerCoordinatorTest {
                 retryBackoffMs,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
-                null
+                null,
+                !groupInstanceId.isPresent()
         );
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 706742a..fd7c7a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -95,7 +95,8 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
               metrics,
               metricGrpPrefix,
               time,
-              retryBackoffMs);
+              retryBackoffMs,
+              true);
         this.log = logContext.logger(WorkerCoordinator.class);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5024c28..6d93b99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -720,6 +720,7 @@ public class StreamsConfig extends AbstractConfig {
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
         CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index c202c93..5f053bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -425,6 +426,13 @@ public class StreamsConfigTest {
     }
 
     @Test
+    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
+        assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false));
+    }
+
+    @Test
     public void shouldAcceptAtLeastOnce() {
         // don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once");