You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/12 12:05:04 UTC
kafka git commit: KAFKA-4965;
set internal.leave.group.on.close to false in StreamsConfig
Repository: kafka
Updated Branches:
refs/heads/trunk 3c93fa321 -> 0be835dde
KAFKA-4965; set internal.leave.group.on.close to false in StreamsConfig
Set the internal consumer config internal.leave.group.on.close in
`StreamsConfig`. This is to reduce the number of rebalances we get
during bounces.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2750 from dguy/kafka-4965
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0be835dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0be835dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0be835dd
Branch: refs/heads/trunk
Commit: 0be835dde50611277504a4ddd93cc62f39e6defe
Parents: 3c93fa3
Author: Damian Guy <da...@gmail.com>
Authored: Wed Apr 12 13:04:36 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 12 13:04:44 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../java/org/apache/kafka/streams/StreamsConfigTest.java | 9 +++++++++
.../streams/integration/GlobalKTableIntegrationTest.java | 1 +
.../integration/InternalTopicIntegrationTest.java | 1 +
.../kafka/streams/integration/JoinIntegrationTest.java | 1 +
.../KStreamAggregationDedupIntegrationTest.java | 1 +
.../integration/KStreamAggregationIntegrationTest.java | 1 +
.../integration/KStreamKTableJoinIntegrationTest.java | 1 +
.../streams/integration/KStreamRepartitionJoinTest.java | 1 +
.../KStreamsFineGrainedAutoResetIntegrationTest.java | 1 +
.../integration/KTableKTableJoinIntegrationTest.java | 1 +
.../integration/QueryableStateIntegrationTest.java | 2 ++
.../streams/integration/RegexSourceIntegrationTest.java | 11 +++++++----
.../kafka/streams/integration/ResetIntegrationTest.java | 1 +
.../streams/integration/utils/IntegrationTestUtils.java | 1 +
15 files changed, 30 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 078877f..f968dbc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -402,6 +402,7 @@ public class StreamsConfig extends AbstractConfig {
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
// MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle cases when
// streams is recovering data from state stores. We may set it to Integer.MAX_VALUE since
// the streams code itself catches most exceptions and acts accordingly without needing
@@ -409,7 +410,6 @@ public class StreamsConfig extends AbstractConfig {
// are losing the ability to detect them by setting this value to large. Hopefully
// deadlocks happen very rarely or never.
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
-
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 612d7a2..b229af9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
+import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +37,7 @@ import java.util.Properties;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -256,6 +258,13 @@ public class StreamsConfigTest {
streamsConfig.getRestoreConsumerConfigs("client");
}
+ @Test
+ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client");
+ assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
+ }
+
static class MisconfiguredSerde implements Serde {
@Override
public void configure(final Map configs, final boolean isKey) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 676b464..94b576d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -98,6 +98,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore);
stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 4b558f4..0ff1d32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -90,6 +90,7 @@ public class InternalTopicIntegrationTest {
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
}
private Properties getTopicConfigProperties(final String changelog) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 5263456..8d95fad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -129,6 +129,7 @@ public class JoinIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
30000,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 415f593..372b89c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -97,6 +97,7 @@ public class KStreamAggregationDedupIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index f42ad56..303ec8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -106,6 +106,7 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index d566041..1806321 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -92,6 +92,7 @@ public class KStreamKTableJoinIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 7da1ffd..d3ab176 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -98,6 +98,7 @@ public class KStreamRepartitionJoinTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 3594225..cff5f43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -134,6 +134,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
Properties props = new Properties();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
"testAutoOffsetId",
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index ec40c17..efdd9a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -76,6 +76,7 @@ public class KTableKTableJoinIntegrationTest {
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 012462a..1e16f43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -136,6 +136,8 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ // override this to make the rebalances happen quickly
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
stringComparator = new Comparator<KeyValue<String, String>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index b671c4e..5647b1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -114,10 +114,13 @@ public class RegexSourceIntegrationTest {
@Before
public void setUp() {
-
- streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(),
- STRING_SERDE_CLASSNAME,
- STRING_SERDE_CLASSNAME);
+ final Properties properties = new Properties();
+ properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+ streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
+ CLUSTER.bootstrapServers(),
+ STRING_SERDE_CLASSNAME,
+ STRING_SERDE_CLASSNAME,
+ properties);
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index cf35902..afc299f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -273,6 +273,7 @@ public class ResetIntegrationTest {
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be835dd/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1ca6bd4..c31647c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Future;
public class IntegrationTestUtils {
public static final long DEFAULT_TIMEOUT = 30 * 1000L;
+ public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
/**
* Returns up to `maxMessages` message-values from the topic.