Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/05 08:07:26 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman opened a new pull request #11381:
URL: https://github.com/apache/kafka/pull/11381


   The current exception handler offers little granularity: all exceptions from all sources are treated equally. With the introduction of independent named topologies, it would be nice to have some way to tell which topology an exception was thrown from during processing.
   
   Currently we only consider exceptions thrown from the `process()` method of a task in that named topology, but eventually we can expand this to other exceptions that are tied to a particular topology, for example errors pertaining to the topics of that query.
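As a rough illustration of the idea, here is a minimal sketch of routing a processing exception to a handler registered for the topology it came from, with a fallback to the application-wide handler. `TopologyHandlerRegistry`, `setHandler`, and `handle` are hypothetical names, not Kafka Streams API. Handlers are modeled as `java.util.function.Consumer<Throwable>`, matching the `topologyExceptionHandlers` field this PR adds to `StreamThread`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of per-topology handler dispatch; not the Kafka Streams API.
class TopologyHandlerRegistry {
    private final Map<String, Consumer<Throwable>> topologyHandlers = new ConcurrentHashMap<>();
    private final Consumer<Throwable> defaultHandler;

    TopologyHandlerRegistry(final Consumer<Throwable> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    void setHandler(final String topologyName, final Consumer<Throwable> handler) {
        topologyHandlers.put(topologyName, handler);
    }

    // Use the topology's own handler if one is registered, otherwise the default.
    // A null topologyName (task not in a named topology) always goes to the default;
    // the explicit check matters because ConcurrentHashMap rejects null keys.
    void handle(final String topologyName, final Throwable error) {
        final Consumer<Throwable> handler = topologyName == null
            ? defaultHandler
            : topologyHandlers.getOrDefault(topologyName, defaultHandler);
        handler.accept(error);
    }
}
```

Falling back to the default handler means topologies that never register their own handler behave exactly as before.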
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman closed pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman closed pull request #11381:
URL: https://github.com/apache/kafka/pull/11381


   





[GitHub] [kafka] ableegoldman commented on pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman commented on pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#issuecomment-946108493


   Closing in favor of https://github.com/apache/kafka/pull/11405





[GitHub] [kafka] ableegoldman commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r730071032



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -513,8 +514,8 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<
         return action;
     }
 
-    private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+    protected void handleStreamsUncaughtException(final Throwable throwable,

Review comment:
       Ah, I had meant to pass `namedTopologyException.getCause()` to the handler. Yes, you should not need to unwrap twice; that was just a mistake in the PR. I'm on board with wrapping everything as a StreamsException, though.
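A minimal sketch of what unwrapping once means here, assuming a wrapper like the PR's `NamedTopologyException` that carries the topology name and holds the original failure as its cause. `NamedTopologyWrapper`, `UnwrapOnce`, and `deliver` are hypothetical stand-ins. The point is that the handler receives `getCause()`, so the user's handler sees the original exception rather than the wrapper.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the PR's NamedTopologyException (which extends KafkaException).
class NamedTopologyWrapper extends RuntimeException {
    final String topologyName;

    NamedTopologyWrapper(final String topologyName, final Throwable cause) {
        super(cause);
        this.topologyName = topologyName;
    }
}

class UnwrapOnce {
    // Strip exactly one wrapper layer before handing the error to the user's handler;
    // anything that is not wrapped is passed through unchanged.
    static void deliver(final Throwable thrown, final Consumer<Throwable> handler) {
        if (thrown instanceof NamedTopologyWrapper && thrown.getCause() != null) {
            handler.accept(thrown.getCause());
        } else {
            handler.accept(thrown);
        }
    }
}
```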







[GitHub] [kafka] ableegoldman commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r722480165



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -299,6 +301,7 @@ public boolean isRunning() {
     private final java.util.function.Consumer<Long> cacheResizer;
 
     private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+    private final Map<String, java.util.function.Consumer<Throwable>> topologyExceptionHandlers;

Review comment:
       Good catch; yes, we should clean up after a topology is removed.







[GitHub] [kafka] ableegoldman commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r722488122



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1280,29 +1281,36 @@ int process(final int maxNumRecords, final Time time) {
             int processed = 0;
             final long then = now;
             try {
-                while (processed < maxNumRecords && task.process(now)) {
-                    task.clearTaskTimeout();
-                    processed++;
+                try {
+                    while (processed < maxNumRecords && task.process(now)) {
+                        task.clearTaskTimeout();
+                        processed++;
+                    }
+                } catch (final TimeoutException timeoutException) {
+                    task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                    log.debug(
+                        String.format(
+                            "Could not complete processing records for %s due to the following exception; will move to next task and retry later",
+                            task.id()),
+                        timeoutException
+                    );
+                } catch (final TaskMigratedException e) {
+                    log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                                 "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
+                    throw e;
+                } catch (final RuntimeException e) {
+                    log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+                    throw e;
+                } finally {
+                    now = time.milliseconds();
+                    totalProcessed += processed;
+                    task.recordProcessBatchTime(now - then);
+                }
+            } catch (final Throwable e) {
+                final String topologyName = task.id().topologyName();

Review comment:
       Fixed 👍 







[GitHub] [kafka] showuon commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

showuon commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r722928724



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/NamedTopologyException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+public class NamedTopologyException extends KafkaException {

Review comment:
       We should add some Javadoc for this class explaining what this exception means, with a short description, like the other custom exception classes have.
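For illustration only, here is the kind of class-level Javadoc being asked for, on a class modeled loosely after the PR's `NamedTopologyException`. To keep the sketch self-contained it extends `RuntimeException` (Kafka's `KafkaException` is itself a `RuntimeException`). The class name, the wording, and the `topologyName()` accessor are assumptions, not what the PR contains.

```java
/**
 * Hypothetical sketch (not the PR's code).
 * Indicates that an exception was thrown while processing a task that belongs to a
 * specific named topology. The original failure is available via {@link #getCause()},
 * and the topology it came from via {@link #topologyName()}, so that a handler
 * registered for that topology can be chosen.
 */
class NamedTopologyExceptionSketch extends RuntimeException {
    private final String topologyName;

    /**
     * @param topologyName the name of the topology whose task threw {@code cause}
     * @param cause        the original exception thrown during processing
     */
    NamedTopologyExceptionSketch(final String topologyName, final Throwable cause) {
        super(cause);
        this.topologyName = topologyName;
    }

    /** @return the name of the named topology the exception originated from */
    String topologyName() {
        return topologyName;
    }
}
```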

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -138,6 +140,29 @@ public void addNamedTopology(final NamedTopology newTopology) {
         topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
     }
 
+    /**
+     * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
+     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * it to begin processing the new topology.

Review comment:
       Should we add some description of `topologyExceptionHandler`? E.g. `This method will also set the {@code topologyExceptionHandler} for the {@code newTopology}`.







[GitHub] [kafka] ableegoldman commented on pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman commented on pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#issuecomment-944632005


   Hey @wcarlson5 @guozhangwang @showuon , I opened a new PR with an alternate approach: https://github.com/apache/kafka/pull/11405
   
   I think the new one has the advantage of benefiting everyone, since it can be useful to know the task id as well, and it also cleans up the exceptions to make sure we always hand a StreamsException to the user. Let me know your thoughts.





[GitHub] [kafka] wcarlson5 commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

wcarlson5 commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r722408690



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1280,29 +1281,36 @@ int process(final int maxNumRecords, final Time time) {
             int processed = 0;
             final long then = now;
             try {
-                while (processed < maxNumRecords && task.process(now)) {
-                    task.clearTaskTimeout();
-                    processed++;
+                try {
+                    while (processed < maxNumRecords && task.process(now)) {
+                        task.clearTaskTimeout();
+                        processed++;
+                    }
+                } catch (final TimeoutException timeoutException) {
+                    task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                    log.debug(
+                        String.format(
+                            "Could not complete processing records for %s due to the following exception; will move to next task and retry later",
+                            task.id()),
+                        timeoutException
+                    );
+                } catch (final TaskMigratedException e) {
+                    log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                                 "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
+                    throw e;
+                } catch (final RuntimeException e) {
+                    log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+                    throw e;
+                } finally {
+                    now = time.milliseconds();
+                    totalProcessed += processed;
+                    task.recordProcessBatchTime(now - then);
+                }
+            } catch (final Throwable e) {
+                final String topologyName = task.id().topologyName();

Review comment:
       Do we want to swallow the exception if it is not from a named topology? Shouldn't we just rethrow the error in that case?
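A sketch of the behavior being suggested. It assumes, as the diff does with `task.id().topologyName()`, that a `null` topology name means the task is not part of a named topology. Errors from such tasks, or from topologies with no registered handler, are rethrown so they still reach the thread-level handler instead of being swallowed. `ProcessGuard`, `runTask`, and the handler map are hypothetical names.

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the catch block: only errors from a named topology that has a
// registered handler are consumed here; everything else is rethrown unchanged.
class ProcessGuard {
    static void runTask(final String topologyName,
                        final Runnable processBatch,
                        final Map<String, Consumer<Throwable>> topologyHandlers) {
        try {
            processBatch.run();
        } catch (final RuntimeException e) {
            final Consumer<Throwable> handler =
                topologyName == null ? null : topologyHandlers.get(topologyName);
            if (handler == null) {
                // Not a named topology (or no handler registered): let the thread-level handler see it.
                throw e;
            }
            handler.accept(e);
        }
        // Errors (e.g. OutOfMemoryError) are not caught here and propagate as usual.
    }
}
```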

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -299,6 +301,7 @@ public boolean isRunning() {
     private final java.util.function.Consumer<Long> cacheResizer;
 
     private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+    private final Map<String, java.util.function.Consumer<Throwable>> topologyExceptionHandlers;

Review comment:
       Do we want to be able to clean these up after we remove a topology? We might not need to, but it might also be a good idea.
   
   For example: we remove a topology that has a handler, then add a new one with the same name, either without a handler or relying on the non-named-topology handler.
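One way to handle that scenario, sketched under assumptions: drop the handler entry whenever a topology is removed, so that a topology re-added under the same name without its own handler falls back to the default handler instead of inheriting the stale one. `NamedTopologyHandlers`, `addNamedTopology`, and `removeNamedTopology` here are hypothetical methods on a toy class, not the signatures in `KafkaStreamsNamedTopologyWrapper`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical toy model of per-topology handler bookkeeping.
class NamedTopologyHandlers {
    private final Map<String, Consumer<Throwable>> handlers = new ConcurrentHashMap<>();

    // Register the topology; a null handler means "use the application-wide default".
    void addNamedTopology(final String name, final Consumer<Throwable> handler) {
        if (handler != null) {
            handlers.put(name, handler);
        } else {
            // Never keep a handler left over from an earlier topology with this name.
            handlers.remove(name);
        }
    }

    // Clean up the handler together with the topology itself.
    void removeNamedTopology(final String name) {
        handlers.remove(name);
    }

    // Empty means the caller should fall back to the default handler.
    Optional<Consumer<Throwable>> handlerFor(final String name) {
        return Optional.ofNullable(handlers.get(name));
    }
}
```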







[GitHub] [kafka] ableegoldman commented on pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

ableegoldman commented on pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#issuecomment-934171853


   I haven't finished writing tests for this, but the non-test code is ready for review -- cc @wcarlson5 @guozhangwang @rodesai 





[GitHub] [kafka] guozhangwang commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

guozhangwang commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r724594540



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -971,6 +972,9 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
             streamsUncaughtExceptionHandler
         );
         streamThread.setStateListener(streamStateListener);
+        for (final Map.Entry<String, Consumer<Throwable>> exceptionHandler : topologyExceptionHandlers.entrySet()) {

Review comment:
       Should we synchronize on the `changeThreadCount` object as well?
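A sketch of the concern, under assumptions. Registering a topology handler and creating a new stream thread (which copies the current handlers into that thread) may run concurrently. If both paths hold the same lock, a handler registered in between cannot be missed by the new thread. In this sketch a plain `Object` field named `changeThreadCount` stands in for the lock object mentioned in the comment. `ToyKafkaStreams`, `ToyStreamThread`, and their methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical toy thread: its handler map may be read by the thread while being updated.
class ToyStreamThread {
    final Map<String, Consumer<Throwable>> topologyHandlers = new ConcurrentHashMap<>();
}

class ToyKafkaStreams {
    private final Object changeThreadCount = new Object(); // stand-in for the lock in KafkaStreams
    private final Map<String, Consumer<Throwable>> topologyExceptionHandlers = new HashMap<>();
    private final List<ToyStreamThread> threads = new ArrayList<>();

    // Registering a handler updates the shared map and every existing thread under the lock.
    void setTopologyHandler(final String name, final Consumer<Throwable> handler) {
        synchronized (changeThreadCount) {
            topologyExceptionHandlers.put(name, handler);
            for (final ToyStreamThread thread : threads) {
                thread.topologyHandlers.put(name, handler);
            }
        }
    }

    // Creating a thread copies the handlers under the same lock, so it cannot miss a concurrent registration.
    ToyStreamThread createAndAddStreamThread() {
        synchronized (changeThreadCount) {
            final ToyStreamThread thread = new ToyStreamThread();
            thread.topologyHandlers.putAll(topologyExceptionHandlers);
            threads.add(thread);
            return thread;
        }
    }
}
```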

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/NamedTopologyException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+public class NamedTopologyException extends KafkaException {
+
+    final String topologyName;
+
+    public NamedTopologyException(final String topologyName, final Throwable throwable) {

Review comment:
       Could we consider overriding the `toString` function to include the `topologyName`?
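A minimal sketch of that suggestion. It uses a standalone class that extends `RuntimeException` so it compiles without Kafka on the classpath; the class name and output format are assumptions. `Throwable.printStackTrace()` prints `toString()` as its first line, so overriding it surfaces the topology name without callers having to know about the field.

```java
// Hypothetical sketch; the real class in the PR extends KafkaException.
class NamedTopologyFailure extends RuntimeException {
    private final String topologyName;

    NamedTopologyFailure(final String topologyName, final Throwable cause) {
        super(cause);
        this.topologyName = topologyName;
    }

    // Include the topology name; getMessage() here is the cause's toString() (set by super(cause)).
    @Override
    public String toString() {
        return getClass().getSimpleName() + " [topology=" + topologyName + "]: " + getMessage();
    }
}
```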

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -513,8 +514,8 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<
         return action;
     }
 
-    private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+    protected void handleStreamsUncaughtException(final Throwable throwable,

Review comment:
       Since we wrap the real exception with the NamedTopologyException, the logic inside `getActionForThrowable` may need to peel off that layer (i.e. we may need to unwrap twice).
   
   I'm wondering if we can do something simpler than wrapping/unwrapping here:
   1) We are already wrapping almost all non-streams exceptions as a StreamsException anyway, so let's look at the places where we do not and enforce it there. That way we know that the exception thrown to the thread is either a StreamsException by itself with no cause, or a StreamsException wrapping a cause (wrapped only once).
   2) We add a `topologyName` field to StreamsException, where `null` indicates it's global. We set this field when throwing a StreamsException directly or when wrapping a non-streams cause.
   3) The uncaught exception handler would then expect to see only a `StreamsException` as in 1) above or an unchecked exception; for the former we look at that field directly, and for the latter we just treat it as a global fatal error.
   
   Then we do not need a wrapper exception anymore, and we do not need the nested try-catch either.
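A rough sketch of that proposal, with hypothetical names. `TopologyAwareStreamsException` stands in for `StreamsException` with the added `topologyName` field (`null` meaning the error is not tied to a named topology). `ProposalSketch.wrap` covers wrapping at most once, and `classify` shows how the handler side could branch on the field without any unwrapping. It only illustrates the three steps as described, not a confirmed implementation.

```java
// Hypothetical stand-in for StreamsException with the proposed topologyName field.
class TopologyAwareStreamsException extends RuntimeException {
    private final String topologyName; // null => not tied to a named topology ("global")

    TopologyAwareStreamsException(final String message, final String topologyName) {
        super(message);
        this.topologyName = topologyName;
    }

    TopologyAwareStreamsException(final Throwable cause, final String topologyName) {
        super(cause);
        this.topologyName = topologyName;
    }

    String topologyName() {
        return topologyName;
    }
}

class ProposalSketch {
    // Steps 1 and 2: wrap a non-streams exception exactly once, recording the topology (if any).
    static TopologyAwareStreamsException wrap(final Throwable t, final String topologyName) {
        if (t instanceof TopologyAwareStreamsException) {
            return (TopologyAwareStreamsException) t; // already a streams exception: never wrap twice
        }
        return new TopologyAwareStreamsException(t, topologyName);
    }

    // Step 3: read the field directly; any other unchecked exception is treated as global.
    static String classify(final Throwable t) {
        if (t instanceof TopologyAwareStreamsException) {
            final String name = ((TopologyAwareStreamsException) t).topologyName();
            return name == null ? "global" : "topology:" + name;
        }
        return "global";
    }
}
```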



