Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/04 22:17:37 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #9697: KAFKA-10810: Replace stream threads

wcarlson5 opened a new pull request #9697:
URL: https://github.com/apache/kafka/pull/9697


   StreamThreads can now be replaced in the streams uncaught exception handler
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r541028355



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -27,6 +27,7 @@
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Review comment:
       Oh, good catch.
   
   Just a quick question: did we misname this option in the KIP? A StreamThread is a specific kind of thread. What I mean is that a GlobalStreamThread is _not_ a StreamThread. Perhaps `REPLACE_THREAD` and `SHUTDOWN_THREAD` would have been better, more general names, for these. If you agree, I think we can just amend the KIP and fix it in this PR.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r541114609



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -27,6 +27,7 @@
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Review comment:
       What if we add an option to replace the global thread?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r539636011



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];
+                threads.remove(deadThread);

Review comment:
       It won't matter. "Ensures" is too strong a word. If other threads have been removed before this, the replacement may have a different name. However, this does ensure that the replacement thread will never have a thread index larger than the number of threads.
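
The naming point above can be illustrated with a minimal sketch. It assumes (the thread does not confirm this) that a new stream thread is given the smallest index not currently used by a live thread, so removing the dead thread from the list first frees its index for reuse. `ThreadIndexSketch` and `nextThreadIndex` are hypothetical names, not Kafka APIs.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ThreadIndexSketch {
    // Assumption: indexes start at 1 and the smallest free index is reused.
    static int nextThreadIndex(final List<Integer> liveIndexes) {
        final Set<Integer> used = new HashSet<>(liveIndexes);
        int candidate = 1;
        while (used.contains(candidate)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(final String[] args) {
        // Thread 2 died and was removed first: its index is reused.
        System.out.println(nextThreadIndex(Arrays.asList(1, 3))); // prints 2
        // Thread 1 died after an earlier removal: the replacement gets a different name.
        System.out.println(nextThreadIndex(Arrays.asList(2, 3))); // prints 1
    }
}
```

Under this assumption the result never exceeds the live thread count plus one, which is the thread count once the replacement has been added.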







[GitHub] [kafka] zydzjy commented on pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
zydzjy commented on pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#issuecomment-756615836


   What can I do with the replacement thread? Can I skip the current offset in the exception handler, since the exception was caused by the data at the current offset?





[GitHub] [kafka] vvcephei commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r539615678



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,28 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));

Review comment:
       It's not obvious to me how this verifies that the thread actually got replaced. Maybe an explanatory comment is in order?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r540308752



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();

Review comment:
       I don't think it matters; it just sets the thread state earlier, but we can delete it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();
+                if (throwable instanceof RuntimeException) {
+                    throw (RuntimeException) throwable;
+                } else if (throwable instanceof Error) {
+                    throw (Error) throwable;
+                } else {
+                    throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+                }

Review comment:
       we can do that

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record kills a thread we must have replaced threads

Review comment:
       I edited the test to be based on the number of threads instead of hard-coding it, and gave a reason.
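
The reasoning behind that assertion can be modeled in a few lines. This is a toy model under stated assumptions, not the test itself: a poison record kills whichever thread processes it, the record is then retried by another live thread, and the handler shuts the client down on the third failure. All names are hypothetical.

```java
class ReplaceCountSketch {
    // Counts how often the poison record is processed before the client stops.
    static int attempts(final int initialThreads, final int shutdownAfter, final boolean replaceThreads) {
        int liveThreads = initialThreads;
        int attempts = 0;
        while (liveThreads > 0) {
            attempts++;        // a live thread processes the record ...
            liveThreads--;     // ... and dies
            if (attempts >= shutdownAfter) {
                break;         // modeled handler returns SHUTDOWN_CLIENT
            }
            if (replaceThreads) {
                liveThreads++; // modeled handler returned REPLACE_THREAD
            }
        }
        return attempts;
    }

    public static void main(final String[] args) {
        System.out.println(attempts(2, 3, true));  // prints 3
        System.out.println(attempts(2, 3, false)); // prints 2
    }
}
```

In this model, two initial threads can only reach three processed records if dead threads are replaced, which is the argument the code comment in the test makes.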

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });

Review comment:
       We can change the test so that we verify the replaced threads can process records. 
   
   I am not sure that is necessary for the shutdown case, since whether Streams can process records once started should be tested elsewhere. In any case, I don't think this PR is the place for that discussion.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r540661585



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record kills a thread we must have replaced threads
+        }
+    }
 }

Review comment:
       added







[GitHub] [kafka] lct45 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r537899383



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,27 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                log.warn("The global thread can not be replaced. Reverting to shutting down the client.");

Review comment:
       nit: "can not" -> "cannot"







[GitHub] [kafka] wcarlson5 commented on pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#issuecomment-758067947


   @zydzjy I don't think you can do that yet, at least not with anything I have added here. The idea is to recover from transient exceptions without having to restart the client.
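
As a rough illustration of that idea, a handler can ask for a replacement a bounded number of times and then fall back to shutting the client down, which is the shape the integration test in this PR uses. The sketch below is self-contained: the `Response` enum is a hypothetical stand-in for `StreamThreadExceptionResponse`, and `BoundedReplaceSketch` and `maxReplacements` are made-up names. It does not skip the failing record; it only bounds how often a thread is replaced.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedReplaceSketch {
    // Hypothetical stand-in for the responses discussed in this PR.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    private final AtomicInteger failures = new AtomicInteger();
    private final int maxReplacements;

    BoundedReplaceSketch(final int maxReplacements) {
        this.maxReplacements = maxReplacements;
    }

    // Replace the thread for the first maxReplacements failures, then give up.
    Response handle(final Throwable exception) {
        // incrementAndGet returns the updated count in one atomic step, so two
        // threads failing at the same time never see the same value.
        final int seen = failures.incrementAndGet();
        return seen > maxReplacements ? Response.SHUTDOWN_CLIENT : Response.REPLACE_THREAD;
    }

    public static void main(final String[] args) {
        final BoundedReplaceSketch handler = new BoundedReplaceSketch(2);
        for (int i = 0; i < 3; i++) {
            // prints REPLACE_THREAD, REPLACE_THREAD, SHUTDOWN_CLIENT
            System.out.println(handler.handle(new RuntimeException("transient")));
        }
    }
}
```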



[GitHub] [kafka] cadonna commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r541119671



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -27,6 +27,7 @@
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Review comment:
       Ah, now I got it! Sorry! Makes sense! In that case we can reuse `REPLACE_THREAD` also for the global stream thread. Forgot about that!







[GitHub] [kafka] lct45 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r537899383



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,27 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                log.warn("The global thread can not be replaced. Reverting to shutting down the client.");

Review comment:
       nit: "can not" -> "cannot", same below







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r536416836



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -145,6 +148,26 @@ public void shouldShutdownClient() throws InterruptedException {
         }
     }
 
+    @Test
+    public void shouldReplaceThread() throws InterruptedException {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+            AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                return REPLACE_THREAD;
+            });
+
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            TestUtils.waitForCondition(() -> count.get() > 2, DEFAULT_TIMEOUT, "At least 3 threads have died");

Review comment:
       All of the original threads have died and been replaced and it is still running

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];
+                threads.remove(deadThread);

Review comment:
       Removing the dead thread from the thread list does two things:
   
   1. It keeps DEAD threads out of the list, as KIP-663 dictates.
   2. It ensures the next thread has the same name.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();
+                if (throwable instanceof RuntimeException) {

Review comment:
       We need to throw the error or the task gets lost and we drop records
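
A minimal sketch of the rethrow pattern from the hunk above, isolated from Kafka. The class `RethrowSketch` and the helper `rethrow` are hypothetical names; the branching mirrors the diff. Unchecked throwables are rethrown as the same instance, so the dying thread still terminates with the failure that triggered the handler, and a checked throwable is wrapped so it can be thrown at all.

```java
import java.io.IOException;

class RethrowSketch {
    // Rethrows unchecked throwables unchanged; wraps anything else.
    static void rethrow(final Throwable throwable) {
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
        }
    }

    public static void main(final String[] args) {
        try {
            rethrow(new IOException("disk"));
        } catch (final RuntimeException e) {
            // The checked exception survives as the cause.
            System.out.println(e.getCause().getClass().getSimpleName()); // prints IOException
        }
    }
}
```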







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r541070527



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -27,6 +27,7 @@
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Review comment:
       The string should be `REPLACE_THREAD`. I'll just fix that; it's left over from a period when we thought of calling it `REPLACE_STREAM_THREAD`.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r539678346



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];

Review comment:
       Yep, it is now :)

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,28 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));

Review comment:
       good idea







[GitHub] [kafka] vvcephei merged pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9697:
URL: https://github.com/apache/kafka/pull/9697


   





[GitHub] [kafka] cadonna commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r540865043



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -436,6 +436,26 @@ private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
         }
     }
 
+    private void replaceThreadHelper(final Throwable throwable) {

Review comment:
       Please rename to `replaceStreamThread()`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -27,6 +27,7 @@
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Review comment:
       I would rename it to `REPLACE_STREAM_THREAD`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -155,16 +171,46 @@ public void shouldShutdownSingleThreadApplication() throws InterruptedException
         testShutdownApplication(1);
     }
 
+    @Test
+    public void testGlobalThreadException() throws InterruptedException {
+        builder  = new StreamsBuilder();
+        builder.addGlobalStore(
+            new KeyValueStoreBuilder<>(
+                Stores.persistentKeyValueStore("globalStore"),
+                Serdes.String(),
+                Serdes.String(),
+                CLUSTER.time
+            ),
+            inputTopic,
+            Consumed.with(Serdes.String(), Serdes.String()),
+            () -> new ShutdownProcessor(processorValueCollector)
+        );
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+            kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_CLIENT);

Review comment:
       The point of this test should be that a global stream thread is not replaced; the client is shut down instead. Hence, the uncaught exception handler should return `REPLACE_THREAD`, not `SHUTDOWN_CLIENT`.
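
A minimal sketch of the behavior such a test would pin down, written in plain Java rather than against Kafka Streams. The `Response` enum and the `resolve` method are hypothetical stand-ins for illustration only; they are not the real `StreamThreadExceptionResponse` or the code in `KafkaStreams`.

```java
// Hypothetical stand-ins for illustration; these are not Kafka Streams classes.
final class GlobalThreadFallbackSketch {

    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Returns the action that is actually carried out for the failing thread.
    // A global thread cannot be replaced, so a REPLACE_THREAD request coming
    // from it is downgraded to shutting down the client.
    static Response resolve(final Response requested, final boolean isGlobalThread) {
        if (requested == Response.REPLACE_THREAD && isGlobalThread) {
            return Response.SHUTDOWN_CLIENT;
        }
        return requested;
    }
}
```

With this shape, the interesting case for the test is the one where the handler asks for `REPLACE_THREAD` on the global thread and the outcome is still a client shutdown.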

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -155,16 +171,46 @@ public void shouldShutdownSingleThreadApplication() throws InterruptedException
         testShutdownApplication(1);
     }
 
+    @Test
+    public void testGlobalThreadException() throws InterruptedException {

Review comment:
       Please rename to something like `shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread()`.




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r539583538



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];

Review comment:
       Is this equivalent to `Thread.currentThread()`?
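
A small sketch of why the two lookups coincide, assuming thread names are unique and the code runs on a thread that is registered in the list. The class and method names are hypothetical and use plain `Thread` objects instead of `StreamThread`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; plain threads stand in for StreamThread.
final class CurrentThreadLookupSketch {

    // Finds the calling thread in the list by name, as the diff above does.
    static Thread findCurrentByName(final List<Thread> threads) {
        final String name = Thread.currentThread().getName();
        return threads.stream()
            .filter(t -> t.getName().equals(name))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("calling thread is not registered"));
    }

    // Runs the lookup on a registered worker and reports whether it returned
    // the very same object as Thread.currentThread().
    static boolean lookupMatchesCurrentThread() {
        final List<Thread> threads = new ArrayList<>();
        final boolean[] same = new boolean[1];
        final Thread worker = new Thread(
            () -> same[0] = findCurrentByName(threads) == Thread.currentThread(),
            "worker-1");
        threads.add(worker);
        threads.add(new Thread(() -> { }, "worker-2"));
        worker.start();
        try {
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return same[0];
    }
}
```

The equivalence only holds under those two assumptions. If the handler ran on a thread that is not in the list, the name lookup would fail while a direct cast of `Thread.currentThread()` would throw a `ClassCastException` instead.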




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r541099547



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -27,6 +27,7 @@
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        REPLACE_THREAD(0, "REPLACE_STREAM_THREAD"),

Review comment:
       I would actually be in favor of calling the enum value `REPLACE_STREAM_THREAD`. A stream thread is a stream thread and a global stream thread is a global stream thread. I am aware that the KIP names the enum value differently, but we also have a config called `NUM_STREAM_THREADS_CONFIG`, and we also have `addStreamThread()` and `removeStreamThread()`. So I guess the name exposed to the outside is stream thread, not thread. We also have other threads in Kafka Streams, like the state directory cleaner thread and the RocksDB metrics recording thread.




----------------------------------------------------------------



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r541071515



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -155,16 +171,46 @@ public void shouldShutdownSingleThreadApplication() throws InterruptedException
         testShutdownApplication(1);
     }
 
+    @Test
+    public void testGlobalThreadException() throws InterruptedException {
+        builder  = new StreamsBuilder();
+        builder.addGlobalStore(
+            new KeyValueStoreBuilder<>(
+                Stores.persistentKeyValueStore("globalStore"),
+                Serdes.String(),
+                Serdes.String(),
+                CLUSTER.time
+            ),
+            inputTopic,
+            Consumed.with(Serdes.String(), Serdes.String()),
+            () -> new ShutdownProcessor(processorValueCollector)
+        );
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+            kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_CLIENT);

Review comment:
       I guess we can test that here at the same time




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r540249465



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();
+                if (throwable instanceof RuntimeException) {
+                    throw (RuntimeException) throwable;
+                } else if (throwable instanceof Error) {
+                    throw (Error) throwable;
+                } else {
+                    throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+                }

Review comment:
       I think it would be cleaner to extract this code to a separate method.
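
One possible shape for the extracted method, shown as a standalone sketch. The class name `RethrowSketch` and the method name `rethrowUnchecked` are hypothetical; the branching and the message are taken from the diff above.

```java
// Hypothetical helper; the logic mirrors the rethrow block in the diff.
final class RethrowSketch {

    // Rethrows the throwable unchanged when it is already unchecked and wraps
    // checked exceptions in a RuntimeException. Never returns normally.
    static void rethrowUnchecked(final Throwable throwable) {
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new RuntimeException(
                "Unexpected checked exception caught in the uncaught exception handler", throwable);
        }
    }
}
```

Pulling the branches into one method keeps the `switch` case short and lets other cases reuse the same rethrow behavior.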

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record kills a thread we must have replaced threads

Review comment:
       Could you please be a bit clearer in the explanatory comment? BTW, we also execute this test once with just one stream thread, so the 2 stream threads in the comment are not correct. Also, wouldn't it be better to explain the verification in the call to `assertThat()` instead of in a comment? You can pass a reason to the method.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });

Review comment:
       I think it would be better to have a test that shows that a new thread that replaced a failed one is actually able to process records. So, I would let the new thread process some records and then shut down the client with a normal close.
   
   Maybe something similar applies to the shutdown tests: first let the client/application process some records, and then throw an exception that shuts down the client/application. I guess this last paragraph is something for a separate PR.
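
The idea of the first paragraph can be sketched with plain Java threads. This is not Kafka Streams code: the class `ReplacementThreadSketch`, the `POISON` marker, and the queue-based input are all assumptions made for illustration. A worker dies on a poison value, its uncaught exception handler starts a replacement, and the replacement goes on to process the remaining values before finishing normally.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration using plain threads, not StreamThread.
final class ReplacementThreadSketch {

    static final String POISON = "poison";

    // Processes all values with one worker at a time and returns the values
    // that were processed successfully, in order.
    static List<String> process(final List<String> values) {
        final BlockingQueue<String> input = new LinkedBlockingQueue<>(values);
        final List<String> processed = new CopyOnWriteArrayList<>();
        final CountDownLatch drained = new CountDownLatch(1);
        startWorker(input, processed, drained);
        try {
            if (!drained.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not drain the input in time");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    private static void startWorker(final BlockingQueue<String> input,
                                    final List<String> processed,
                                    final CountDownLatch drained) {
        final Thread worker = new Thread(() -> {
            String value;
            while ((value = input.poll()) != null) {
                if (POISON.equals(value)) {
                    // Simulates a processing failure that kills this worker.
                    throw new IllegalStateException("cannot process " + value);
                }
                processed.add(value);
            }
            drained.countDown();
        });
        // The handler replaces the dead worker with a fresh one.
        worker.setUncaughtExceptionHandler(
            (thread, error) -> startWorker(input, processed, drained));
        worker.start();
    }
}
```

A test built along these lines would assert on the values processed after the failure, which is what distinguishes "a thread was replaced" from "a replaced thread actually works".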

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();

Review comment:
       Do we need to shut down the dead stream thread? `completeShutDown()` will be called anyway.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record kills a thread we must have replaced threads
+        }
+    }
 }

Review comment:
       A test is missing for a global stream thread that calls the uncaught exception handler.




----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r539584371



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                StreamThread deadThread = (StreamThread) threads.stream().filter(n -> n.getName().equals(Thread.currentThread().getName())).toArray()[0];
+                threads.remove(deadThread);

Review comment:
       To ensure #2 holds, do we need to have a mutex on thread creation? Or does it not matter?
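
The invariant referred to as #2 is not quoted in this message, so the following is only a sketch of what a mutex around thread creation could look like, assuming the concern is that removing the dead thread and adding its replacement should not interleave with other changes to the thread list. The class `GuardedThreadRegistrySketch` and all of its members are hypothetical, and thread names stand in for thread objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; names stand in for StreamThread instances.
final class GuardedThreadRegistrySketch {

    private final Object lock = new Object();
    private final List<String> threadNames = new ArrayList<>();
    private int nextIndex = 1;

    // Registers a new thread name under the lock and returns it.
    String add() {
        synchronized (lock) {
            final String name = "StreamThread-" + nextIndex++;
            threadNames.add(name);
            return name;
        }
    }

    // Removes the dead thread and adds its replacement as one atomic step,
    // so no other caller can observe or change the list in between.
    String replace(final String deadThreadName) {
        synchronized (lock) {
            if (!threadNames.remove(deadThreadName)) {
                throw new IllegalArgumentException(deadThreadName + " is not registered");
            }
            return add(); // the monitor is reentrant, so nesting is safe
        }
    }

    // Returns a copy so callers never iterate the guarded list directly.
    List<String> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(threadNames);
        }
    }
}
```

Whether such a lock is needed depends on which other code paths can add or remove threads at the same time, which this message does not settle.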




----------------------------------------------------------------