You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/03 00:12:41 UTC

[kafka] branch trunk updated: KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 213466b  KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511)
213466b is described below

commit 213466b3d4fd21b332c0b6882fea36cf1affef1c
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Apr 2 17:12:32 2019 -0700

    KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511)
    
    * remove streams overrides on segment.ms and segment.index.bytes
    
    * kip comments
---
 .../main/java/org/apache/kafka/streams/StreamsConfig.java | 15 +++++----------
 .../processor/internals/RepartitionTopicConfig.java       |  6 ++----
 .../streams/integration/InternalTopicIntegrationTest.java |  4 ++--
 .../processor/internals/InternalTopologyBuilderTest.java  |  2 +-
 4 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2ba7312..f607d1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1003,17 +1003,12 @@ public class StreamsConfig extends AbstractConfig {
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+        final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
-        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
-            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
-            final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-            final int batchSize;
-            if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
-                batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
-            } else {
-                final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
-                batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
-            }
+        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) &&
+            producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString());
+            final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
 
             if (segmentSize < batchSize) {
                 throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index 466520e..7161a3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -33,10 +33,8 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
     static {
         final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
         tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");               // 50 MB
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");                     // 50 MB
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                          // 10 min
-        tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));  // Infinity
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");         // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1));  // Infinity
         REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 8bcaf5d..345b581 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -171,7 +171,7 @@ public class InternalTopicIntegrationTest {
 
         final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
         assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
-        assertEquals(5, repartitionProps.size());
+        assertEquals(3, repartitionProps.size());
     }
 
     @Test
@@ -216,6 +216,6 @@ public class InternalTopicIntegrationTest {
 
         final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
         assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
-        assertEquals(5, repartitionProps.size());
+        assertEquals(3, repartitionProps.size());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1a46af8..fbeae18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -643,7 +643,7 @@ public class InternalTopologyBuilderTest {
         final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(5, properties.size());
+        assertEquals(3, properties.size());
         assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("appId-foo", topicConfig.name());