Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/25 17:16:05 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list

wcarlson5 commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r791937041



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {

Review comment:
       I am not sure we want to move every task in a topology. Could we do this per task or per subtopology instead?
   
   Moving by topology may turn out to be best, but I need to think about it a bit more.
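
To make the granularity question concrete, here is a minimal sketch in which the caller chooses what gets moved to the tail by passing a predicate. `SketchTaskId` and `TailMover` are hypothetical stand-ins invented for this illustration. They are not the real `TaskId` or `Tasks` classes, and the field names are assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a task id; NOT the real Kafka Streams TaskId.
final class SketchTaskId {
    final String topologyName;
    final int subtopology;
    final int partition;

    SketchTaskId(final String topologyName, final int subtopology, final int partition) {
        this.topologyName = topologyName;
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topologyName + "__" + subtopology + "_" + partition;
    }
}

// Hypothetical helper; the predicate decides the granularity of the move.
final class TailMover {
    // Moves every id matching the predicate to the end of the list,
    // preserving the relative order inside both groups.
    static void moveToTail(final List<SketchTaskId> ordered,
                           final Predicate<SketchTaskId> shouldMove) {
        final List<SketchTaskId> moved = new ArrayList<>();
        final Iterator<SketchTaskId> iterator = ordered.iterator();
        while (iterator.hasNext()) {
            final SketchTaskId id = iterator.next();
            if (shouldMove.test(id)) {
                iterator.remove();
                moved.add(id);
            }
        }
        ordered.addAll(moved);
    }
}
```

With this shape, moving by topology is `id -> id.topologyName.equals(name)`, moving by subtopology adds a check on `id.subtopology`, and moving a single task compares against that one id. Whether the extra flexibility is worth it is the open question in the comment above.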

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
       It might be nice to keep the tasks/topologies that have failed in a separate list entirely. Then, when reprocessing after an exception, we can run all the good tasks first and commit them before running the failed ones. This will be important for EOS, as we can't commit only part of a transaction.
   
   The larger part of that doesn't need to be done in this PR, but keeping the groups separate would be nice in my mind.
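
A rough sketch of the "separate groups" idea, assuming tasks are identified by plain strings and that processing and committing are supplied by the caller. `PartitionedTaskOrder` and all of its methods are hypothetical names made up for this illustration. Nothing here is taken from the PR or from the Kafka Streams API, and a real EOS commit involves far more than a single callback.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical bookkeeping class: healthy and failed tasks live in separate groups.
final class PartitionedTaskOrder {
    // LinkedHashSet keeps insertion order and makes membership moves cheap.
    private final Set<String> healthy = new LinkedHashSet<>();
    private final Set<String> failed = new LinkedHashSet<>();

    void add(final String taskId) {
        healthy.add(taskId);
    }

    // Moves a known healthy task into the failed group; unknown ids are ignored.
    void markFailed(final String taskId) {
        if (healthy.remove(taskId)) {
            failed.add(taskId);
        }
    }

    // Moves a task back to the healthy group once it processes cleanly again.
    void markRecovered(final String taskId) {
        if (failed.remove(taskId)) {
            healthy.add(taskId);
        }
    }

    List<String> healthyTasks() {
        return new ArrayList<>(healthy);
    }

    List<String> failedTasks() {
        return new ArrayList<>(failed);
    }

    // One pass: run the healthy tasks, commit their progress, then retry the failed ones.
    void runOnce(final Consumer<String> process, final Runnable commit) {
        for (final String taskId : healthyTasks()) {
            process.accept(taskId);
        }
        commit.run();
        for (final String taskId : failedTasks()) {
            process.accept(taskId);
        }
    }
}
```

The point of committing between the two loops is that progress made by the healthy tasks is already durable if a previously failed task throws again. A single reordered list cannot express that boundary as directly.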



