Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/15 17:53:19 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #10544: KAFKA-12648: minimal changes for error handling namedTopologies

wcarlson5 opened a new pull request #10544:
URL: https://github.com/apache/kafka/pull/10544


   Changed the obvious cases so that errors are attributed to a named topology. This might be all we need; we can always add more if they come up.
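A rough sketch of what attributing an error to a named topology could look like: failures raised while working on a topology are wrapped in an exception that carries the topology's name, so the uncaught-exception handler can recover it. `TopologyAttributedException` and `AttributionExample.runAttributed` are hypothetical stand-ins; the actual `NamedTopologyStreamsException` used in this PR is not shown in the thread.

```java
// Hypothetical stand-in for the PR's NamedTopologyStreamsException: a runtime
// exception that records which named topology a failure belongs to.
class TopologyAttributedException extends RuntimeException {
    private final String topologyName;

    TopologyAttributedException(final String topologyName, final Throwable cause) {
        super("Error in named topology " + topologyName, cause);
        this.topologyName = topologyName;
    }

    String getTopologyName() {
        return topologyName;
    }
}

class AttributionExample {
    // Runs work belonging to one named topology and rethrows any runtime
    // failure wrapped with that topology's name, so a handler can attribute it.
    static void runAttributed(final String topologyName, final Runnable work) {
        try {
            work.run();
        } catch (final RuntimeException e) {
            throw new TopologyAttributedException(topologyName, e);
        }
    }
}
```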
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #10544: KAFKA-12648: minimal changes for error handling namedTopologies

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#issuecomment-820620451


   @ableegoldman This is what I think the error handling should look like. We could try to categorize more, but I think we should make this best-effort and add to it as we go.





[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r614398250



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {
+            String name = ((NamedTopologyStreamsException) throwable).getTopologyName();
+            ((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
       The exception is guaranteed to be handled on the dying thread, though we might want to call this on the new thread instead.
   
   EDIT: we will definitely need to track the failed topologies in `KafkaStreams`, though tracking successes across restarts will be an issue.
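Since the handler runs on the dying thread, state kept inside that `StreamThread` disappears with it. A minimal sketch of the EDIT's suggestion, assuming a hypothetical `FailedTopologyRegistry` owned by `KafkaStreams` and shared with every thread: the dying thread records the failure and its replacement reads a snapshot when it starts. Everything here is in memory only, so it does not help across a full client restart.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client-level registry: it outlives any single stream thread, so a
// replacement thread can learn which topologies its predecessor saw failing.
class FailedTopologyRegistry {
    // Concurrent set because the dying thread writes while other threads may read.
    private final Set<String> failedTopologies = ConcurrentHashMap.newKeySet();

    // Called from the uncaught-exception handler on the dying thread.
    void recordFailure(final String topologyName) {
        failedTopologies.add(topologyName);
    }

    // Called when a topology processes successfully again.
    void recordSuccess(final String topologyName) {
        failedTopologies.remove(topologyName);
    }

    // Read-only copy for a newly started thread to seed its own bookkeeping.
    Set<String> snapshot() {
        return Collections.unmodifiableSet(new HashSet<>(failedTopologies));
    }
}
```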







[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r614398757



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1198,7 +1210,7 @@ int process(final int maxNumRecords, final Time time) {
                 task.recordProcessBatchTime(now - then);
             }
         }
-
+        reprioritizeTasks();

Review comment:
       If processing succeeds, reprioritize the tasks.
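One reading of `deprioritizeNamedTopology` paired with `reprioritizeTasks()`: tasks of a failing topology are processed after all healthy tasks, and a successful processing pass restores the normal order. The sketch below assumes a hypothetical `SimpleTask` (id plus topology name) and a hypothetical `TaskPrioritizer`; the PR's real `Tasks` and `TaskManager` logic may differ.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal task: just an id and the named topology it belongs to.
final class SimpleTask {
    final String id;
    final String topologyName;

    SimpleTask(final String id, final String topologyName) {
        this.id = id;
        this.topologyName = topologyName;
    }
}

class TaskPrioritizer {
    private final List<SimpleTask> allTasks = new ArrayList<>();
    private final Set<SimpleTask> misbehavingTasks = new HashSet<>();

    void addTask(final SimpleTask task) {
        allTasks.add(task);
    }

    // Marks every task of the failing topology as misbehaving.
    void deprioritizeNamedTopology(final String name) {
        for (final SimpleTask task : allTasks) {
            if (name.equals(task.topologyName)) {
                misbehavingTasks.add(task);
            }
        }
    }

    // After a successful processing pass, restores normal priority.
    void reprioritizeTasks() {
        misbehavingTasks.clear();
    }

    // Healthy tasks first, misbehaving tasks last; relative order is otherwise kept.
    List<SimpleTask> processingOrder() {
        final List<SimpleTask> ordered = new ArrayList<>();
        final List<SimpleTask> deferred = new ArrayList<>();
        for (final SimpleTask task : allTasks) {
            if (misbehavingTasks.contains(task)) {
                deferred.add(task);
            } else {
                ordered.add(task);
            }
        }
        ordered.addAll(deferred);
        return ordered;
    }
}
```

Comparing with `name.equals(task.topologyName)` rather than the reverse keeps the loop safe if a task has no named topology (assumed here to be represented as `null`).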

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -43,6 +43,8 @@
     private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
     private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
+    private final Collection<Task> misbehavingTasks = new HashSet<>();
+    private final Collection<Task> taskJail = new HashSet<>();

Review comment:
       working title

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {
+            String name = ((NamedTopologyStreamsException) throwable).getTopologyName();
+            ((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
       The exception is guaranteed to be handled on the dying thread, though we might want to call this on the new thread instead.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {

Review comment:
       I think we should also consider exceptions that are not `StreamsException`s.
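A possible way to cover exceptions that are not `StreamsException`s: search the whole cause chain of any `Throwable` for something that carries a topology name, and treat the error as unattributed (falling back to the existing handling) if nothing is found. `HasTopologyName`, `TopologyFailure`, and `TopologyAttribution` are hypothetical names for illustration only.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;

// Hypothetical marker for exceptions that know which named topology failed.
interface HasTopologyName {
    String topologyName();
}

// Hypothetical example of such an exception.
class TopologyFailure extends RuntimeException implements HasTopologyName {
    private final String name;

    TopologyFailure(final String name, final Throwable cause) {
        super(cause);
        this.name = name;
    }

    @Override
    public String topologyName() {
        return name;
    }
}

class TopologyAttribution {
    // Walks the cause chain of any Throwable (not only StreamsException) and
    // returns the first topology name found, or empty if none is attached.
    static Optional<String> findTopologyName(final Throwable throwable) {
        // Identity-based set guards against cyclic cause chains.
        final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = throwable;
        while (current != null && seen.add(current)) {
            if (current instanceof HasTopologyName) {
                return Optional.ofNullable(((HasTopologyName) current).topologyName());
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```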

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -304,4 +310,26 @@ void addTask(final Task task) {
         }
         allTasksPerId.put(task.id(), task);
     }
+
+    public void deprioritizeNamedTopology(String name) {
+        for (Task task: readOnlyTasks) {
+            if (task.id().namedTopology().equals(name)){
+                misbehavingTasks.add(task);
+            }
+        }
+        for (Task task: misbehavingTasks) {

Review comment:
       I think suspending will work. However, I am not sure it will survive thread restarts. I may need to store the topology names that are in each state so I can repopulate each of these lists in new threads.
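To survive thread restarts, the per-state bookkeeping could store topology names, which stay meaningful across threads, instead of `Task` objects, which belong to a single thread, and rebuild each thread's collections from those names. A minimal sketch, assuming two hypothetical states that mirror `misbehavingTasks` and `taskJail`, and representing a thread's tasks as a map from task id to topology name.

```java
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical states mirroring the misbehavingTasks / taskJail collections.
enum TopologyState { MISBEHAVING, JAILED }

class TopologyStateStore {
    // Topology names per state; names survive thread replacement, Task objects do not.
    private final Map<TopologyState, Set<String>> namesByState = new EnumMap<>(TopologyState.class);

    TopologyStateStore() {
        for (final TopologyState state : TopologyState.values()) {
            namesByState.put(state, new HashSet<>());
        }
    }

    // Moves a topology into exactly one state.
    synchronized void setState(final String topologyName, final TopologyState state) {
        for (final Set<String> names : namesByState.values()) {
            names.remove(topologyName);
        }
        namesByState.get(state).add(topologyName);
    }

    // Rebuilds one state's task collection for a new thread, given that
    // thread's tasks as taskId -> topology name.
    synchronized Set<String> tasksInState(final TopologyState state,
                                          final Map<String, String> topologyByTaskId) {
        final Set<String> result = new HashSet<>();
        final Set<String> names = namesByState.get(state);
        for (final Map.Entry<String, String> entry : topologyByTaskId.entrySet()) {
            if (names.contains(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```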







[GitHub] [kafka] wcarlson5 closed pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

Posted by GitBox <gi...@apache.org>.
wcarlson5 closed pull request #10544:
URL: https://github.com/apache/kafka/pull/10544


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r617027950



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {

Review comment:
       Agreed

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {
+            String name = ((NamedTopologyStreamsException) throwable).getTopologyName();
+            ((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
       I think it's fine to lose some state after a restart: presumably we'll just start up with all queries assumed to be good and then deprioritize again any that are still failing. We can iterate on this by letting ksql persist this info in the command topic and give Streams hints at startup that a query may still be failing, but just tracking it in memory seems good enough for a first pass.
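The behavior described here, where every query starts out assumed healthy after a restart and is deprioritized again on its next failure, plus the optional startup hints from ksql, can be sketched as below. `StartupFailureTracker` and the shape of the hints (a plain collection of query names) are assumptions; how ksql would actually persist and pass them is not specified in the thread.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory tracker. With no hints, every query starts out
// assumed healthy after a restart; a caller (e.g. ksql) may optionally pass
// names it persisted elsewhere as "may still be failing".
class StartupFailureTracker {
    private final Set<String> suspected = new HashSet<>();

    StartupFailureTracker(final Collection<String> startupHints) {
        suspected.addAll(startupHints);
    }

    // A failure after restart deprioritizes the query again.
    void onFailure(final String queryName) {
        suspected.add(queryName);
    }

    // A successful pass clears the suspicion.
    void onSuccess(final String queryName) {
        suspected.remove(queryName);
    }

    boolean isDeprioritized(final String queryName) {
        return suspected.contains(queryName);
    }
}
```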






