You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/27 00:00:45 UTC

kafka git commit: KAFKA-3278: concatenate thread name to clientId when producer and consumers config is created

Repository: kafka
Updated Branches:
  refs/heads/trunk 4e0ae79d5 -> d4e60b9f5


KAFKA-3278: concatenate thread name to clientId when producer and consumers config is created

@guozhangwang: I made the changes as requested. I reverted my original commit, and that seems to have closed the other pull request; sorry if that mucks up the process a bit.

Author: tomdearman <to...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #978 from tomdearman/KAFKA-3278


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d4e60b9f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d4e60b9f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d4e60b9f

Branch: refs/heads/trunk
Commit: d4e60b9f59e04e82125b404f173c4fccc949d906
Parents: 4e0ae79
Author: Tom Dearman <to...@gmail.com>
Authored: Fri Feb 26 15:00:38 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 26 15:00:38 2016 -0800

----------------------------------------------------------------------
 .../streams/processor/internals/StreamThread.java    | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d4e60b9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d460e1..7392d9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -219,22 +219,25 @@ public class StreamThread extends Thread {
     }
 
     private Producer<byte[], byte[]> createProducer() {
-        log.info("Creating producer client for stream thread [" + this.getName() + "]");
-        return new KafkaProducer<>(config.getProducerConfigs(this.clientId),
+        String threadName = this.getName();
+        log.info("Creating producer client for stream thread [" + threadName + "]");
+        return new KafkaProducer<>(config.getProducerConfigs(this.clientId + "-" + threadName),
                 new ByteArraySerializer(),
                 new ByteArraySerializer());
     }
 
     private Consumer<byte[], byte[]> createConsumer() {
-        log.info("Creating consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId),
+        String threadName = this.getName();
+        log.info("Creating consumer client for stream thread [" + threadName + "]");
+        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
 
     private Consumer<byte[], byte[]> createRestoreConsumer() {
-        log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId),
+        String threadName = this.getName();
+        log.info("Creating restore consumer client for stream thread [" + threadName + "]");
+        return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId + "-" + threadName),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }