You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/01 17:00:14 UTC

[kafka] branch trunk updated: KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4835c64  KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)
4835c64 is described below

commit 4835c64f89876db5af2bba8fa3ef17de5a0b44e3
Author: Josep Prat <jo...@aiven.io>
AuthorDate: Wed Sep 1 18:58:36 2021 +0200

    KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)
    
    Instead of letting all RuntimeExceptions go through and be processed by the uncaught exception handler, IllegalStateException and IllegalArgumentException are not passed through and fail fast. In this PR when setting the uncaught exception handler we check if the exception is in an "exclude list", if so, we terminate the client, otherwise we continue as usual.
    
    Added test checking this new case. Added integration test checking that user defined exception handler is not used when an IllegalStateException is thrown.
    
    Reviewers: Bruno Cadonna <ca...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 22 +++++++-
 .../integration/EmitOnChangeIntegrationTest.java   |  2 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++++++++++++++++++---
 .../processor/internals/StreamThreadTest.java      |  4 +-
 4 files changed, 78 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index eee55fc..5067da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -141,6 +141,9 @@ public class KafkaStreams implements AutoCloseable {
 
     private static final String JMX_PREFIX = "kafka.streams";
 
+    private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
+        new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
+
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
     // of the co-location of stream thread's consumers. It is for internal
@@ -495,9 +498,24 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
+    private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
+        return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
+                                                                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
+        if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
+            action = SHUTDOWN_CLIENT;
+        } else {
+            action = streamsUncaughtExceptionHandler.handle(throwable);
+        }
+        return action;
+    }
+
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                     "The old handler will be ignored as long as a new handler is set.");
@@ -509,7 +527,7 @@ public class KafkaStreams implements AutoCloseable {
                 break;
             case SHUTDOWN_CLIENT:
                 log.error("Encountered the following exception during processing " +
-                        "and the registered exception handler opted to " + action + "." +
+                        "and Kafka Streams opted to " + action + "." +
                         " The streams client is going to shut down now. ", throwable);
                 closeToError();
                 break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 8bc5e52..63e0f27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -104,7 +104,7 @@ public class EmitOnChangeIntegrationTest {
             .toStream()
             .map((key, value) -> {
                 if (shouldThrow.compareAndSet(true, false)) {
-                    throw new IllegalStateException("Kaboom");
+                    throw new RuntimeException("Kaboom");
                 } else {
                     return new KeyValue<>(key, value);
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 97d894f..74d6ba9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -93,7 +93,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
     private static Properties properties;
     private static List<String> processorValueCollector;
     private static String appId = "";
-    private static AtomicBoolean throwError = new AtomicBoolean(true);
+    private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
+    private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
+    private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);
 
     @Before
     public void setup() {
@@ -163,6 +165,47 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         }
     }
 
+
+    @Test
+    public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException {
+        THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
+
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(1));
+        } finally {
+            THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
+        }
+
+    }
+
+    @Test
+    public void shouldShutdownClientWhenIllegalArgumentException() throws InterruptedException {
+        THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
+
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(1));
+        } finally {
+            THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
+        }
+
+    }
+
     @Test
     public void shouldReplaceThreads() throws InterruptedException {
         testReplaceThreads(2);
@@ -235,10 +278,16 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         @Override
         public void process(final String key, final String value) {
             valueList.add(value + " " + context.taskId());
-            if (throwError.get()) {
-                throw new StreamsException(Thread.currentThread().getName());
+            if (THROW_ERROR.get()) {
+                if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
+                    throw new IllegalStateException("Something unexpected happened in " + Thread.currentThread().getName());
+                } else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
+                    throw new IllegalArgumentException("Something unexpected happened in " + Thread.currentThread().getName());
+                } else {
+                    throw new StreamsException(Thread.currentThread().getName());
+                }
             }
-            throwError.set(true);
+            THROW_ERROR.set(true);
         }
     }
 
@@ -272,7 +321,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
             final AtomicInteger count = new AtomicInteger();
             kafkaStreams.setUncaughtExceptionHandler(exception -> {
                 if (count.incrementAndGet() == numThreads) {
-                    throwError.set(false);
+                    THROW_ERROR.set(false);
                 }
                 return REPLACE_THREAD;
             });
@@ -280,7 +329,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
             produceMessages(0L, inputTopic, "A");
             TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
-            TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
+            TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished replacing threads");
             kafkaStreams.close();
             waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 02b20e2..e57c565 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2336,9 +2336,9 @@ public class StreamThreadTest {
         expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
         expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
         consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
+        EasyMock.expectLastCall().atLeastOnce();
         consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
+        EasyMock.expectLastCall().atLeastOnce();
         EasyMock.replay(consumerGroupMetadata);
         final Task task1 = mock(Task.class);
         final Task task2 = mock(Task.class);