You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/07 04:09:06 UTC
[kafka] branch 2.2 updated: KAFKA-8157: fix the incorrect usage of
segment.index.bytes (2.2) (#6547)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 082e057 KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
082e057 is described below
commit 082e057e1073c4471c92d4b341f094089598145f
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sat Apr 6 21:08:41 2019 -0700
KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547)
Should be cherry-picked to older branches as well.
Reviewers: Bill Bejeck <bb...@gmail.com>
---
.../java/org/apache/kafka/streams/StreamsConfig.java | 19 +++++++------------
.../org/apache/kafka/streams/StreamsConfigTest.java | 12 ++++++++++--
2 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b756ba2..d5f30ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1003,22 +1003,17 @@ public class StreamsConfig extends AbstractConfig {
// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
+ final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
- final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
- final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- final int batchSize;
- if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
- batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
- } else {
- final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
- batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
- }
+ if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) &&
+ producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+ final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString());
+ final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
if (segmentSize < batchSize) {
throw new IllegalArgumentException(String.format("Specified topic segment size %d is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
- segmentSize,
- batchSize));
+ segmentSize,
+ batchSize));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 19d5ae0..724cbb5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.streams.StreamsConfig.topicPrefix;
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
@@ -111,7 +112,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
- props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+ props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final String groupId = "example-application";
@@ -125,7 +126,7 @@ public class StreamsConfigTest {
assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
- assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
+ assertEquals(100, returnedProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
}
@Test
@@ -639,6 +640,13 @@ public class StreamsConfigTest {
new StreamsConfig(props);
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testThrowIllegalArgumentExceptionWhenTopicSegmentSizeSmallerThanProducerBatchSize() {
+ props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
+ props.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 101);
+ new StreamsConfig(props).getMainConsumerConfigs("groupId", "clientId");
+ }
+
static class MisconfiguredSerde implements Serde {
@Override
public void configure(final Map configs, final boolean isKey) {