Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/24 22:24:17 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #11675: KAFKA-12648: POC for committing tasks on error

mjsax commented on a change in pull request #11675:
URL: https://github.com/apache/kafka/pull/11675#discussion_r791199995



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1321,11 +1322,22 @@ int process(final int maxNumRecords, final Time time) {
                 totalProcessed += processed;
                 task.recordProcessBatchTime(now - then);
             }
+            successfullyProcessed.add(task);
         }
-
+        successfullyProcessed.clear();
         return totalProcessed;
     }
 
+    public int commitSuccessfullyProcessedTasks() {
+        final int committed = commit(successfullyProcessed);

Review comment:
      Correct. You need to distinguish between the read path and the write path here. The producer has no idea which input partitions we consume from, so it can only offer a generic `sendOffsetsToTransaction` API, and it is the user's (i.e., our) responsibility to provide the correct offsets.

      What we write into the output topics, however, is independent of what we consumed. Even if you omit the offsets for some input partitions, the corresponding pending writes to the output topics are still part of the transaction, so omitting those offsets from the map does not abort the pending writes for those partitions. Which writes get committed has nothing to do with which offsets you pass to the producer: committing the transaction commits all of its pending writes.
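      To make the read/write split concrete, here is a toy in-memory model. It is NOT the Kafka client API: `ToyTransaction`, `send`, `sendOffsets`, `commit`, and `abort` are hypothetical names that only loosely mirror `KafkaProducer#send`, `sendOffsetsToTransaction`, `commitTransaction`, and `abortTransaction`. It assumes two tasks share a single transaction and shows that dropping task B's offset from the map still commits task B's output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a transaction, not the Kafka producer API.
// Writes (write path) and input offsets (read path) are tracked separately,
// but both are committed or aborted together.
class ToyTransaction {
    // Records written to output topics in this transaction.
    private final List<String> pendingWrites = new ArrayList<>();
    // Input offsets to commit with this transaction, keyed by input partition name.
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    private final List<String> committedWrites = new ArrayList<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Loosely analogous to send(): the record carries no link to any input partition.
    void send(String outputRecord) {
        pendingWrites.add(outputRecord);
    }

    // Loosely analogous to sendOffsetsToTransaction(): the caller picks which offsets to include.
    void sendOffsets(Map<String, Long> offsets) {
        pendingOffsets.putAll(offsets);
    }

    // All-or-nothing: every pending write is committed, whichever offsets were passed.
    void commit() {
        committedWrites.addAll(pendingWrites);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    // Discards every pending write and every pending offset; there is no per-partition abort.
    void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingWrites.clear();
        pendingOffsets.clear();
    }

    List<String> committedWrites() { return committedWrites; }
    Map<String, Long> committedOffsets() { return committedOffsets; }

    public static void main(String[] args) {
        ToyTransaction txn = new ToyTransaction();
        // Task A consumed from "in-0", task B from "in-1"; both produced output.
        txn.send("output of task A");
        txn.send("output of task B");
        // Task B failed, so only task A's offset is passed.
        txn.sendOffsets(Map.of("in-0", 42L));
        txn.commit();
        // Both outputs are committed, but only the offset for "in-0" is.
        System.out.println(txn.committedWrites());
        System.out.println(txn.committedOffsets());
    }
}
```

      In this model, the only way to keep task B's output from becoming visible is to abort, which also discards task A's writes and offsets, mirroring the point above that omitting offsets cannot selectively roll back writes.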
