Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/15 21:26:39 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

wcarlson5 commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r614398757



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1198,7 +1210,7 @@ int process(final int maxNumRecords, final Time time) {
                 task.recordProcessBatchTime(now - then);
             }
         }
-
+        reprioritizeTasks();

Review comment:
       Tasks are reprioritized here only if processing succeeds.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -43,6 +43,8 @@
     private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
     private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
     private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
+    private final Collection<Task> misbehavingTasks = new HashSet<>();
+    private final Collection<Task> taskJail = new HashSet<>();

Review comment:
       These names are working titles.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {
+            String name = ((NamedTopologyStreamsException) throwable).getTopologyName();
+            ((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
       The exception is guaranteed to be handled on the dying thread, though we might want to call this on the new thread instead.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) {
 
     private void handleStreamsUncaughtException(final Throwable throwable,
                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        if (throwable instanceof NamedTopologyStreamsException) {

Review comment:
       I think we should also consider exceptions that are not StreamsExceptions.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -304,4 +310,26 @@ void addTask(final Task task) {
         }
         allTasksPerId.put(task.id(), task);
     }
+
+    public void deprioritizeNamedTopology(String name) {
+        for (Task task: readOnlyTasks) {
+            if (task.id().namedTopology().equals(name)){
+                misbehavingTasks.add(task);
+            }
+        }
+        for (Task task: misbehavingTasks) {

Review comment:
       I think suspending will work. However, I am not sure it will survive thread restarts. I may need to store the topology names that are in each state so that these lists can be repopulated in new threads.
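
       One possible shape for that idea, as a rough sketch only: keep the misbehaving topology names in a registry shared by all threads, and let each new thread rebuild its own list from it. Every class and method name below (MisbehavingTopologyRegistry, SketchTask, SketchThreadTasks, repopulate) is hypothetical and not part of Kafka Streams or this PR; tasks are reduced to an id plus a topology name for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these classes exist in Kafka Streams.

/** Shared across stream threads, so its contents outlive any single thread. */
final class MisbehavingTopologyRegistry {
    // Concurrent set because several stream threads may read and write it.
    private final Set<String> misbehavingTopologies = ConcurrentHashMap.newKeySet();

    void markMisbehaving(final String topologyName) {
        misbehavingTopologies.add(topologyName);
    }

    void markHealthy(final String topologyName) {
        misbehavingTopologies.remove(topologyName);
    }

    boolean isMisbehaving(final String topologyName) {
        return misbehavingTopologies.contains(topologyName);
    }
}

/** Simplified stand-in for a task: just an id and the topology it belongs to. */
final class SketchTask {
    final String id;
    final String topologyName;

    SketchTask(final String id, final String topologyName) {
        this.id = id;
        this.topologyName = topologyName;
    }
}

/** Simplified stand-in for the per-thread task bookkeeping. */
final class SketchThreadTasks {
    private final List<SketchTask> misbehavingTasks = new ArrayList<>();

    /** Rebuilds the thread-local list from the shared registry, e.g. when a replacement thread starts. */
    void repopulate(final Collection<SketchTask> assignedTasks,
                    final MisbehavingTopologyRegistry registry) {
        misbehavingTasks.clear();
        for (final SketchTask task : assignedTasks) {
            if (registry.isMisbehaving(task.topologyName)) {
                misbehavingTasks.add(task);
            }
        }
    }

    /** Returns a copy so callers cannot mutate the internal list. */
    List<SketchTask> misbehavingTasks() {
        return new ArrayList<>(misbehavingTasks);
    }
}
```

       With something like this, the dying thread would call markMisbehaving(name) from the exception handler, and the replacement thread would call repopulate(...) once its tasks are assigned. Whether the registry should live in KafkaStreams or elsewhere is left open here.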



