You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/18 02:39:24 UTC

kafka git commit: MINOR: remove the group id from a restore consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk 1a36af80b -> 015862348


MINOR: remove the group id from a restore consumer

guozhangwang
A restore consumer does not belong to a consumer group.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #543 from ymatsuda/no_group_for_restore_consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01586234
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01586234
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01586234

Branch: refs/heads/trunk
Commit: 015862348002cc6d1148dda0555fe88c27998982
Parents: 1a36af8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Nov 17 17:39:21 2015 -0800
Committer: Confluent <co...@Confluents-MacBook-Pro.local>
Committed: Tue Nov 17 17:39:21 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/StreamingConfig.java | 13 +++++++++++--
 .../streams/processor/internals/StreamThread.java      |  2 +-
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01586234/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index f563070..eb4b83f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -222,14 +222,23 @@ public class StreamingConfig extends AbstractConfig {
     }
 
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
-        Map<String, Object> props = getConsumerConfigs();
+        Map<String, Object> props = getBaseConsumerConfigs();
         props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
         props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
         return props;
     }
 
-    public Map<String, Object> getConsumerConfigs() {
+    public Map<String, Object> getRestoreConsumerConfigs() {
+        Map<String, Object> props = getBaseConsumerConfigs();
+
+        // no group id for a restore consumer
+        props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+        return props;
+    }
+
+    private Map<String, Object> getBaseConsumerConfigs() {
         Map<String, Object> props = this.originals();
 
         // set consumer default property values

http://git-wip-us.apache.org/repos/asf/kafka/blob/01586234/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 796e53f..31dca39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -182,7 +182,7 @@ public class StreamThread extends Thread {
 
     private Consumer<byte[], byte[]> createRestoreConsumer() {
         log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(),
+        return new KafkaConsumer<>(config.getRestoreConsumerConfigs(),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }