You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/19 16:35:59 UTC
[kafka] branch 3.3 updated: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 45009ef382 Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
45009ef382 is described below
commit 45009ef382145fb6e33c5ebec03600b37a1474c0
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500
Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)" (#12421)
This reverts commit 4835c64f, "KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)".
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 22 +-------
.../integration/EmitOnChangeIntegrationTest.java | 2 +-
...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++-------------------
.../processor/internals/StreamThreadTest.java | 4 +-
4 files changed, 11 insertions(+), 78 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a923c8e983..3a61f05de1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -155,9 +155,6 @@ public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
- private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
- new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
-
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
@@ -515,25 +512,10 @@ public class KafkaStreams implements AutoCloseable {
}
}
- private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
- return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
- }
-
- private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
- final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
- final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
- if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
- action = SHUTDOWN_CLIENT;
- } else {
- action = streamsUncaughtExceptionHandler.handle(throwable);
- }
- return action;
- }
-
private void handleStreamsUncaughtException(final Throwable throwable,
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
final boolean skipThreadReplacement) {
- final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+ final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
if (oldHandler) {
log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
"The old handler will be ignored as long as a new handler is set.");
@@ -549,7 +531,7 @@ public class KafkaStreams implements AutoCloseable {
break;
case SHUTDOWN_CLIENT:
log.error("Encountered the following exception during processing " +
- "and Kafka Streams opted to " + action + "." +
+ "and the registered exception handler opted to " + action + "." +
" The streams client is going to shut down now. ", throwable);
closeToError();
break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 27958730cf..f41c95a6bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -111,7 +111,7 @@ public class EmitOnChangeIntegrationTest {
.toStream()
.map((key, value) -> {
if (shouldThrow.compareAndSet(true, false)) {
- throw new RuntimeException("Kaboom");
+ throw new IllegalStateException("Kaboom");
} else {
return new KeyValue<>(key, value);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index c81ddcfa74..be98e8d9fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -93,9 +93,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
- private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
- private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
- private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);
@Rule
public final TestName testName = new TestName();
@@ -108,6 +105,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
private final String outputTopic2 = "output2" + testId;
private final StreamsBuilder builder = new StreamsBuilder();
private final List<String> processorValueCollector = new ArrayList<>();
+ private static AtomicBoolean throwError = new AtomicBoolean(true);
private final Properties properties = basicProps();
@@ -173,47 +171,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
-
- @Test
- public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException {
- THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
- try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
-
- kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
-
- StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
- produceMessages(0L, inputTopic, "A");
- waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
-
- assertThat(processorValueCollector.size(), equalTo(1));
- } finally {
- THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
- }
-
- }
-
- @Test
- public void shouldShutdownClientWhenIllegalArgumentException() throws InterruptedException {
- THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
- try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
-
- kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
-
- StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
- produceMessages(0L, inputTopic, "A");
- waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
-
- assertThat(processorValueCollector.size(), equalTo(1));
- } finally {
- THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
- }
-
- }
-
@Test
public void shouldReplaceThreads() throws InterruptedException {
testReplaceThreads(2);
@@ -365,16 +322,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
@Override
public void process(final String key, final String value) {
valueList.add(value + " " + context.taskId());
- if (THROW_ERROR.get()) {
- if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
- throw new IllegalStateException("Something unexpected happened in " + Thread.currentThread().getName());
- } else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
- throw new IllegalArgumentException("Something unexpected happened in " + Thread.currentThread().getName());
- } else {
- throw new StreamsException(Thread.currentThread().getName());
- }
+ if (throwError.get()) {
+ throw new StreamsException(Thread.currentThread().getName());
}
- THROW_ERROR.set(true);
+ throwError.set(true);
}
}
@@ -408,7 +359,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {
- THROW_ERROR.set(false);
+ throwError.set(false);
}
return REPLACE_THREAD;
});
@@ -416,7 +367,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
produceMessages(0L, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
- TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished replacing threads");
+ TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5d898c345b..a43b0793a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2362,9 +2362,9 @@ public class StreamThreadTest {
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
consumer.subscribe((Collection<String>) anyObject(), anyObject());
- EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expectLastCall().anyTimes();
consumer.unsubscribe();
- EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expectLastCall().anyTimes();
EasyMock.replay(consumerGroupMetadata);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);