You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/18 02:39:44 UTC

kafka git commit: MINOR: remove streams config params from producer/consumer configs

Repository: kafka
Updated Branches:
  refs/heads/trunk 8c90b1a98 -> eee95228f


MINOR: remove streams config params from producer/consumer configs

Removing streams' specific config params from producer/consumer configs to reduce warning messages.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #906 from ymatsuda/clean_config


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eee95228
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eee95228
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eee95228

Branch: refs/heads/trunk
Commit: eee95228fabe1643baa016a2d49fb0a9fe2c66bd
Parents: 8c90b1a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Feb 18 09:39:30 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Feb 18 09:39:30 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java    | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eee95228/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 041d0e9..cf0684a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -258,10 +258,9 @@ public class StreamsConfig extends AbstractConfig {
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         // remove properties that are not required for consumers
+        removeStreamsSpecificConfigs(props);
         props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
         props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
         return props;
     }
@@ -273,15 +272,27 @@ public class StreamsConfig extends AbstractConfig {
         props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
 
         // remove properties that are not required for producers
+        removeStreamsSpecificConfigs(props);
         props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
         props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
 
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
 
         return props;
     }
 
+    private void removeStreamsSpecificConfigs(Map<String, Object> props) {
+        props.remove(StreamsConfig.JOB_ID_CONFIG);
+        props.remove(StreamsConfig.STATE_DIR_CONFIG);
+        props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+        props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
+    }
+
     public Serializer keySerializer() {
         return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
     }