You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/19 16:54:52 UTC

[kafka] branch 3.2 updated: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 56a136d820 Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
56a136d820 is described below

commit 56a136d8203f2d2cf90752ebd37b59850ea60b2a
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500

    Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
    
    This reverts commit 4835c64f
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 22 +-------
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++-------------------
 .../processor/internals/StreamThreadTest.java      |  4 +-
 3 files changed, 10 insertions(+), 77 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4333bbb3cd..82e3a5bc78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -154,9 +154,6 @@ public class KafkaStreams implements AutoCloseable {
 
     private static final String JMX_PREFIX = "kafka.streams";
 
-    private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
-        new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
-
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
     // of the co-location of stream thread's consumers. It is for internal
@@ -514,25 +511,10 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
-    private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
-        return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
-    }
-
-    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
-                                                                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
-        if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
-            action = SHUTDOWN_CLIENT;
-        } else {
-            action = streamsUncaughtExceptionHandler.handle(throwable);
-        }
-        return action;
-    }
-
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                                 final boolean skipThreadReplacement) {
-        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                     "The old handler will be ignored as long as a new handler is set.");
@@ -548,7 +530,7 @@ public class KafkaStreams implements AutoCloseable {
                 break;
             case SHUTDOWN_CLIENT:
                 log.error("Encountered the following exception during processing " +
-                        "and Kafka Streams opted to " + action + "." +
+                        "and the registered exception handler opted to " + action + "." +
                         " The streams client is going to shut down now. ", throwable);
                 closeToError();
                 break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 0f42d3546f..761d6fbb87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -90,9 +90,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
     }
 
     public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
-    private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
-    private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
-    private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);
 
     @Rule
     public final TestName testName = new TestName();
@@ -105,6 +102,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
     private final String outputTopic2 = "output2" + testId;
     private final StreamsBuilder builder = new StreamsBuilder();
     private final List<String> processorValueCollector = new ArrayList<>();
+    private static AtomicBoolean throwError = new AtomicBoolean(true);
 
     private final Properties properties = basicProps();
 
@@ -170,47 +168,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         }
     }
 
-
-    @Test
-    public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException {
-        THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
-        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
-
-            kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
-
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
-            produceMessages(0L, inputTopic, "A");
-            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
-
-            assertThat(processorValueCollector.size(), equalTo(1));
-        } finally {
-            THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
-        }
-
-    }
-
-    @Test
-    public void shouldShutdownClientWhenIllegalArgumentException() throws InterruptedException {
-        THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
-        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
-
-            kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
-
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
-            produceMessages(0L, inputTopic, "A");
-            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
-
-            assertThat(processorValueCollector.size(), equalTo(1));
-        } finally {
-            THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
-        }
-
-    }
-
     @Test
     public void shouldReplaceThreads() throws InterruptedException {
         testReplaceThreads(2);
@@ -362,16 +319,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         @Override
         public void process(final String key, final String value) {
             valueList.add(value + " " + context.taskId());
-            if (THROW_ERROR.get()) {
-                if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
-                    throw new IllegalStateException("Something unexpected happened in " + Thread.currentThread().getName());
-                } else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
-                    throw new IllegalArgumentException("Something unexpected happened in " + Thread.currentThread().getName());
-                } else {
-                    throw new StreamsException(Thread.currentThread().getName());
-                }
+            if (throwError.get()) {
+                throw new StreamsException(Thread.currentThread().getName());
             }
-            THROW_ERROR.set(true);
+            throwError.set(true);
         }
     }
 
@@ -405,7 +356,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
             final AtomicInteger count = new AtomicInteger();
             kafkaStreams.setUncaughtExceptionHandler(exception -> {
                 if (count.incrementAndGet() == numThreads) {
-                    THROW_ERROR.set(false);
+                    throwError.set(false);
                 }
                 return REPLACE_THREAD;
             });
@@ -413,7 +364,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
             produceMessages(0L, inputTopic, "A");
             TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
-            TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished replacing threads");
+            TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
             kafkaStreams.close();
             waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cfbf1e5531..39de43ee46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2260,9 +2260,9 @@ public class StreamThreadTest {
         expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
         expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
         consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().atLeastOnce();
+        EasyMock.expectLastCall().anyTimes();
         consumer.unsubscribe();
-        EasyMock.expectLastCall().atLeastOnce();
+        EasyMock.expectLastCall().anyTimes();
         EasyMock.replay(consumerGroupMetadata);
         final Task task1 = mock(Task.class);
         final Task task2 = mock(Task.class);