Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/02 20:25:00 UTC

[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

    [ https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461585#comment-16461585 ] 

ASF GitHub Bot commented on KAFKA-6657:
---------------------------------------

guozhangwang closed pull request #4805: KAFKA-6657: Add StreamsConfig prefix for different consumers
URL: https://github.com/apache/kafka/pull/4805
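
For readers skimming the change: the new prefixes layer a per-consumer override on top of the existing "consumer." prefix. The Javadoc added to StreamsConfig in the diff gives the precedence as (1) main.consumer.[config-name] / restore.consumer.[config-name] / global.consumer.[config-name], (2) consumer.[config-name], (3) [config-name]. The following is a minimal, self-contained sketch of that lookup order only. The class ConsumerPrefixSketch and its lookup helper are hypothetical names invented for illustration; they are not part of Kafka and do not call StreamsConfig. The prefix strings are the ones defined in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka code and does not use StreamsConfig.
public class ConsumerPrefixSketch {

    // General consumer prefix, same string as StreamsConfig.CONSUMER_PREFIX in the diff.
    static final String CONSUMER_PREFIX = "consumer.";

    // Resolve one config name for one consumer type: the specific prefix wins,
    // then the general "consumer." prefix, then the unprefixed name.
    static Object lookup(final Map<String, Object> props,
                         final String specificPrefix,
                         final String name) {
        if (props.containsKey(specificPrefix + name)) {
            return props.get(specificPrefix + name);
        }
        if (props.containsKey(CONSUMER_PREFIX + name)) {
            return props.get(CONSUMER_PREFIX + name);
        }
        return props.get(name);
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", "1000");
        props.put("consumer.max.poll.records", "5");
        props.put("restore.consumer.max.poll.records", "50");

        // Only the restore consumer has its own override; the others fall back to "consumer.".
        System.out.println(lookup(props, "main.consumer.", "max.poll.records"));    // prints 5
        System.out.println(lookup(props, "restore.consumer.", "max.poll.records")); // prints 50
        System.out.println(lookup(props, "global.consumer.", "max.poll.records"));  // prints 5
    }
}
```

The sketch deliberately ignores what the real getters in the diff do after merging: they set the client id, drop the group id for the restore and global consumers, and force auto.offset.reset to "none" for those two.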

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 1f18a953679..6399c4e3077 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -530,7 +530,7 @@ <h3><a class="toc-backref" href="#id16">Kafka consumers and producer configurati
           <h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
           <p>Some consumer and producer configuration parameters use the same parameter name. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
             <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries
-            for client request. You can avoid duplicate names by prefix parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code> or <code class="docutils literal"><span class="pre">producer</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+            for client requests. You can avoid duplicate names by prefixing parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code> or <code class="docutils literal"><span class="pre">producer.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
           <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// same value for consumer and producer</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;value&quot;</span><span class="o">);</span>
@@ -541,6 +541,24 @@ <h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#n
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">consumerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;consumer-value&quot;</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">producerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;producer-value&quot;</span><span class="o">);</span>
 </pre></div>
+          <p>You could further separate consumer configuration by adding different prefixes:</p>
+          <ul class="simple">
+            <li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for the main consumer, which is the default consumer of stream sources.</li>
+            <li><code class="docutils literal"><span class="pre">restore.consumer.</span></code> for the restore consumer, which is in charge of state store recovery.</li>
+            <li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for the global consumer, which is used in global KTable construction.</li>
+          </ul>
+          <p>For example, if you only want to set restore consumer config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
+          <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="c1">// same config value for all consumer types</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;general-consumer-value&quot;</span><span class="o">);</span>
+<span class="c1">// set a different restore consumer config. This would make restore consumer take restore-consumer-value,</span>
+<span class="c1">// while main consumer and global consumer stay with general-consumer-value</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;restore.consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
+<span class="c1">// alternatively, you can use</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">restoreConsumerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
+</pre></div>
+          </div>
+          <p>The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code>, if you only want to specify the config for one consumer type.</p>
           </div>
         </div>
         <div class="section" id="default-values">
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 8a6ec05b4ac..888edf3c714 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.Map;
@@ -32,7 +33,7 @@
     /**
      * Create an {@link AdminClient} which is used for internal topic management.
      *
-     * @param config Supplied by the {@link StreamsConfig} given to the {@link KafkaStreams}
+     * @param config Supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
      * @return an instance of {@link AdminClient}
      */
     AdminClient getAdminClient(final Map<String, Object> config);
@@ -41,7 +42,7 @@
      * Create a {@link Producer} which is used to write records to sink topics.
      *
      * @param config {@link StreamsConfig#getProducerConfigs(String) producer config} which is supplied by the
-     *               {@link StreamsConfig} given to the {@link KafkaStreams} instance
+     *               {@link java.util.Properties} given to the {@link KafkaStreams} instance
      * @return an instance of Kafka producer
      */
     Producer<byte[], byte[]> getProducer(final Map<String, Object> config);
@@ -49,8 +50,8 @@
     /**
      * Create a {@link Consumer} which is used to read records of source topics.
      *
-     * @param config {@link StreamsConfig#getConsumerConfigs(String, String) consumer config} which is
-     *               supplied by the {@link StreamsConfig} given to the {@link KafkaStreams} instance
+     * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String) consumer config} which is
+     *               supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance
      * @return an instance of Kafka consumer
      */
     Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
@@ -59,8 +60,17 @@
      * Create a {@link Consumer} which is used to read records to restore {@link StateStore}s.
      *
      * @param config {@link StreamsConfig#getRestoreConsumerConfigs(String) restore consumer config} which is supplied
-     *               by the {@link StreamsConfig} given to the {@link KafkaStreams}
+     *               by the {@link java.util.Properties} given to the {@link KafkaStreams}
      * @return an instance of Kafka consumer
      */
     Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config);
+
+    /**
+     * Create a {@link Consumer} which is used to consume records for {@link GlobalKTable}.
+     *
+     * @param config {@link StreamsConfig#getGlobalConsumerConfigs(String) global consumer config} which is supplied
+     *               by the {@link java.util.Properties} given to the {@link KafkaStreams}
+     * @return an instance of Kafka consumer
+     */
+    Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 186276c22d8..b488f173b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -707,7 +707,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
-                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
+                                                        clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
                                                         stateDirectory,
                                                         cacheSizePerThread,
                                                         metrics,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0a525169fa6..7d41b39d779 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -153,6 +153,33 @@
      */
     public static final String CONSUMER_PREFIX = "consumer.";
 
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the main consumer client from
+     * the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
+     * 1. main.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
+
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the restore consumer client from
+     * the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
+     * 1. restore.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
+
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the global consumer client from
+     * the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
+     * 1. global.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
+
     /**
      * Prefix used to isolate {@link KafkaProducer producer} configs from other client configs.
      * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link ProducerConfig producer
@@ -652,6 +679,39 @@ public static String consumerPrefix(final String consumerProp) {
         return CONSUMER_PREFIX + consumerProp;
     }
 
+    /**
+     * Prefix a property with {@link #MAIN_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig main consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String mainConsumerPrefix(final String consumerProp) {
+        return MAIN_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link #RESTORE_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig restore consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String restoreConsumerPrefix(final String consumerProp) {
+        return RESTORE_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link #GLOBAL_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig global consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String globalConsumerPrefix(final String consumerProp) {
+        return GLOBAL_CONSUMER_PREFIX + consumerProp;
+    }
+
     /**
      * Prefix a property with {@link #PRODUCER_PREFIX}. This is used to isolate {@link ProducerConfig producer configs}
      * from other client configs.
@@ -782,10 +842,37 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param groupId      consumer groupId
      * @param clientId     clientId
      * @return Map of the consumer configuration.
+     * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)} instead
      */
+    @Deprecated
     public Map<String, Object> getConsumerConfigs(final String groupId,
                                                   final String clientId) {
-        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        return getMainConsumerConfigs(groupId, clientId);
+    }
+
+    /**
+     * Get the configs for the {@link KafkaConsumer main consumer}.
+     * Properties using the prefix {@link #MAIN_CONSUMER_PREFIX} will be used in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
+     * (read the override precedence ordering in {@link #MAIN_CONSUMER_PREFIX})
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka Cluster.
+     * If not specified by {@link #MAIN_CONSUMER_PREFIX}, main consumer will share the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
+     *
+     * @param groupId      consumer groupId
+     * @param clientId     clientId
+     * @return Map of the consumer configuration.
+     */
+    public Map<String, Object> getMainConsumerConfigs(final String groupId,
+                                                      final String clientId) {
+        Map<String, Object> consumerProps = getCommonConsumerConfigs();
+
+        // Get main consumer override configs
+        Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
+            consumerProps.put(entry.getKey(), entry.getValue());
+        }
 
         // add client id with stream client id prefix, and group id
         consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@@ -831,23 +918,64 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
 
     /**
      * Get the configs for the {@link KafkaConsumer restore-consumer}.
-     * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions
+     * Properties using the prefix {@link #RESTORE_CONSUMER_PREFIX} will be used in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
+     * (read the override precedence ordering in {@link #RESTORE_CONSUMER_PREFIX})
      * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
      * version as we only support reading/writing from/to the same Kafka Cluster.
+     * If not specified by {@link #RESTORE_CONSUMER_PREFIX}, restore consumer will share the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
      *
      * @param clientId clientId
-     * @return Map of the consumer configuration.
+     * @return Map of the restore consumer configuration.
      */
     public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
-        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+        // Get restore consumer override configs
+        Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: restoreConsumerProps.entrySet()) {
+            baseConsumerProps.put(entry.getKey(), entry.getValue());
+        }
 
         // no need to set group id for a restore consumer
-        consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
         // add client id with stream client id prefix
-        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
-        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+        baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+        baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 
-        return consumerProps;
+        return baseConsumerProps;
+    }
+
+    /**
+     * Get the configs for the {@link KafkaConsumer global consumer}.
+     * Properties using the prefix {@link #GLOBAL_CONSUMER_PREFIX} will be used in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
+     * (read the override precedence ordering in {@link #GLOBAL_CONSUMER_PREFIX})
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka Cluster.
+     * If not specified by {@link #GLOBAL_CONSUMER_PREFIX}, global consumer will share the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
+     *
+     * @param clientId clientId
+     * @return Map of the global consumer configuration.
+     */
+    public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
+        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+        // Get global consumer override configs
+        Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
+            baseConsumerProps.put(entry.getKey(), entry.getValue());
+        }
+
+        // no need to set group id for a global consumer
+        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        // add client id with stream client id prefix
+        baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
+        baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+        return baseConsumerProps;
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 6f01e2fd964..69331b462ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -35,17 +35,22 @@ public AdminClient getAdminClient(final Map<String, Object> config) {
     }
 
     @Override
-    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
         return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
     @Override
-    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+    public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
     @Override
-    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
+    public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
+        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ab96cce4c2f..f1ca8fe148b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -648,7 +648,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
 
         log.info("Creating consumer client");
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final Map<String, Object> consumerConfigs = config.getConsumerConfigs(applicationId, threadClientId);
+        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId);
         consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 0309659042d..29dd4587e9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -96,7 +96,7 @@ public void testGetProducerConfigs() {
     public void testGetConsumerConfigs() {
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
         assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
@@ -115,7 +115,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() {
 
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
 
         assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
         assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
@@ -135,11 +135,21 @@ public void consumerConfigMustUseAdminClientConfigForRetries() {
 
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
 
         assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
     }
 
+    @Test
+    public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final String groupId = "example-application";
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
     @Test
     public void testGetRestoreConsumerConfigs() {
         final String clientId = "client";
@@ -185,7 +195,7 @@ public void shouldSupportPrefixedConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -202,7 +212,7 @@ public void shouldSupportPrefixedRestoreConsumerConfigs() {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -238,7 +248,7 @@ public void shouldBeSupportNonPrefixedConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -265,7 +275,7 @@ public void shouldSupportNonPrefixedProducerConfigs() {
     public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put("custom.property.host", "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
         final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
@@ -282,7 +292,7 @@ public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
         props.put(consumerPrefix("custom.property.host"), "host1");
         props.put(producerPrefix("custom.property.host"), "host2");
         props.put(adminClientPrefix("custom.property.host"), "host3");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
         final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
@@ -319,7 +329,7 @@ public void shouldOverrideStreamsDefaultConsumerConfigs() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
@@ -344,7 +354,7 @@ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
     public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("a", "b");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b");
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -356,10 +366,66 @@ public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
+    @Test
+    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId");
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
+    public void testGetGlobalConsumerConfigs() {
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedGlobalConsumerConfigs() {
+        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put(consumerPrefix("interceptor.statsd.host"), "host");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
+    }
+
+    @Test
+    public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
+        props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId");
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client");
+        assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    }
+
+    @Test
+    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
     @Test
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
     }
 
@@ -388,7 +454,9 @@ public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        String isoLevel = (String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        String name = READ_COMMITTED.name();
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -396,7 +464,7 @@ public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled
     public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientrId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -440,7 +508,7 @@ public void shouldSetDifferentDefaultsIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
 
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 1ec28fab0a1..d3430f2c727 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -83,4 +83,8 @@ public AdminClient getAdminClient(final Map<String, Object> config) {
         return restoreConsumer;
     }
 
+    @Override
+    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
+        return restoreConsumer;
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add StreamsConfig prefix for different consumers
> ------------------------------------------------
>
>                 Key: KAFKA-6657
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6657
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Boyang Chen
>            Priority: Major
>              Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing different configs to different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main consumer, (2) the restore consumer, and (3) the global consumer (which is currently a restore consumer as well).
> For some use cases, it is necessary to set different configs for different consumers. Thus, we should add two new prefixes, one for the restore consumer and one for the global consumer. We might also consider extending `KafkaClientSupplier` with a `getGlobalConsumer()` method.
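
The override order exercised by the new tests in the diff above (a value set with the generic consumer prefix is replaced by one set with the restore or global consumer prefix) can be illustrated with plain maps. This is only a minimal sketch, not the StreamsConfig implementation: the class ConsumerPrefixSketch and its methods are hypothetical, and the literal prefix strings "main.consumer.", "restore.consumer." and "global.consumer." are assumptions, since this excerpt shows only the helper method names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of layered prefix resolution; not Kafka code.
public class ConsumerPrefixSketch {

    // Copy every entry whose key starts with `prefix` into `target`,
    // stripping the prefix from the key.
    static void applyPrefixed(Map<String, Object> source, String prefix, Map<String, Object> target) {
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    // Apply the generic "consumer." values first, then let the more
    // specific prefix (assumed spelling) override them.
    static Map<String, Object> resolve(Map<String, Object> props, String specificPrefix) {
        Map<String, Object> resolved = new HashMap<>();
        applyPrefixed(props, "consumer.", resolved);
        applyPrefixed(props, specificPrefix, resolved);
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("consumer.max.poll.records", "5");
        props.put("restore.consumer.max.poll.records", "50");

        // Only the restore consumer has a specific override here.
        System.out.println(resolve(props, "main.consumer.").get("max.poll.records"));    // prints 5
        System.out.println(resolve(props, "restore.consumer.").get("max.poll.records")); // prints 50
        System.out.println(resolve(props, "global.consumer.").get("max.poll.records"));  // prints 5
    }
}
```

In this sketch a consumer without a specific override simply falls back to the generic `consumer.` value, which matches what shouldSupportPrefixedGlobalConsumerConfigs checks for the global consumer.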
