You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/03 00:12:41 UTC
[kafka] branch trunk updated: KAFKA-7190: KIP-443;
Remove streams overrides on repartition topics (#6511)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 213466b KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511)
213466b is described below
commit 213466b3d4fd21b332c0b6882fea36cf1affef1c
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Apr 2 17:12:32 2019 -0700
KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511)
* remove streams overrides on segment.ms and segment.index.bytes
* kip comments
---
.../main/java/org/apache/kafka/streams/StreamsConfig.java | 15 +++++----------
.../processor/internals/RepartitionTopicConfig.java | 6 ++----
.../streams/integration/InternalTopicIntegrationTest.java | 4 ++--
.../processor/internals/InternalTopologyBuilderTest.java | 2 +-
4 files changed, 10 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2ba7312..f607d1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1003,17 +1003,12 @@ public class StreamsConfig extends AbstractConfig {
// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+ final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
- final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
- final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- final int batchSize;
- if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
- batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
- } else {
- final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
- batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
- }
+ if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) &&
+ producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+ final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString());
+ final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
if (segmentSize < batchSize) {
throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index 466520e..7161a3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -33,10 +33,8 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
static {
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
- tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800"); // 50 MB
- tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB
- tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000"); // 10 min
- tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE)); // Infinity
+ tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB
+ tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)); // Infinity
REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 8bcaf5d..345b581 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -171,7 +171,7 @@ public class InternalTopicIntegrationTest {
final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
- assertEquals(5, repartitionProps.size());
+ assertEquals(3, repartitionProps.size());
}
@Test
@@ -216,6 +216,6 @@ public class InternalTopicIntegrationTest {
final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
- assertEquals(5, repartitionProps.size());
+ assertEquals(3, repartitionProps.size());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1a46af8..fbeae18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -643,7 +643,7 @@ public class InternalTopologyBuilderTest {
final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
- assertEquals(5, properties.size());
+ assertEquals(3, properties.size());
assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("appId-foo", topicConfig.name());