You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/27 03:16:11 UTC

[kafka] branch 2.8 updated: KAFKA-12537: fix application shutdown corner case with only one thread (#10387)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 0949d41  KAFKA-12537: fix application shutdown corner case with only one thread (#10387)
0949d41 is described below

commit 0949d4144f502a7c58fc8375a5b1f8d788913ea5
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Mar 26 19:55:27 2021 -0700

    KAFKA-12537: fix application shutdown corner case with only one thread (#10387)
    
    When in EOS the run loop terminates on that thread before the shutdown can be called. This is a problem for EOS single thread applications using the application shutdown feature.
    
    I changed it so in all cases with a single thread, the dying thread will spin up a new thread to communicate the shutdown and terminate the dying thread. Also @ableegoldman refactored the catch blocks in runloop.
    
    Co-authored-by: A. Sophie Blee-Goldman <ab...@gmail.com>
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 13 ++--
 .../streams/processor/internals/StreamThread.java  | 37 ++++++-----
 .../processor/internals/StreamThreadTest.java      | 74 ++++++++++++++++++++--
 3 files changed, 96 insertions(+), 28 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 441faf6..1d4e6b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -492,22 +492,25 @@ public class KafkaStreams implements AutoCloseable {
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() == 1) {
+                    log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
                 if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
                     log.error("Exception in global thread caused the application to attempt to shutdown." +
                             " This action will succeed only if there is at least one StreamThread running on this client." +
                             " Currently there are no running threads so will now close the client.");
                     closeToError();
-                } else {
-                    processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
-                    log.error("Encountered the following exception during processing " +
-                            "and sent shutdown request for the entire application.", throwable);
+                    break;
                 }
+                processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
+                log.error("Encountered the following exception during processing " +
+                        "and sent shutdown request for the entire application.", throwable);
                 break;
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7a9dba2..3286b3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -554,6 +554,9 @@ public class StreamThread extends Thread {
         boolean cleanRun = false;
         try {
             cleanRun = runLoop();
+        } catch (final Throwable e) {
+            failedStreamThreadSensor.record();
+            this.streamsUncaughtExceptionHandler.accept(e);
         } finally {
             completeShutdown(cleanRun);
         }
@@ -572,11 +575,7 @@ public class StreamThread extends Thread {
         // until the rebalance is completed before we close and commit the tasks
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
-                if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
-                    log.warn("Detected that shutdown was requested. " +
-                            "All clients in this app will now begin to shutdown");
-                    mainConsumer.enforceRebalance();
-                }
+                maybeSendShutdown();
                 final Long size = cacheResizeSize.getAndSet(-1L);
                 if (size != -1L) {
                     cacheResizer.accept(size);
@@ -589,7 +588,7 @@ public class StreamThread extends Thread {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
-                        "Will close the task as dirty and re-create and bootstrap from scratch.", e);
+                         "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
                     taskManager.handleCorruption(e.corruptedTasks());
                 } catch (final TaskMigratedException taskMigrated) {
@@ -600,24 +599,16 @@ public class StreamThread extends Thread {
             } catch (final UnsupportedVersionException e) {
                 final String errorMessage = e.getMessage();
                 if (errorMessage != null &&
-                        errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
+                    errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
 
                     log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
-                                    "Setting {}=\"{}\" requires broker version 2.5 or higher.",
-                            StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
-                            EXACTLY_ONCE_BETA);
-                }
-                failedStreamThreadSensor.record();
-                this.streamsUncaughtExceptionHandler.accept(e);
-                if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) {
-                    return false;
+                              "Setting {}=\"{}\" requires broker version 2.5 or higher.",
+                          StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+                          EXACTLY_ONCE_BETA);
                 }
-            } catch (final Throwable e) {
                 failedStreamThreadSensor.record();
                 this.streamsUncaughtExceptionHandler.accept(e);
-                if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) {
-                    return false;
-                }
+                return false;
             }
         }
         return true;
@@ -632,6 +623,14 @@ public class StreamThread extends Thread {
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void maybeSendShutdown() {
+        if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            log.warn("Detected that shutdown was requested. " +
+                    "All clients in this app will now begin to shutdown");
+            mainConsumer.enforceRebalance();
+        }
+    }
+
     public boolean waitOnThreadState(final StreamThread.State targetState, final long timeoutMs) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f017f59..f7915e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -2375,10 +2376,76 @@ public class StreamThreadTest {
             }
         }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
-        thread.setState(StreamThread.State.STARTING);
-        thread.runLoop();
+        thread.run();
+
+        verify(taskManager);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() {
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        consumer.subscribe((Collection<String>) anyObject(), anyObject());
+        EasyMock.expectLastCall().anyTimes();
+        consumer.unsubscribe();
+        EasyMock.expectLastCall().anyTimes();
+        EasyMock.replay(consumerGroupMetadata);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
+
+        expect(task1.state()).andStubReturn(Task.State.RUNNING);
+        expect(task1.id()).andStubReturn(taskId1);
+        expect(task2.state()).andStubReturn(Task.State.RUNNING);
+        expect(task2.id()).andStubReturn(taskId2);
+
+        taskManager.handleCorruption(corruptedTasks);
+        expectLastCall().andThrow(new TimeoutException());
+
+        EasyMock.replay(task1, task2, taskManager, consumer);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasks);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        final AtomicBoolean exceptionHandlerInvoked = new AtomicBoolean(false);
+
+        thread.setStreamsUncaughtExceptionHandler(e -> exceptionHandlerInvoked.set(true));
+        thread.run();
 
         verify(taskManager);
+        assertThat(exceptionHandlerInvoked.get(), is(true));
     }
 
     @Test
@@ -2778,9 +2845,8 @@ public class StreamThreadTest {
         };
         EasyMock.replay(taskManager);
         thread.updateThreadMetadata("metadata");
-        thread.setState(StreamThread.State.STARTING);
 
-        thread.runLoop();
+        thread.run();
 
         final Metric failedThreads = StreamsTestUtils.getMetricByName(metrics.metrics(), "failed-stream-threads", "stream-metrics");
         assertThat(failedThreads.metricValue(), is(shouldFail ? 1.0 : 0.0));