You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/02 21:14:55 UTC

kafka git commit: KAFKA-3929: Add prefix for underlying clients configs in StreamConfig

Repository: kafka
Updated Branches:
  refs/heads/trunk 3bb38d37b -> bb629f224


KAFKA-3929: Add prefix for underlying clients configs in StreamConfig

Add prefixes for consumer and producer configs to StreamsConfig, but be backward compatible.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1649 from dguy/kafka-3929


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb629f22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb629f22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb629f22

Branch: refs/heads/trunk
Commit: bb629f2243c4462db2a863793c190d734f11f3c6
Parents: 3bb38d3
Author: Damian Guy <da...@gmail.com>
Authored: Tue Aug 2 14:14:52 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 2 14:14:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 83 ++++++++++++++++++--
 .../apache/kafka/streams/StreamsConfigTest.java | 64 +++++++++++++++
 2 files changed, 139 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb629f22/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a68de4f..b624e0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -47,6 +47,12 @@ public class StreamsConfig extends AbstractConfig {
 
     private static final ConfigDef CONFIG;
 
+    // Prefix used to isolate consumer configs from producer configs.
+    public static final String CONSUMER_PREFIX = "consumer.";
+
+    // Prefix used to isolate producer configs from consumer configs.
+    public static final String PRODUCER_PREFIX = "producer.";
+
     /** <code>state.dir</code> */
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -122,6 +128,7 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
 
 
+
     static {
         CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default value
                                         Type.STRING,
@@ -251,23 +258,55 @@ public class StreamsConfig extends AbstractConfig {
         public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
     }
 
+    /**
+     * Prefix a property with {@link StreamsConfig#CONSUMER_PREFIX}. This is used to isolate consumer configs
+     * from producer configs
+     * @param consumerProp
+     * @return CONSUMER_PREFIX + consumerProp
+     */
+    public static String consumerPrefix(final String consumerProp) {
+        return CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link StreamsConfig#PRODUCER_PREFIX}. This is used to isolate producer configs
+     * from consumer configs
+     * @param producerProp
+     * @return PRODUCER_PREFIX + consumerProp
+     */
+    public static String producerPrefix(final String producerProp) {
+        return PRODUCER_PREFIX + producerProp;
+    }
+
     public StreamsConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }
 
+    /**
+     * Get the configs specific to the Consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX}
+     * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
+     * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster
+     * @param streamThread   the {@link StreamThread} creating a consumer
+     * @param groupId        consumer groupId
+     * @param clientId       clientId
+     * @return  Map of the Consumer configuration.
+     * @throws ConfigException
+     */
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
-        Map<String, Object> originals = this.originals();
+        final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX);
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
-        if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+        if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                     + ", as the streams client will always turn off auto committing.");
         }
 
         // generate consumer configs from original properties and overridden maps
-        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES);
+        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps, CONSUMER_DEFAULT_OVERRIDES);
 
+        // bootstrap.servers should be from StreamsConfig
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix, and group id
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
@@ -283,18 +322,30 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
+
+    /**
+     * Get the consumer config for the restore-consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX}
+     * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
+     * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster
+     * @param clientId  clientId
+     * @return  Map of the Consumer configuration
+     * @throws ConfigException
+     */
     public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
-        Map<String, Object> originals = this.originals();
+        Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX);
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
-        if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+        if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                     + ", as the streams client will always turn off auto committing.");
         }
 
         // generate consumer configs from original properties and overridden maps
-        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES);
+        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps, CONSUMER_DEFAULT_OVERRIDES);
+
+        // bootstrap.servers should be from StreamsConfig
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
 
         // no need to set group id for a restore consumer
         props.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -305,16 +356,32 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
+
+    /**
+     * Get the configs for the Producer. Properties using the prefix {@link StreamsConfig#PRODUCER_PREFIX}
+     * will be used in favor over their non-prefixed versions except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
+     * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster
+     * @param clientId  clientId
+     * @return  Map of the Consumer configuration
+     * @throws ConfigException
+     */
     public Map<String, Object> getProducerConfigs(String clientId) {
         // generate producer configs from original properties and overridden maps
-        Map<String, Object> props = clientProps(ProducerConfig.configNames(), this.originals(), PRODUCER_DEFAULT_OVERRIDES);
-
+        Map<String, Object> props = clientProps(ProducerConfig.configNames(), getClientPropsWithPrefix(PRODUCER_PREFIX), PRODUCER_DEFAULT_OVERRIDES);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
 
         return props;
     }
 
+    private Map<String, Object> getClientPropsWithPrefix(final String prefix) {
+        // To be backward compatible we first get all the originals.
+        final Map<String, Object> props = this.originals();
+        props.putAll(this.originalsWithPrefix(prefix));
+        return props;
+    }
+
     public Serde keySerde() {
         Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
         serde.configure(originals(), true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb629f22/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3d4a9cc..30306f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
+import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -108,4 +110,66 @@ public class StreamsConfigTest {
         assertEquals(expectedBootstrapServers, actualBootstrapServers);
     }
 
+    @Test
+    public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+        props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+        props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+
+    @Test
+    public void shouldSupportPrefixedProducerConfigs() throws Exception {
+        props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
+        props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+        assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
+        assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId");
+        assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
+        props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+        assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
+        assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+
 }