You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/10 15:57:45 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9697: KAFKA-10810: Replace stream threads

cadonna commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r540249465



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();
+                if (throwable instanceof RuntimeException) {
+                    throw (RuntimeException) throwable;
+                } else if (throwable instanceof Error) {
+                    throw (Error) throwable;
+                } else {
+                    throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
+                }

Review comment:
       I think it would be cleaner to extract this code to a separate method.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record kills a thread we must have replaced threads

Review comment:
       Could you please be a bit clearer in the explanatory comment? BTW, we execute this test also once with just one stream thread so the 2 stream threads in the comment are not correct. Also, wouldn't it be better to explain the verification in the call to `assertThat()` instead of in a comment? You can pass a reason to the method.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });

Review comment:
       I think it would be better to have a test that shows that a new thread that replaced a failed one, actually is able to process records. So, I would let the new thread process some records and then shutdown the client with a normal close.
   
   Maybe similar applies to the shutdown tests. First let the client/application process some records and then throw an exception that shuts down the client/application. I guess, this last paragraph is something for a separate PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                     "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
+                    log.error("Encountered the following exception during processing " +
+                            "and the registered exception handler opted to " + action + "." +
+                            " The streams client is going to shut down now. ", throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();

Review comment:
       Do we need to shutdown the dead stream thread? `completeShutDown()` will be called anyways. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record kills a thread we must have replaced threads
+        }
+    }
 }

Review comment:
       A test is missing for a global stream thread that calls the uncaught exception handler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org