You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/07 04:10:22 UTC

[kafka] branch 2.1 updated: KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 483a081  KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
483a081 is described below

commit 483a081870dfba0a907e995a2b137e7a9c09f14b
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sat Apr 6 21:08:41 2019 -0700

    KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
    
    Should be cherry-picked to older branches as well.
    
    Reviewers: Bill Bejeck <bb...@gmail.com>
---
 .../java/org/apache/kafka/streams/StreamsConfig.java  | 19 +++++++------------
 .../org/apache/kafka/streams/StreamsConfigTest.java   | 12 ++++++++++--
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 4552374..1d44995 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1003,22 +1003,17 @@ public class StreamsConfig extends AbstractConfig {
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+        final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
-        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
-            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
-            final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-            final int batchSize;
-            if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
-                batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
-            } else {
-                final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
-                batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
-            }
+        if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) &&
+            producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+            final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString());
+            final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
 
             if (segmentSize < batchSize) {
                 throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
-                        segmentSize,
-                        batchSize));
+                    segmentSize,
+                    batchSize));
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 4bddb29..38563be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.streams.StreamsConfig.topicPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -111,7 +112,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
-        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+        props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -125,7 +126,7 @@ public class StreamsConfigTest {
         assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
         assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
         assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
-        assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
+        assertEquals(100, returnedProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
 
     @Test
@@ -627,6 +628,13 @@ public class StreamsConfigTest {
         new StreamsConfig(props);
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowIllegalArgumentExceptionWhenTopicSegmentSizeSmallerThanProducerBatchSize() {
+        props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+        props.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 101);
+        new StreamsConfig(props).getMainConsumerConfigs("groupId", "clientId");
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {