You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/07/19 20:35:48 UTC

[kafka] branch trunk updated: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b62d8b975c KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)
b62d8b975c is described below

commit b62d8b975cf97b5c1328b9b03a05fa09b07cf13a
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Tue Jul 19 15:35:26 2022 -0500

    KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)
    
    Override the default handler for stream threads if the stream's handler is used. We do not want the Java default handler triggering when a thread is replaced.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java   | 7 +++++++
 .../StreamsUncaughtExceptionHandlerIntegrationTest.java            | 6 ++++++
 2 files changed, 13 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3a61f05de1..05d99dd172 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -471,6 +471,13 @@ public class KafkaStreams implements AutoCloseable {
                         exception -> handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, false)
                     );
                 }
+                processStreamThread(thread -> thread.setUncaughtExceptionHandler((t, e) -> { }
+                ));
+
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler((t, e) -> { }
+                    );
+                }
             } else {
                 throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
                     "Current state is: " + state);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index be98e8d9fc..4af333a65a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -176,6 +176,12 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         testReplaceThreads(2);
     }
 
+    @Test
+    public void shouldReplaceThreadsWithoutJavaHandler() throws InterruptedException {
+        Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception thrown"));
+        testReplaceThreads(2);
+    }
+
     @Test
     public void shouldReplaceSingleThread() throws InterruptedException {
         testReplaceThreads(1);