You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/12 12:05:04 UTC

kafka git commit: KAFKA-4965; set internal.leave.group.on.close to false in StreamsConfig

Repository: kafka
Updated Branches:
  refs/heads/trunk 3c93fa321 -> 0be835dde


KAFKA-4965; set internal.leave.group.on.close to false in StreamsConfig

Set the internal consumer config internal.leave.group.on.close in
`StreamsConfig`. This is to reduce the number of rebalances we get
during bounces.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2750 from dguy/kafka-4965


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0be835dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0be835dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0be835dd

Branch: refs/heads/trunk
Commit: 0be835dde50611277504a4ddd93cc62f39e6defe
Parents: 3c93fa3
Author: Damian Guy <da...@gmail.com>
Authored: Wed Apr 12 13:04:36 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 12 13:04:44 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/StreamsConfig.java     |  2 +-
 .../java/org/apache/kafka/streams/StreamsConfigTest.java |  9 +++++++++
 .../streams/integration/GlobalKTableIntegrationTest.java |  1 +
 .../integration/InternalTopicIntegrationTest.java        |  1 +
 .../kafka/streams/integration/JoinIntegrationTest.java   |  1 +
 .../KStreamAggregationDedupIntegrationTest.java          |  1 +
 .../integration/KStreamAggregationIntegrationTest.java   |  1 +
 .../integration/KStreamKTableJoinIntegrationTest.java    |  1 +
 .../streams/integration/KStreamRepartitionJoinTest.java  |  1 +
 .../KStreamsFineGrainedAutoResetIntegrationTest.java     |  1 +
 .../integration/KTableKTableJoinIntegrationTest.java     |  1 +
 .../integration/QueryableStateIntegrationTest.java       |  2 ++
 .../streams/integration/RegexSourceIntegrationTest.java  | 11 +++++++----
 .../kafka/streams/integration/ResetIntegrationTest.java  |  1 +
 .../streams/integration/utils/IntegrationTestUtils.java  |  1 +
 15 files changed, 30 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 078877f..f968dbc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -402,6 +402,7 @@ public class StreamsConfig extends AbstractConfig {
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
         // MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle cases when
         // streams is recovering data from state stores. We may set it to Integer.MAX_VALUE since
         // the streams code itself catches most exceptions and acts accordingly without needing
@@ -409,7 +410,6 @@ public class StreamsConfig extends AbstractConfig {
         // are losing the ability to detect them by setting this value to large. Hopefully
         // deadlocks happen very rarely or never.
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-
         CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 612d7a2..b229af9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,6 +37,7 @@ import java.util.Properties;
 
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -256,6 +258,13 @@ public class StreamsConfigTest {
         streamsConfig.getRestoreConsumerConfigs("client");
     }
 
+    @Test
+    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client");
+        assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 676b464..94b576d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -98,6 +98,7 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore);
         stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
         table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 4b558f4..0ff1d32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -90,6 +90,7 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
     }
 
     private Properties getTopicConfigProperties(final String changelog) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 5263456..8d95fad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -129,6 +129,7 @@ public class JoinIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
             30000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 415f593..372b89c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -97,6 +97,7 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index f42ad56..303ec8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -106,6 +106,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
 
         final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index d566041..1806321 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -92,6 +92,7 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
             TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 7da1ffd..d3ab176 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -98,6 +98,7 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 3594225..cff5f43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -134,6 +134,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
         Properties props = new Properties();
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
                 "testAutoOffsetId",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index ec40c17..efdd9a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -76,6 +76,7 @@ public class KTableKTableJoinIntegrationTest {
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 012462a..1e16f43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -136,6 +136,8 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        // override this to make the rebalances happen quickly
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
 
         stringComparator = new Comparator<KeyValue<String, String>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index b671c4e..5647b1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -114,10 +114,13 @@ public class RegexSourceIntegrationTest {
 
     @Before
     public void setUp() {
-
-        streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(),
-            STRING_SERDE_CLASSNAME,
-            STRING_SERDE_CLASSNAME);
+        final Properties properties = new Properties();
+        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
+                                                                 CLUSTER.bootstrapServers(),
+                                                                 STRING_SERDE_CLASSNAME,
+                                                                 STRING_SERDE_CLASSNAME,
+                                                                 properties);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index cf35902..afc299f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -273,6 +273,7 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1ca6bd4..c31647c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Future;
 public class IntegrationTestUtils {
 
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
+    public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
     /**
      * Returns up to `maxMessages` message-values from the topic.