You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/03 00:16:39 UTC
[kafka] branch trunk updated: KAFKA-8285: enable localized thread
IDs in Kafka Streams (#6632)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4f7675 KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)
a4f7675 is described below
commit a4f7675db1a928e73c7a69eb906dd1e9ecd4a22a
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu May 2 17:16:17 2019 -0700
KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)
Details in the JIRA: https://issues.apache.org/jira/browse/KAFKA-8285
Basically we want to avoid sharing of atomic updates for thread id with multiple stream instances on one JVM.
Reviewers: Raoul de Haard, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 3 ++-
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 7 +++----
.../apache/kafka/streams/processor/internals/StreamThreadTest.java | 5 ++++-
3 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 315a6bb..c69d927 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -726,7 +726,8 @@ public class KafkaStreams implements AutoCloseable {
streamsMetadataState,
cacheSizePerThread,
stateDirectory,
- delegatingStateRestoreListener);
+ delegatingStateRestoreListener,
+ i + 1);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb4629f..46612e5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -69,8 +69,6 @@ import static java.util.Collections.singleton;
public class StreamThread extends Thread {
- private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
-
/**
* Stream thread states are the possible states that a stream thread can be in.
* A thread must only be in one state at a time
@@ -600,8 +598,9 @@ public class StreamThread extends Thread {
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
final StateDirectory stateDirectory,
- final StateRestoreListener userStateRestoreListener) {
- final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
+ final StateRestoreListener userStateRestoreListener,
+ final int threadIdx) {
+ final String threadClientId = clientId + "-StreamThread-" + threadIdx;
final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
final LogContext logContext = new LogContext(logPrefix);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 11485e4..1de39d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -104,6 +104,7 @@ public class StreamThreadTest {
private final String clientId = "clientId";
private final String applicationId = "stream-thread-test";
+ private final int threadIdx = 1;
private final MockTime mockTime = new MockTime();
private final Metrics metrics = new Metrics();
private final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -244,7 +245,8 @@ public class StreamThreadTest {
streamsMetadataState,
0,
stateDirectory,
- new MockStateRestoreListener());
+ new MockStateRestoreListener(),
+ threadIdx);
}
@Test
@@ -278,6 +280,7 @@ public class StreamThreadTest {
final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter);
+ assertEquals(clientId + "-StreamThread-1", thread.getName());
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s",
defaultGroupName, thread.getName())));
}