You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/30 18:18:57 UTC

[kafka] branch trunk updated: KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb93d76  KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434)
cb93d76 is described below

commit cb93d764613d801a1185989f09ce2d6b76009020
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Tue Jan 30 18:18:51 2018 +0000

    KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434)
    
    * Implement method to get custom properties
    * Add custom properties to getConsumerConfigs and getProducerConfigs
    * Add tests
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 28 ++++++++++++++++-
 .../apache/kafka/streams/StreamsConfigTest.java    | 36 +++++++++++++++++++---
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0feb48d..1393223 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -693,6 +693,7 @@ public class StreamsConfig extends AbstractConfig {
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
 
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
@@ -832,6 +833,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // generate producer configs from original properties and overridden maps
         final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
+        props.putAll(getClientCustomProps());
         props.putAll(clientProvidedProps);
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -847,7 +849,11 @@ public class StreamsConfig extends AbstractConfig {
      * @return Map of the admin client configuration.
      */
     public Map<String, Object> getAdminConfigs(final String clientId) {
-        final Map<String, Object> props = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+        final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+
+        final Map<String, Object> props = new HashMap<>();
+        props.putAll(getClientCustomProps());
+        props.putAll(clientProvidedProps);
 
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin");
@@ -863,6 +869,26 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Get a map of custom configs by removing from the originals all the Streams, Consumer, Producer, and AdminClient configs.
+     * Prefixed properties are also removed because they are already added by {@link #getClientPropsWithPrefix(String, Set)}.
+     * As a result, a custom property applies to a single client when it is specified with that client's prefix,
+     * and to all clients when it is specified without a prefix.
+     *
+     * @return a map with the custom properties
+     */
+    private Map<String, Object> getClientCustomProps() {
+        final Map<String, Object> props = originals(); // NOTE(review): mutated in place below; assumes originals() returns a fresh mutable copy - confirm
+        props.keySet().removeAll(CONFIG.names()); // drop configs defined by Streams itself
+        props.keySet().removeAll(ConsumerConfig.configNames());
+        props.keySet().removeAll(ProducerConfig.configNames());
+        props.keySet().removeAll(AdminClientConfig.configNames());
+        props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet()); // "false" presumably keeps the prefix on the keys so they match the originals - confirm
+        props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet());
+        props.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet());
+        return props;
+    }
+
+    /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde
      * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1d6b5a5..cc072d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -45,6 +45,7 @@ import java.util.Properties;
 import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
@@ -66,7 +67,6 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put("DUMMY", "dummy");
         props.put("key.deserializer.encoding", "UTF8");
         props.put("value.deserializer.encoding", "UTF-16");
         streamsConfig = new StreamsConfig(props);
@@ -90,7 +90,6 @@ public class StreamsConfigTest {
         final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -101,7 +100,6 @@ public class StreamsConfigTest {
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
         assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -148,7 +146,6 @@ public class StreamsConfigTest {
         final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -265,6 +262,37 @@ public class StreamsConfigTest {
     }
 
     @Test
+    public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
+        props.put("custom.property.host", "host"); // set before construction: the test must not rely on the config aliasing props
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("custom.property.host")); // un-prefixed custom property must reach every client
+        assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
+        assertEquals("host", producerConfigs.get("custom.property.host"));
+        assertEquals("host", adminConfigs.get("custom.property.host"));
+    }
+
+    @Test
+    public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
+        props.put("custom.property.host", "host0"); // un-prefixed value, expected to be overridden for every client below
+        props.put(consumerPrefix("custom.property.host"), "host1");
+        props.put(producerPrefix("custom.property.host"), "host2");
+        props.put(adminClientPrefix("custom.property.host"), "host3");
+        final StreamsConfig streamsConfig = new StreamsConfig(props); // built after props is fully populated: no reliance on the config aliasing props
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
+        assertEquals("host1", consumerConfigs.get("custom.property.host"));
+        assertEquals("host1", restoreConsumerConfigs.get("custom.property.host")); // restore consumer is expected to share the consumer-prefixed value
+        assertEquals("host2", producerConfigs.get("custom.property.host"));
+        assertEquals("host3", adminConfigs.get("custom.property.host"));
+    }
+
+    @Test
     public void shouldSupportNonPrefixedAdminConfigs() {
         props.put(AdminClientConfig.RETRIES_CONFIG, 10);
         final StreamsConfig streamsConfig = new StreamsConfig(props);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.