Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/24 00:01:18 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600042316



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -631,6 +625,14 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void maybeSendShutdown() {
+        if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            log.warn("Detected that shutdown was requested. " +
+                    "All clients in this app will now begin to shutdown");
+            mainConsumer.enforceRebalance();

Review comment:
       Since this thread is going to shut down immediately anyway, I think we can skip the `mainConsumer.enforceRebalance()` call.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       Ah, nice that this solves the global thread issue as well! Technically this will still fail to communicate the shutdown if the application only ever runs the global thread and never started any StreamThreads, but I think that's fine. Some users have reported running global-only Streams apps in the past, but I would imagine that use case would almost certainly prefer the `REPLACE_THREAD` option.
   
   Ooh, wait. Do we need to add this check in the `REPLACE_THREAD` handling so we don't start up a StreamThread if it was the global thread that was killed?
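
   To make the question concrete, here is a minimal, self-contained sketch of what such a guard might look like. It is only one reading of the idea and is not the actual `KafkaStreams` code: the class `ReplaceThreadGuardSketch`, the enums `ThreadKind` and `Action`, and the method `onReplaceThread` are all hypothetical names invented for illustration, and the real handler would identify the global thread some other way (for example by comparing against `globalStreamThread`, as the removed check did).

```java
// Hypothetical, simplified model of the REPLACE_THREAD decision.
// None of these names exist in Kafka Streams; they are assumptions for illustration.
final class ReplaceThreadGuardSketch {

    // Which kind of thread hit the uncaught exception (assumed classification).
    enum ThreadKind { STREAM_THREAD, GLOBAL_THREAD }

    // What the handler would decide to do in this sketch.
    enum Action { REPLACE_STREAM_THREAD, SKIP_REPLACEMENT }

    // Only start a replacement StreamThread when the dying thread was itself
    // a StreamThread; a dead global thread does not trigger a new StreamThread.
    static Action onReplaceThread(final ThreadKind dyingThread) {
        if (dyingThread == ThreadKind.GLOBAL_THREAD) {
            return Action.SKIP_REPLACEMENT;
        }
        return Action.REPLACE_STREAM_THREAD;
    }

    public static void main(final String[] args) {
        System.out.println(onReplaceThread(ThreadKind.STREAM_THREAD));
        System.out.println(onReplaceThread(ThreadKind.GLOBAL_THREAD));
    }
}
```

   Whether skipping the replacement is the right behavior for a dead global thread, as opposed to some other response, is exactly the open question here; the sketch only shows where such a check would sit.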

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -493,22 +493,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (countStreamThread(StreamThread::isRunning) == 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");

Review comment:
       ```suggestion
                       log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org