You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/18 02:39:44 UTC
kafka git commit: MINOR: remove streams config params from
producer/consumer configs
Repository: kafka
Updated Branches:
refs/heads/trunk 8c90b1a98 -> eee95228f
MINOR: remove streams config params from producer/consumer configs
Removing streams' specific config params from producer/consumer configs to reduce warning messages.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #906 from ymatsuda/clean_config
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eee95228
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eee95228
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eee95228
Branch: refs/heads/trunk
Commit: eee95228fabe1643baa016a2d49fb0a9fe2c66bd
Parents: 8c90b1a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Feb 18 09:39:30 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Feb 18 09:39:30 2016 +0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eee95228/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 041d0e9..cf0684a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -258,10 +258,9 @@ public class StreamsConfig extends AbstractConfig {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// remove properties that are not required for consumers
+ removeStreamsSpecificConfigs(props);
props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
- props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
return props;
}
@@ -273,15 +272,27 @@ public class StreamsConfig extends AbstractConfig {
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// remove properties that are not required for producers
+ removeStreamsSpecificConfigs(props);
props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+ props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
return props;
}
+ private void removeStreamsSpecificConfigs(Map<String, Object> props) {
+ props.remove(StreamsConfig.JOB_ID_CONFIG);
+ props.remove(StreamsConfig.STATE_DIR_CONFIG);
+ props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+ props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+ props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+ props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+ props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+ props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
+ }
+
public Serializer keySerializer() {
return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
}