Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/02 16:08:30 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list

wcarlson5 commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r797772309



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
      It would be much simpler, but unfortunately it's not as simple as we first thought. The producer has only one transaction open at a time, so the records written by the healthy tasks are mixed in with the records of the failed task, and there is no way to separate them. So we need to set aside the tasks that we know will fail and process all the other tasks without them. That way we keep making progress, while the failing tasks back off and are retried later.
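A minimal sketch of the approach described in the comment, under stated assumptions: `SimpleTask`, `onTaskFailure`, `tasksToProcess`, and the per-topology backoff map are hypothetical names for illustration, not the Kafka Streams `Task`/`TaskId` API or the PR's actual code. It moves every task of a failed topology to the tail of the ordered list (the same idea as `moveActiveTasksToTailFor` in the diff) and skips those tasks until their backoff expires, so the shared transaction only contains records from tasks that are expected to succeed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of task ordering with backoff; not the Kafka Streams classes.
public class FailedTaskBackoffSketch {

    // Hypothetical stand-in for a task: which topology it belongs to and which partition it owns.
    record SimpleTask(String topologyName, int partition) {}

    private final List<SimpleTask> orderedTasks = new LinkedList<>();
    // Topology name -> earliest time (ms) at which its tasks may be processed again.
    private final Map<String, Long> retryAfterMs = new HashMap<>();
    private final long backoffMs;

    FailedTaskBackoffSketch(List<SimpleTask> tasks, long backoffMs) {
        orderedTasks.addAll(tasks);
        this.backoffMs = backoffMs;
    }

    // Moves all tasks of the given topology to the end, keeping relative order on both sides.
    void moveTasksToTailFor(String topologyName) {
        List<SimpleTask> toMove = new LinkedList<>();
        Iterator<SimpleTask> it = orderedTasks.iterator();
        while (it.hasNext()) {
            SimpleTask task = it.next();
            if (task.topologyName().equals(topologyName)) {
                it.remove();
                toMove.add(task);
            }
        }
        orderedTasks.addAll(toMove);
    }

    // On failure, back off the whole topology and push its tasks to the tail.
    void onTaskFailure(String topologyName, long nowMs) {
        retryAfterMs.put(topologyName, nowMs + backoffMs);
        moveTasksToTailFor(topologyName);
    }

    // Tasks eligible for the next transaction: all tasks except those still backing off.
    List<SimpleTask> tasksToProcess(long nowMs) {
        List<SimpleTask> result = new ArrayList<>();
        for (SimpleTask task : orderedTasks) {
            Long retryAt = retryAfterMs.get(task.topologyName());
            if (retryAt == null || nowMs >= retryAt) {
                result.add(task);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<SimpleTask> tasks = List.of(
            new SimpleTask("A", 0), new SimpleTask("B", 0),
            new SimpleTask("A", 1), new SimpleTask("C", 0));
        FailedTaskBackoffSketch sketch = new FailedTaskBackoffSketch(tasks, 1_000L);

        sketch.onTaskFailure("A", 0L);
        // While topology A is backing off, only B and C are processed.
        System.out.println(sketch.tasksToProcess(500L));
        // After the backoff, A's tasks are retried, but now after B and C.
        System.out.println(sketch.tasksToProcess(1_000L));
    }
}
```

Moving the failed topology to the tail, rather than only skipping it, means that once its backoff expires the healthy tasks are still processed first in each pass.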




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.