You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/27 03:16:11 UTC
[kafka] branch 2.8 updated: KAFKA-12537: fix application shutdown
corner case with only one thread (#10387)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 0949d41 KAFKA-12537: fix application shutdown corner case with only one thread (#10387)
0949d41 is described below
commit 0949d4144f502a7c58fc8375a5b1f8d788913ea5
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Mar 26 19:55:27 2021 -0700
KAFKA-12537: fix application shutdown corner case with only one thread (#10387)
When running with exactly-once semantics (EOS), the run loop on the failing thread terminates before the shutdown request can be sent. This is a problem for single-threaded EOS applications that use the application shutdown feature.
I changed it so that, in all cases with a single thread, the dying thread spins up a new thread to communicate the shutdown, and the dying thread then terminates. Also, @ableegoldman refactored the catch blocks in runLoop.
Co-authored-by: A. Sophie Blee-Goldman <ab...@gmail.com>
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../org/apache/kafka/streams/KafkaStreams.java | 13 ++--
.../streams/processor/internals/StreamThread.java | 37 ++++++-----
.../processor/internals/StreamThreadTest.java | 74 ++++++++++++++++++++--
3 files changed, 96 insertions(+), 28 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 441faf6..1d4e6b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -492,22 +492,25 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
break;
case SHUTDOWN_APPLICATION:
+ if (getNumLiveStreamThreads() == 1) {
+ log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
+ addStreamThread();
+ }
if (throwable instanceof Error) {
log.error("This option requires running threads to shut down the application." +
"but the uncaught exception was an Error, which means this runtime is no " +
"longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
}
-
if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
log.error("Exception in global thread caused the application to attempt to shutdown." +
" This action will succeed only if there is at least one StreamThread running on this client." +
" Currently there are no running threads so will now close the client.");
closeToError();
- } else {
- processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
- log.error("Encountered the following exception during processing " +
- "and sent shutdown request for the entire application.", throwable);
+ break;
}
+ processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
+ log.error("Encountered the following exception during processing " +
+ "and sent shutdown request for the entire application.", throwable);
break;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7a9dba2..3286b3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -554,6 +554,9 @@ public class StreamThread extends Thread {
boolean cleanRun = false;
try {
cleanRun = runLoop();
+ } catch (final Throwable e) {
+ failedStreamThreadSensor.record();
+ this.streamsUncaughtExceptionHandler.accept(e);
} finally {
completeShutdown(cleanRun);
}
@@ -572,11 +575,7 @@ public class StreamThread extends Thread {
// until the rebalance is completed before we close and commit the tasks
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
- if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
- log.warn("Detected that shutdown was requested. " +
- "All clients in this app will now begin to shutdown");
- mainConsumer.enforceRebalance();
- }
+ maybeSendShutdown();
final Long size = cacheResizeSize.getAndSet(-1L);
if (size != -1L) {
cacheResizer.accept(size);
@@ -589,7 +588,7 @@ public class StreamThread extends Thread {
}
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
- "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+ "Will close the task as dirty and re-create and bootstrap from scratch.", e);
try {
taskManager.handleCorruption(e.corruptedTasks());
} catch (final TaskMigratedException taskMigrated) {
@@ -600,24 +599,16 @@ public class StreamThread extends Thread {
} catch (final UnsupportedVersionException e) {
final String errorMessage = e.getMessage();
if (errorMessage != null &&
- errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
+ errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
- "Setting {}=\"{}\" requires broker version 2.5 or higher.",
- StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
- EXACTLY_ONCE_BETA);
- }
- failedStreamThreadSensor.record();
- this.streamsUncaughtExceptionHandler.accept(e);
- if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) {
- return false;
+ "Setting {}=\"{}\" requires broker version 2.5 or higher.",
+ StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+ EXACTLY_ONCE_BETA);
}
- } catch (final Throwable e) {
failedStreamThreadSensor.record();
this.streamsUncaughtExceptionHandler.accept(e);
- if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) {
- return false;
- }
+ return false;
}
}
return true;
@@ -632,6 +623,14 @@ public class StreamThread extends Thread {
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
}
+ public void maybeSendShutdown() {
+ if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+ log.warn("Detected that shutdown was requested. " +
+ "All clients in this app will now begin to shutdown");
+ mainConsumer.enforceRebalance();
+ }
+ }
+
public boolean waitOnThreadState(final StreamThread.State targetState, final long timeoutMs) {
final long begin = time.milliseconds();
synchronized (stateLock) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f017f59..f7915e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -2375,10 +2376,76 @@ public class StreamThreadTest {
}
}.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
- thread.setState(StreamThread.State.STARTING);
- thread.runLoop();
+ thread.run();
+
+ verify(taskManager);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() {
+ final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+ final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+ expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+ consumer.subscribe((Collection<String>) anyObject(), anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ consumer.unsubscribe();
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(consumerGroupMetadata);
+ final Task task1 = mock(Task.class);
+ final Task task2 = mock(Task.class);
+ final TaskId taskId1 = new TaskId(0, 0);
+ final TaskId taskId2 = new TaskId(0, 2);
+
+ final Set<TaskId> corruptedTasks = singleton(taskId1);
+
+ expect(task1.state()).andStubReturn(Task.State.RUNNING);
+ expect(task1.id()).andStubReturn(taskId1);
+ expect(task2.state()).andStubReturn(Task.State.RUNNING);
+ expect(task2.id()).andStubReturn(taskId2);
+
+ taskManager.handleCorruption(corruptedTasks);
+ expectLastCall().andThrow(new TimeoutException());
+
+ EasyMock.replay(task1, task2, taskManager, consumer);
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+ final StreamThread thread = new StreamThread(
+ mockTime,
+ config,
+ null,
+ consumer,
+ consumer,
+ null,
+ null,
+ taskManager,
+ streamsMetrics,
+ internalTopologyBuilder,
+ CLIENT_ID,
+ new LogContext(""),
+ new AtomicInteger(),
+ new AtomicLong(Long.MAX_VALUE),
+ null,
+ HANDLER,
+ null
+ ) {
+ @Override
+ void runOnce() {
+ setState(State.PENDING_SHUTDOWN);
+ throw new TaskCorruptedException(corruptedTasks);
+ }
+ }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+ final AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false);
+
+ thread.setStreamsUncaughtExceptionHandler(e -> exceptionHandlerInvoked.set(true));
+ thread.run();
verify(taskManager);
+ assertThat(exceptionHandlerInvoked.get(), is(true));
}
@Test
@@ -2778,9 +2845,8 @@ public class StreamThreadTest {
};
EasyMock.replay(taskManager);
thread.updateThreadMetadata("metadata");
- thread.setState(StreamThread.State.STARTING);
- thread.runLoop();
+ thread.run();
final Metric failedThreads = StreamsTestUtils.getMetricByName(metrics.metrics(), "failed-stream-threads", "stream-metrics");
assertThat(failedThreads.metricValue(), is(shouldFail ? 1.0 : 0.0));