You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/27 00:00:45 UTC
kafka git commit: KAFKA-3278: concatenate thread name to clientId when producer and consumers config is created
Repository: kafka
Updated Branches:
refs/heads/trunk 4e0ae79d5 -> d4e60b9f5
KAFKA-3278: concatenate thread name to clientId when producer and consumers config is created
@guozhangwang: made the changes as requested. I reverted my original commit, and that seems to have closed the other pull request - sorry if that mucks up the process a bit.
Author: tomdearman <to...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #978 from tomdearman/KAFKA-3278
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d4e60b9f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d4e60b9f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d4e60b9f
Branch: refs/heads/trunk
Commit: d4e60b9f59e04e82125b404f173c4fccc949d906
Parents: 4e0ae79
Author: Tom Dearman <to...@gmail.com>
Authored: Fri Feb 26 15:00:38 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 26 15:00:38 2016 -0800
----------------------------------------------------------------------
.../streams/processor/internals/StreamThread.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d4e60b9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d460e1..7392d9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -219,22 +219,25 @@ public class StreamThread extends Thread {
}
private Producer<byte[], byte[]> createProducer() {
- log.info("Creating producer client for stream thread [" + this.getName() + "]");
- return new KafkaProducer<>(config.getProducerConfigs(this.clientId),
+ String threadName = this.getName();
+ log.info("Creating producer client for stream thread [" + threadName + "]");
+ return new KafkaProducer<>(config.getProducerConfigs(this.clientId + "-" + threadName),
new ByteArraySerializer(),
new ByteArraySerializer());
}
private Consumer<byte[], byte[]> createConsumer() {
- log.info("Creating consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId),
+ String threadName = this.getName();
+ log.info("Creating consumer client for stream thread [" + threadName + "]");
+ return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
private Consumer<byte[], byte[]> createRestoreConsumer() {
- log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId),
+ String threadName = this.getName();
+ log.info("Creating restore consumer client for stream thread [" + threadName + "]");
+ return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId + "-" + threadName),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}