You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/03 00:16:39 UTC

[kafka] branch trunk updated: KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4f7675  KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)
a4f7675 is described below

commit a4f7675db1a928e73c7a69eb906dd1e9ecd4a22a
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu May 2 17:16:17 2019 -0700

    KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)
    
    Details in the JIRA: https://issues.apache.org/jira/browse/KAFKA-8285
    
    Basically we want to avoid sharing of atomic updates for thread id with multiple stream instances on one JVM.
    
    Reviewers: Raoul de Haard, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java   | 3 ++-
 .../org/apache/kafka/streams/processor/internals/StreamThread.java | 7 +++----
 .../apache/kafka/streams/processor/internals/StreamThreadTest.java | 5 ++++-
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 315a6bb..c69d927 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -726,7 +726,8 @@ public class KafkaStreams implements AutoCloseable {
                                              streamsMetadataState,
                                              cacheSizePerThread,
                                              stateDirectory,
-                                             delegatingStateRestoreListener);
+                                             delegatingStateRestoreListener,
+                                             i + 1);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb4629f..46612e5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -69,8 +69,6 @@ import static java.util.Collections.singleton;
 
 public class StreamThread extends Thread {
 
-    private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
-
     /**
      * Stream thread states are the possible states that a stream thread can be in.
      * A thread must only be in one state at a time
@@ -600,8 +598,9 @@ public class StreamThread extends Thread {
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
                                       final StateDirectory stateDirectory,
-                                      final StateRestoreListener userStateRestoreListener) {
-        final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
+                                      final StateRestoreListener userStateRestoreListener,
+                                      final int threadIdx) {
+        final String threadClientId = clientId + "-StreamThread-" + threadIdx;
 
         final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
         final LogContext logContext = new LogContext(logPrefix);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 11485e4..1de39d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -104,6 +104,7 @@ public class StreamThreadTest {
 
     private final String clientId = "clientId";
     private final String applicationId = "stream-thread-test";
+    private final int threadIdx = 1;
     private final MockTime mockTime = new MockTime();
     private final Metrics metrics = new Metrics();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -244,7 +245,8 @@ public class StreamThreadTest {
             streamsMetadataState,
             0,
             stateDirectory,
-            new MockStateRestoreListener());
+            new MockStateRestoreListener(),
+            threadIdx);
     }
 
     @Test
@@ -278,6 +280,7 @@ public class StreamThreadTest {
 
         final JmxReporter reporter = new JmxReporter("kafka.streams");
         metrics.addReporter(reporter);
+        assertEquals(clientId + "-StreamThread-1", thread.getName());
         assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s",
                 defaultGroupName, thread.getName())));
     }