You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/07/19 20:35:48 UTC
[kafka] branch trunk updated: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b62d8b975c KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)
b62d8b975c is described below
commit b62d8b975cf97b5c1328b9b03a05fa09b07cf13a
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Tue Jul 19 15:35:26 2022 -0500
KAFKA-12699: Override the default handler for stream threads if the stream's handler is used (#12324)
Override the default handler for stream threads if the stream's handler is used. We do no want the java default handler triggering when a thread is replaced.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 7 +++++++
.../StreamsUncaughtExceptionHandlerIntegrationTest.java | 6 ++++++
2 files changed, 13 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3a61f05de1..05d99dd172 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -471,6 +471,13 @@ public class KafkaStreams implements AutoCloseable {
exception -> handleStreamsUncaughtException(exception, userStreamsUncaughtExceptionHandler, false)
);
}
+ processStreamThread(thread -> thread.setUncaughtExceptionHandler((t, e) -> { }
+ ));
+
+ if (globalStreamThread != null) {
+ globalStreamThread.setUncaughtExceptionHandler((t, e) -> { }
+ );
+ }
} else {
throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
"Current state is: " + state);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index be98e8d9fc..4af333a65a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -176,6 +176,12 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
testReplaceThreads(2);
}
+ @Test
+ public void shouldReplaceThreadsWithoutJavaHandler() throws InterruptedException {
+ Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception thrown"));
+ testReplaceThreads(2);
+ }
+
@Test
public void shouldReplaceSingleThread() throws InterruptedException {
testReplaceThreads(1);