You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/18 02:39:24 UTC
kafka git commit: MINOR: remove the group id from a restore consumer
Repository: kafka
Updated Branches:
refs/heads/trunk 1a36af80b -> 015862348
MINOR: remove the group id from a restore consumer
guozhangwang
A restore consumer does not belong to a consumer group.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #543 from ymatsuda/no_group_for_restore_consumer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01586234
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01586234
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01586234
Branch: refs/heads/trunk
Commit: 015862348002cc6d1148dda0555fe88c27998982
Parents: 1a36af8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Nov 17 17:39:21 2015 -0800
Committer: Confluent <co...@Confluents-MacBook-Pro.local>
Committed: Tue Nov 17 17:39:21 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/streams/StreamingConfig.java | 13 +++++++++++--
.../streams/processor/internals/StreamThread.java | 2 +-
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/01586234/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index f563070..eb4b83f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -222,14 +222,23 @@ public class StreamingConfig extends AbstractConfig {
}
public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
- Map<String, Object> props = getConsumerConfigs();
+ Map<String, Object> props = getBaseConsumerConfigs();
props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
return props;
}
- public Map<String, Object> getConsumerConfigs() {
+ public Map<String, Object> getRestoreConsumerConfigs() {
+ Map<String, Object> props = getBaseConsumerConfigs();
+
+ // no group id for a restore consumer
+ props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+ return props;
+ }
+
+ private Map<String, Object> getBaseConsumerConfigs() {
Map<String, Object> props = this.originals();
// set consumer default property values
http://git-wip-us.apache.org/repos/asf/kafka/blob/01586234/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 796e53f..31dca39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -182,7 +182,7 @@ public class StreamThread extends Thread {
private Consumer<byte[], byte[]> createRestoreConsumer() {
log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getConsumerConfigs(),
+ return new KafkaConsumer<>(config.getRestoreConsumerConfigs(),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}