You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/23 15:28:40 UTC
[5/5] kafka git commit: KAFKA-4361: Streams does not respect user
configs for "default" params
KAFKA-4361: Streams does not respect user configs for "default" params
Enable user provided consumer and producer configs to override the streams default configs.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2084 from dguy/kafka-4361
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f91d95ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f91d95ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f91d95ac
Branch: refs/heads/0.10.1
Commit: f91d95ac9aa6c5fe9b7ee091b7759fee53e465d4
Parents: 34f987f
Author: Damian Guy <da...@gmail.com>
Authored: Tue Nov 1 10:07:58 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:28:24 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 21 ++++++----
.../apache/kafka/streams/StreamsConfigTest.java | 43 ++++++++++++++++++++
2 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f91d95ac/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 66c15b9..5ba4383 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -321,16 +321,19 @@ public class StreamsConfig extends AbstractConfig {
* @throws ConfigException
*/
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
- final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+
+ final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+ final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
- if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
- consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+ consumerProps.putAll(clientProvidedProps);
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -362,16 +365,18 @@ public class StreamsConfig extends AbstractConfig {
* @throws ConfigException
*/
public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
- Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+ Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+ final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
- if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
- consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+ consumerProps.putAll(clientProvidedProps);
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -396,8 +401,8 @@ public class StreamsConfig extends AbstractConfig {
*/
public Map<String, Object> getProducerConfigs(String clientId) {
// generate producer configs from original properties and overridden maps
- final Map<String, Object> props = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- props.putAll(PRODUCER_DEFAULT_OVERRIDES);
+ final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+ props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()));
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
http://git-wip-us.apache.org/repos/asf/kafka/blob/f91d95ac/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3caa767..f03bed9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -214,6 +215,48 @@ public class StreamsConfigTest {
streamsConfig.valueSerde();
}
+ @Test
+ public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test
+ public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+ props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
+ assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
+ }
+
+ @Test
+ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+ assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getConsumerConfigs(null, "a", "b");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getRestoreConsumerConfigs("client");
+ }
+
static class MisconfiguredSerde implements Serde {
@Override
public void configure(final Map configs, final boolean isKey) {