Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/23 16:15:03 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #10387: HOTFIX: get EOS corner case

wcarlson5 opened a new pull request #10387:
URL: https://github.com/apache/kafka/pull/10387


   When running with EOS, the run loop terminates on the failing thread before the application shutdown can be triggered. This is a problem for single-threaded EOS applications that use the application shutdown feature.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600698092



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -631,6 +625,14 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void maybeSendShutdown() {
+        if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            log.warn("Detected that shutdown was requested. " +
+                    "All clients in this app will now begin to shutdown");
+            mainConsumer.enforceRebalance();

Review comment:
       No, we don't; we can take those out.







[GitHub] [kafka] ableegoldman removed a comment on pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman removed a comment on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-805303422


   Also, since we have a fix for this, can we modify the `catch` blocks in the StreamThread loop to return false regardless of the processing mode? Also now that you explained how the default exception handler interacts with the StreamThread loop under ALOS, it seems like KAFKA-12537 probably affects both EOS and ALOS





[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600049583



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       We do check for the replace thread in the replace thread option. We don't have the option to replace the global threads. I will make a ticket for that.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602525060



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       You can use `StreamsBuilder#addGlobalStore` to inject an error. This method has a required parameter for a Processor, which you're supposed to use to insert the data into the store (tbh no idea why you have to implement this yourself; it's caused a LOT of problems since it's unclear what you can/can't do with it, but that's a whole other can of worms).
   
   Adding a thread is probably a fine workaround for now, and it clearly answered the question, but imo we should just go ahead and use `StreamsBuilder#addGlobalStore` + an injected error to decouple the test from the way we currently happen to handle this. Then we won't have to rewrite it if we ever change something.
   
   Regarding the IllegalStateException: wouldn't the thread hit this `IllegalStateException` before it got the chance to send the shutdown signal in a rebalance? IIRC the `subscribe` that's throwing is at the very beginning of the StreamThread loop. Also, I take it your test is passing, but I wonder why we don't get stuck in an endless cycle: the new thread hits an unexpected exception and invokes the handler, SHUTDOWN_APPLICATION notices that this is the last StreamThread so it starts a new one, and so on.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600044638



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       WDYT about just checking for the case of a global-only topology before the `switch` statement, and automatically invoking `closeToError()` with a warning that the other options are not supported in this case? We should also file an improvement ticket for the "restart the global thread" feature, if we don't already have one.
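   A minimal sketch of this suggestion, assuming the handler could tell whether a global thread exists and how many StreamThreads are configured (the follow-up in this thread notes there may be no clean way to detect a global-only topology today). `Response` is a hypothetical stand-in for the handler's response enum, and `resolve` is an illustrative helper, not a Kafka Streams API.

```java
import java.util.logging.Logger;

final class GlobalOnlyCheckSketch {
    // Hypothetical stand-in for the uncaught-exception handler's response enum.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    private static final Logger LOG = Logger.getLogger(GlobalOnlyCheckSketch.class.getName());

    // Runs before the switch. Assumption: "global-only" means a global thread
    // exists but no StreamThreads are configured.
    static Response resolve(Response requested, boolean hasGlobalThread, int numStreamThreads) {
        boolean globalOnly = hasGlobalThread && numStreamThreads == 0;
        if (globalOnly && requested != Response.SHUTDOWN_CLIENT) {
            LOG.warning("Only SHUTDOWN_CLIENT is supported for a global-only topology; "
                    + "ignoring requested response " + requested);
            // In the real handler this branch would correspond to closeToError().
            return Response.SHUTDOWN_CLIENT;
        }
        return requested;
    }
}
```

   Doing the check before the `switch` keeps every branch of the switch free of global-only special cases.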




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602472401



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       I wasn't sure how to inject an error into a global-only topology. But I figured what we really needed to test was whether we can add a thread to a global-only topology. So I did that. WDYT?
   
   It will hit an `IllegalStateException` ("Consumer is not subscribed to any topics or assigned any partitions"), but only after it has resized the cache and triggered the shutdown rebalance.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600060976



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       I'm not sure there's a good way to check whether it's a global-only topology at the moment, so I'm fine with not putting in a check for that case. It was just a suggestion.







[GitHub] [kafka] ableegoldman commented on pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-805301703


   @wcarlson5 heads up, looks like you need to rebase this PR





[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602584888



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       Well, we know that `SHUTDOWN_APPLICATION` will fail because the other client will not have a thread to receive the signal, unless you mean applications with different topologies?
   
   It doesn't appear to loop endlessly on the `IllegalStateException`, because both threads are still alive at that point, but it does not work as intended. I think this may not solve the global-thread-only case after all. We should only add a stream thread when there is exactly one thread, not when there are zero. At least this will take care of the EOS problem, and we can handle the global case as before.
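   The proposed refinement can be sketched by comparing the two guards side by side. `liveStreamThreads` stands in for the value `getNumLiveStreamThreads()` returns when the handler runs (assumed to still count the failing thread); the class and method names are hypothetical. With `<= 1`, a client with zero StreamThreads (the global-thread-only case) would also get a messenger thread; with `== 1`, only the single-thread case does, which matches the `countStreamThread(StreamThread::isRunning) == 1` check in the earlier revision of the diff.

```java
final class MessengerThreadGuard {
    // Guard as written in the revision under review.
    static boolean addThreadOriginal(int liveStreamThreads) {
        return liveStreamThreads <= 1;
    }

    // Refined guard: add a thread only when the failing thread is the last
    // StreamThread, leaving the zero-thread (global-only) case as before.
    static boolean addThreadRefined(int liveStreamThreads) {
        return liveStreamThreads == 1;
    }

    public static void main(String[] args) {
        for (int live = 0; live <= 2; live++) {
            System.out.printf("live=%d  original=%b  refined=%b%n",
                    live, addThreadOriginal(live), addThreadRefined(live));
        }
    }
}
```

   The two guards only disagree at zero live StreamThreads, which is exactly the global-only case being deferred.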







[GitHub] [kafka] wcarlson5 commented on pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-805354345


   @ableegoldman fixed





[GitHub] [kafka] ableegoldman merged pull request #10387: KAFKA-12537: fix application shutdown corner case with only one thread

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10387:
URL: https://github.com/apache/kafka/pull/10387


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600055319



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       Well, who knows what the exception is: it could be some local disk error or corruption, or they're using a remote state store and god-knows-what happened. I agree that it's probably rare for a global-only app to hit an exception, fail to shut down the application, and not end up shutting down anyway due to hitting the same exception elsewhere. But anything's possible.







[GitHub] [kafka] ableegoldman commented on pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-805303422


   Also, since we have a fix for this, can we modify the `catch` blocks in the StreamThread loop to return false regardless of the processing mode? Also now that you explained how the default exception handler interacts with the StreamThread loop under ALOS, it seems like KAFKA-12537 probably affects both EOS and ALOS
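   A simplified model of the proposed consolidation, under the assumption (suggested by this thread) that previously only the EOS path made the run loop exit after the handler ran, while ALOS kept looping. `RunLoopSketch`, `keepRunningAfterError`, and the simulated failure are hypothetical; the real StreamThread loop has more than one catch block and does much more per iteration.

```java
final class RunLoopSketch {
    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE }

    // What a catch block returns after the uncaught-exception handler has run:
    // true means "keep looping", false means "exit the run loop".
    static boolean keepRunningAfterError(ProcessingMode mode, boolean consolidated) {
        if (consolidated) {
            return false; // proposed: exit regardless of processing mode
        }
        return mode == ProcessingMode.AT_LEAST_ONCE; // assumed earlier divergence
    }

    // Simulates a run loop whose iterations fail from iteration `failAt` onward;
    // returns the iteration at which the loop exited, or maxIterations.
    static int iterationsUntilExit(ProcessingMode mode, boolean consolidated,
                                   int failAt, int maxIterations) {
        for (int i = 1; i <= maxIterations; i++) {
            try {
                if (i >= failAt) {
                    throw new RuntimeException("simulated processing error");
                }
            } catch (RuntimeException e) {
                // The uncaught-exception handler would be invoked here.
                if (!keepRunningAfterError(mode, consolidated)) {
                    return i;
                }
            }
        }
        return maxIterations;
    }
}
```

   With the consolidated behavior, both modes leave the loop on the first failing iteration, so the handler's shutdown logic sees the same thread state under EOS and ALOS.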





[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602525060



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       You can use `StreamsBuilder#addGlobalStore` to inject an error, this method has a required parameter for a Processor which you're supposed to use to insert the data into the store (tbh no idea why you have to implement this yourself, it's caused a LOT of problems since it's unclear what you can/can't do with this -- but that's a whole other can of worms). 
   
   Adding a thread clearly answered the question about what would happen, but imo we should just go ahead and use the `StreamsBuilder#addGlobalStore` + two KafkaStreams instances + injected error on one app to actually test the SHUTDOWN_APPLICATION feature.
   
   Regarding the IllegalStateException: wouldn't the thread hit this `IllegalStateException` before it got the chance to send the shutdown signal in a rebalance? IIRC the `subscribe` that's throwing is at the very beginning of the StreamThread loop. And unfortunately I think this may actually result in an endless cycle: the new thread hits an unexpected exception and invokes the handler, SHUTDOWN_APPLICATION notices that this is the last StreamThread so it starts a new one, and so on.
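   The cycle described here can be explored with a toy simulation (hypothetical names throughout, not Kafka code). Each time a thread fails, the handler adds a thread only if at most one StreamThread is counted as live. Whether the cascade stops depends on whether the previously failing thread is still counted as live when the next handler invocation runs; `maxFailures` only keeps the simulation finite.

```java
final class ShutdownCascadeSketch {
    /**
     * Counts how many threads the SHUTDOWN_APPLICATION branch would add if every
     * newly added thread immediately failed as well.
     *
     * @param previousThreadStillLive assumption: whether the previously failing thread
     *                                is still counted by the live-thread check when the
     *                                next thread's handler runs
     * @param maxFailures             cap so the simulation always terminates
     */
    static int threadsAdded(boolean previousThreadStillLive, int maxFailures) {
        int live = 1; // a single StreamThread, which is about to fail
        int added = 0;
        for (int failure = 0; failure < maxFailures; failure++) {
            // The handler runs on the failing thread, which still counts itself as live.
            if (live > 1) {
                break; // another thread exists, so no new thread is added
            }
            live++;   // addStreamThread()
            added++;
            if (!previousThreadStillLive) {
                live--; // the thread that just failed exits before the new one fails
            }
        }
        return added;
    }
}
```

   If the earlier thread is still alive when the new one fails, exactly one thread gets added and the cascade stops; if it has already exited, the count only stops at the cap, which is the endless cycle being worried about.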







[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: HOTFIX: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r599886841



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -493,22 +493,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (countStreamThread(StreamThread::isRunning) == 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && countStreamThread(StreamThread::isRunning) == 0) {

Review comment:
       Since we are adding a thread, we won't have this issue anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -493,22 +493,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (countStreamThread(StreamThread::isRunning) == 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");

Review comment:
       If we add a thread before shutting down the application, the thread with the exception can shut down.
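   The ordering argument in this comment can be sketched as a single-threaded event model (hypothetical names throughout; no real threads, consumers, or Kafka classes are involved). The failing thread's run loop always exits, so the only question is whether some other live thread remains afterwards to trigger the shutdown rebalance. The `addMessengerThread` flag toggles the fix under review.

```java
import java.util.ArrayList;
import java.util.List;

final class ShutdownMessengerSketch {
    private final List<String> events = new ArrayList<>();
    private final boolean addMessengerThread; // toggles the fix under review
    private int liveThreads;

    ShutdownMessengerSketch(int liveThreads, boolean addMessengerThread) {
        this.liveThreads = liveThreads;
        this.addMessengerThread = addMessengerThread;
    }

    // Follows the order of the SHUTDOWN_APPLICATION branch in the diff: first make
    // sure some thread will survive, then let the failing thread's run loop exit.
    void onUncaughtException(String failingThread) {
        if (addMessengerThread && liveThreads <= 1) {
            liveThreads++;
            events.add("added messenger thread");
        }
        events.add("shutdown requested by " + failingThread);
        liveThreads--; // the failing thread exits before it can trigger a rebalance itself
        events.add(liveThreads > 0
                ? "surviving thread triggers the shutdown rebalance"
                : "no thread left to send the shutdown signal");
    }

    List<String> events() {
        return events;
    }

    int liveThreads() {
        return liveThreads;
    }
}
```

   Without the extra thread, a single-threaded client ends with no thread to carry the request; with it, the added thread survives the failing thread's exit.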







[GitHub] [kafka] wcarlson5 commented on pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-808533844


   @ableegoldman I removed the global only case improvement and made https://issues.apache.org/jira/browse/KAFKA-12565.
   
   Now this just has a fix for the case where there is only one thread (EOS or not) and consolidates the divergent EOS and ALOS logic.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600049583



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       We do check for the replace thread in the replace thread option, so we kind of already take care of it. If they do choose to shut down the client, we don't need to log a warning, so it's probably unnecessary. We don't have the option to replace the global threads yet. I will make a ticket for that.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600051169



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       If all the clients are global-thread-only, shouldn't they hit the same error?







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602606489



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       Ah, I guess it's from trying to poll without any subscribed topics. Since it's not the `subscribe` call that throws, this alleviates my concern about the endless cycle too (in hindsight, yeah, no way `subscribe(<empty topics>)` would throw, since that's supposed to be semantically identical to calling `unsubscribe()`).
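   A tiny stand-in for the consumer behavior discussed here. `FakeConsumer` is hypothetical, not `KafkaConsumer`, but it mirrors two documented points of the real consumer: subscribing to an empty collection is treated the same as `unsubscribe()`, and polling with no subscription or assignment fails with `IllegalStateException`. The added thread therefore gets through `subscribe` and only fails later, at `poll`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class FakeConsumer {
    private final Set<String> subscription = new HashSet<>();

    void subscribe(Collection<String> topics) {
        if (topics.isEmpty()) {
            unsubscribe(); // an empty subscription is treated like unsubscribe()
            return;
        }
        subscription.clear();
        subscription.addAll(topics);
    }

    void unsubscribe() {
        subscription.clear();
    }

    // Returns a placeholder record count; only the failure mode matters here.
    int poll() {
        if (subscription.isEmpty()) {
            throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
        }
        return 0;
    }
}
```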







[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600050203



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -631,6 +625,14 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void maybeSendShutdown() {
+        if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            log.warn("Detected that shutdown was requested. " +
+                    "All clients in this app will now begin to shutdown");
+            mainConsumer.enforceRebalance();

Review comment:
       This is also how we trigger the rebalance for the other threads, so we can't just remove it.
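   The point about `enforceRebalance()` can be illustrated with a simplified model of `maybeSendShutdown()` from the diff: the shutdown request is recorded in an atomic error code, and the rebalance forced by a thread that observes it is what carries the request to the other threads. Here `enforceRebalance` is an injected `Runnable`, and the code values are hypothetical, not the real `AssignorError` codes.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class MaybeSendShutdownSketch {
    // Hypothetical code values; the real AssignorError codes are not assumed here.
    static final int NONE = 0;
    static final int SHUTDOWN_REQUESTED = 1;

    private final AtomicInteger assignmentErrorCode = new AtomicInteger(NONE);
    private final Runnable enforceRebalance; // stands in for mainConsumer.enforceRebalance()

    MaybeSendShutdownSketch(Runnable enforceRebalance) {
        this.enforceRebalance = enforceRebalance;
    }

    void requestShutdown() {
        assignmentErrorCode.set(SHUTDOWN_REQUESTED);
    }

    // Same shape as maybeSendShutdown() in the diff: only when a shutdown has been
    // requested does the thread force a rebalance to propagate it.
    boolean maybeSendShutdown() {
        if (assignmentErrorCode.get() == SHUTDOWN_REQUESTED) {
            enforceRebalance.run();
            return true;
        }
        return false;
    }
}
```

   Removing the `enforceRebalance` call would leave the error code set locally with nothing to push it to the rest of the group.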







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600059442



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -631,6 +625,14 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void maybeSendShutdown() {
+        if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            log.warn("Detected that shutdown was requested. " +
+                    "All clients in this app will now begin to shutdown");
+            mainConsumer.enforceRebalance();

Review comment:
       But do we even need to invoke `maybeSendShutdown` at all from the two catch blocks? The thread that we start up should handle this.







[GitHub] [kafka] ableegoldman edited a comment on pull request #10387: KAFKA-12537: fix application shutdown corner case with only one thread

ableegoldman edited a comment on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-808632135


   Merged to trunk; just running the tests before pushing to 2.8. @vvcephei
   
   edit: done





[GitHub] [kafka] ableegoldman commented on pull request #10387: KAFKA-12537: fix application shutdown corner case with only one thread

ableegoldman commented on pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#issuecomment-808619295


   Looks like just one unrelated flaky test failure: `kafka.common.record.MemoryRecordsBuilderTest.[18] magic=1, bufferOffset=15, compressionType=GZIP`





[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600875808



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       This PR LGTM, but can you add a quick test for the case of a global-only topology with an injected error and a SHUTDOWN_APPLICATION handler? I just want to make sure nothing weird happens when you try to start up a StreamThread with no topology.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600052508



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       Whoops, should've scrolled up a bit to the `replaceThread()` method. Thanks for filing the ticket.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600042316



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -631,6 +625,14 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void maybeSendShutdown() {
+        if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            log.warn("Detected that shutdown was requested. " +
+                    "All clients in this app will now begin to shutdown");
+            mainConsumer.enforceRebalance();

Review comment:
       Since this thread is going to shut down immediately anyway, I think we can skip the `mainConsumer.enforceRebalance()` call.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       Ah, nice that this solves the global thread issue as well! I guess technically this will still fail to communicate the shutdown if the application only ever runs the global thread and literally never started up any StreamThreads, but I think that's fine. Apparently running a global-only Streams app is a thing, as some users have reported in the past, but I would imagine this use case would almost certainly prefer the `REPLACE_THREAD` option.
   
   Ooh, wait. Do we need to add this check in the `REPLACE_THREAD` handling so we don't start up a StreamThread if it was the global thread that was killed?
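   To make the global-thread case concrete, here is a small Java sketch of the two guard conditions that appear in this PR's hunks for `SHUTDOWN_APPLICATION`: `getNumLiveStreamThreads() <= 1` and `countStreamThread(StreamThread::isRunning) == 1`. The live StreamThread count is reduced to a plain `int`. The sketch assumes that the count includes a failing StreamThread that is still running its handler, and that it excludes the global thread, which is not a StreamThread. The class and method names are hypothetical.

   ```java
   // Compares which live-StreamThread counts make each guard add a thread.
   final class ShutdownGuardSketch {

       // Guard from the "@@ -492,22 +492,18 @@" hunks.
       static boolean addsThreadWithAtMostOne(final int liveStreamThreads) {
           return liveStreamThreads <= 1;
       }

       // Guard from the "@@ -493,22 +493,18 @@" hunk.
       static boolean addsThreadWithExactlyOne(final int liveStreamThreads) {
           return liveStreamThreads == 1;
       }

       public static void main(final String[] args) {
           // 0: the global thread failed and no StreamThread is alive.
           // 1: the failing StreamThread is the last one.
           // 2: another StreamThread can carry the shutdown signal.
           for (int live = 0; live <= 2; live++) {
               System.out.printf("live=%d  <=1 adds thread: %b  ==1 adds thread: %b%n",
                       live, addsThreadWithAtMostOne(live), addsThreadWithExactlyOne(live));
           }
       }
   }
   ```

   Only the `<= 1` form adds a thread when the count is 0, which is the global-thread case. Whether that added thread can actually complete a rebalance in a global-only topology is a separate question that this model does not answer.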

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -493,22 +493,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (countStreamThread(StreamThread::isRunning) == 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");

Review comment:
       ```suggestion
                       log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r600041390



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {
+                    log.warn("Adding thread to communicate the shutdown. No processing will be done on this thread");
+                    addStreamThread();
+                }
                 if (throwable instanceof Error) {
                     log.error("This option requires running threads to shut down the application." +
                             "but the uncaught exception was an Error, which means this runtime is no " +
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
-                    log.error("Exception in global thread caused the application to attempt to shutdown." +

Review comment:
       Ah, nice that this solves the global thread issue as well! I guess technically this will still fail to communicate the shutdown if the application only ever runs the global thread and literally never started up any StreamThreads, but I think that's fine. Apparently running a global-only Streams app is a thing, as some users have reported in the past, but I would imagine this use case would almost certainly prefer the `REPLACE_THREAD` option.
   
   ~Ooh, wait. Do we need to add this check in the `REPLACE_THREAD` handling so we don't start up a StreamThread if it was the global thread that was killed?~ edit: we already do exactly this, but it's in `replaceThread()`
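   The check referred to here, done inside `replaceThread()`, amounts to refusing to start a replacement StreamThread when the thread that died is the global thread. Below is a minimal Java sketch of that decision. All names are hypothetical, and the sketch leaves out what the real client does instead.

   ```java
   // Toy model of the REPLACE_THREAD decision; names are hypothetical.
   final class ReplaceThreadSketch {

       enum Decision { START_REPLACEMENT_STREAM_THREAD, DO_NOT_REPLACE }

       // A dead global thread must not be "replaced" by a StreamThread.
       static Decision onReplaceThread(final Thread failingThread, final Thread globalStreamThread) {
           if (globalStreamThread != null && failingThread == globalStreamThread) {
               return Decision.DO_NOT_REPLACE;
           }
           return Decision.START_REPLACEMENT_STREAM_THREAD;
       }

       public static void main(final String[] args) {
           final Thread global = new Thread(() -> { }, "global-stream-thread");
           final Thread stream = new Thread(() -> { }, "stream-thread-1");
           System.out.println(onReplaceThread(global, global)); // DO_NOT_REPLACE
           System.out.println(onReplaceThread(stream, global)); // START_REPLACEMENT_STREAM_THREAD
           System.out.println(onReplaceThread(stream, null));   // START_REPLACEMENT_STREAM_THREAD
       }
   }
   ```

   Comparing threads by identity is a simplification here. The real check may identify the global thread differently.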







[GitHub] [kafka] ableegoldman commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602525060



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       You can use `StreamsBuilder#addGlobalStore` to inject an error. This method has a required parameter for a Processor, which you're supposed to use to insert the data into the store. (tbh I have no idea why you have to implement this yourself; it's caused a LOT of problems since it's unclear what you can/can't do with it, but that's a whole other can of worms.)
   
   Adding a thread is probably a fine workaround for now, since it clearly answered the question. But imo we should just go ahead and use `StreamsBuilder#addGlobalStore` with an injected error to decouple the test from the way we currently happen to handle this. Then we won't have to rewrite the test if we ever change that handling.
   
   Regarding the `IllegalStateException`: wouldn't the thread hit it before it got the chance to send the shutdown signal in a rebalance? IIRC the `subscribe` that's throwing is at the very beginning of the StreamThread loop. Unfortunately, I think this may actually result in an endless cycle: the new thread hits an unexpected exception and invokes the handler, SHUTDOWN_APPLICATION notices that this is the last StreamThread and starts a new one, and so on.
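   The cycle described above can be illustrated with a toy Java model. This is not a reproduction of Kafka Streams. Each time the last live StreamThread fails, the `SHUTDOWN_APPLICATION` branch adds a new thread. If that new thread also fails before it reaches a rebalance, the same branch runs again. The `maxSteps` cap and all names are hypothetical; the cap exists only so the sketch terminates.

   ```java
   // Toy model of the SHUTDOWN_APPLICATION restart cycle; names are hypothetical.
   final class ShutdownCycleSketch {

       // Returns how many threads the handler starts before the shutdown is sent
       // or the artificial cap is reached.
       static int threadsStarted(final boolean newThreadFailsBeforeRebalance, final int maxSteps) {
           int started = 0;
           for (int step = 0; step < maxSteps; step++) {
               // A StreamThread has just failed and is the last live StreamThread, so the
               // SHUTDOWN_APPLICATION branch (guard: live threads <= 1) starts a new one.
               started++;
               if (!newThreadFailsBeforeRebalance) {
                   return started; // the new thread reaches a rebalance and sends the shutdown
               }
               // Otherwise the new thread throws before any rebalance, its handler sees
               // that it is again the last live StreamThread, and the cycle repeats.
           }
           return started; // only the artificial cap stops the cycle
       }

       public static void main(final String[] args) {
           System.out.println(threadsStarted(false, 10)); // prints 1
           System.out.println(threadsStarted(true, 10));  // prints 10
       }
   }
   ```

   The model only shows that the guard by itself does not break the loop. Whether the real added thread fails before a rebalance depends on where the exception is actually thrown.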







[GitHub] [kafka] wcarlson5 commented on a change in pull request #10387: KAFKA-12537: get EOS corner case

wcarlson5 commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602591231



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       Also, the `IllegalStateException` is coming from the `runLoop`, not the `subscribe`.



