You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/01 17:08:04 UTC

kafka git commit: KAFKA-4361: Streams does not respect user configs for "default" params

Repository: kafka
Updated Branches:
  refs/heads/trunk 979a7baa7 -> ba322e542


KAFKA-4361: Streams does not respect user configs for "default" params

Enable user-provided consumer and producer configs to override the Streams default configs.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2084 from dguy/kafka-4361


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba322e54
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba322e54
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba322e54

Branch: refs/heads/trunk
Commit: ba322e54203ffec5779944753f264d70b87a24fd
Parents: 979a7ba
Author: Damian Guy <da...@gmail.com>
Authored: Tue Nov 1 10:07:58 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 1 10:07:58 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 21 ++++++----
 .../apache/kafka/streams/StreamsConfigTest.java | 43 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ba322e54/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 66c15b9..5ba4383 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -321,16 +321,19 @@ public class StreamsConfig extends AbstractConfig {
      * @throws ConfigException
      */
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
-        final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+
+        final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+        final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
-        if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+        if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                     + ", as the streams client will always turn off auto committing.");
         }
 
-        consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -362,16 +365,18 @@ public class StreamsConfig extends AbstractConfig {
      * @throws ConfigException
      */
     public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
-        Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+        Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+        final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
-        if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+        if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                     + ", as the streams client will always turn off auto committing.");
         }
 
-        consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -396,8 +401,8 @@ public class StreamsConfig extends AbstractConfig {
      */
     public Map<String, Object> getProducerConfigs(String clientId) {
         // generate producer configs from original properties and overridden maps
-        final Map<String, Object> props = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-        props.putAll(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()));
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba322e54/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3caa767..f03bed9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -214,6 +215,48 @@ public class StreamsConfigTest {
         streamsConfig.valueSerde();
     }
 
+    @Test
+    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
+    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
+        assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
+    }
+
+    @Test
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+        assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getConsumerConfigs(null, "a", "b");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getRestoreConsumerConfigs("client");
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {