Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/19 08:53:36 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #11675: KAFKA-12648: POC for committing tasks on error

ableegoldman commented on a change in pull request #11675:
URL: https://github.com/apache/kafka/pull/11675#discussion_r787472259



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -84,6 +84,7 @@
     private final StateDirectory stateDirectory;
     private final StreamThread.ProcessingMode processingMode;
     private final Tasks tasks;
+    private final List<Task> successfullyProcessed = new ArrayList<>();

Review comment:
       nit: put this in the `Tasks` class with the other metadata
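Here is a minimal sketch of what this nit could look like, assuming a simplified `Tasks`-like container. `TasksSketch`, `SimpleTask` and the three method names are hypothetical and are not the real Streams `Tasks` API. `TaskManager` would call these methods instead of owning the list itself:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified stand-in for the Streams Task type.
interface SimpleTask {
    String id();
}

// Hypothetical sketch: the "successfully processed" bookkeeping sits in a
// Tasks-like container next to the other task metadata.
class TasksSketch {
    private final List<SimpleTask> successfullyProcessed = new ArrayList<>();

    void addToSuccessfullyProcessed(final SimpleTask task) {
        successfullyProcessed.add(task);
    }

    // Read-only view, so callers cannot modify the list behind the container's back.
    List<SimpleTask> successfullyProcessed() {
        return Collections.unmodifiableList(successfullyProcessed);
    }

    void clearSuccessfullyProcessed() {
        successfullyProcessed.clear();
    }
}
```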

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -788,7 +788,15 @@ void runOnce() {
              */
             do {
                 log.debug("Processing tasks with {} iterations.", numIterations);
-                final int processed = taskManager.process(numIterations, time);
+                final int processed;
+                try {
+                    processed = taskManager.process(numIterations, time);
+                } catch (final Exception e) {
+                    log.error("encountered an error when processing tasks." +
+                        " Will commit all previously successfully processed tasks", e);
+                    taskManager.commitSuccessfullyProcessedTasks();

Review comment:
       nit: why not just keep this logic within `taskManager#process` and make `#commitSuccessfullyProcessedTasks` private?
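A rough sketch of this suggestion follows, under simplifying assumptions. `SimulatedTask` and `TaskManagerSketch` are hypothetical, and "committing" here only records task ids rather than flushing state or committing offsets. The point is that the catch-commit-rethrow sequence lives inside `process`, so the commit helper can stay private:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task: process() returns the number of records
// handled, or throws if processing fails. Not the real Streams Task API.
class SimulatedTask {
    final String id;
    private final int records;
    private final boolean fails;

    SimulatedTask(final String id, final int records, final boolean fails) {
        this.id = id;
        this.records = records;
        this.fails = fails;
    }

    int process() {
        if (fails) {
            throw new IllegalStateException("task " + id + " failed");
        }
        return records;
    }
}

// Hypothetical sketch: process() handles the failure itself, so the commit
// helper can be private and the caller needs no try/catch to trigger it.
class TaskManagerSketch {
    private final List<SimulatedTask> successfullyProcessed = new ArrayList<>();
    // Stand-in for a real commit: just records which task ids were "committed".
    final List<String> committedTaskIds = new ArrayList<>();

    int process(final List<SimulatedTask> tasks) {
        successfullyProcessed.clear();
        int totalProcessed = 0;
        try {
            for (final SimulatedTask task : tasks) {
                totalProcessed += task.process();
                successfullyProcessed.add(task);
            }
        } catch (final RuntimeException e) {
            // Commit only the tasks that finished before the failure,
            // then rethrow so the caller still sees the original error.
            commitSuccessfullyProcessedTasks();
            throw e;
        }
        return totalProcessed;
    }

    private void commitSuccessfullyProcessedTasks() {
        for (final SimulatedTask task : successfullyProcessed) {
            committedTaskIds.add(task.id);
        }
    }
}
```

Rethrowing keeps the failure visible to the caller (here, `StreamThread`), which can still log and handle it as before.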

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1321,11 +1322,22 @@ int process(final int maxNumRecords, final Time time) {
                 totalProcessed += processed;
                 task.recordProcessBatchTime(now - then);
             }
+            successfullyProcessed.add(task);
         }
-
+        successfullyProcessed.clear();
         return totalProcessed;
     }
 
+    public int commitSuccessfullyProcessedTasks() {
+        final int committed = commit(successfullyProcessed);

Review comment:
       @mjsax please double-check my understanding here, since it's been a while since I thought about or worked with this stuff: IIRC, at least under the current Producer semantics, we can only do this kind of partial commit when using ALOS or EOS-v1. In KIP-447/EOS-v2, if one task/partition is bad then the transaction has to be aborted, and unfortunately that means all partitions/tasks in it are aborted as well.
   
   The API is a bit misleading, since it implies you can choose which offsets to send to the transaction, but I recall @mjsax mentioning that a transaction is applied across all partitions at once.
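The difference in commit scope can be illustrated with a toy model. This is not the Kafka producer API; `ToyTransaction` and `CommitScopeModel` are hypothetical. With one transaction per task, as in EOS-v1, a bad task only aborts its own transaction. With one transaction shared by all tasks on a thread, as in EOS-v2, aborting it discards every task's offsets. ALOS uses no transaction at all, so for this purpose it behaves like the per-task case:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model, not the Kafka producer API: a transaction buffers per-task
// offsets and then either commits or aborts all of them as a unit.
class ToyTransaction {
    private final Map<String, Long> pending = new LinkedHashMap<>();

    void addOffset(final String taskId, final Long offset) {
        pending.put(taskId, offset);
    }

    Map<String, Long> commit() {
        final Map<String, Long> committed = new LinkedHashMap<>(pending);
        pending.clear();
        return committed;
    }

    void abort() {
        pending.clear();
    }
}

class CommitScopeModel {
    // EOS-v1 style: one transaction per task, so a failure only aborts the
    // failed task's transaction and the other tasks can still commit.
    static Map<String, Long> perTaskTransactions(final Map<String, Long> offsets, final String failedTask) {
        final Map<String, Long> committed = new LinkedHashMap<>();
        for (final Map.Entry<String, Long> entry : offsets.entrySet()) {
            final ToyTransaction txn = new ToyTransaction();
            txn.addOffset(entry.getKey(), entry.getValue());
            if (entry.getKey().equals(failedTask)) {
                txn.abort();
            } else {
                committed.putAll(txn.commit());
            }
        }
        return committed;
    }

    // EOS-v2 style: one transaction spans every task on the thread, so a
    // single failed task forces an abort that discards all tasks' offsets.
    static Map<String, Long> sharedTransaction(final Map<String, Long> offsets, final String failedTask) {
        final ToyTransaction txn = new ToyTransaction();
        offsets.forEach(txn::addOffset);
        if (offsets.containsKey(failedTask)) {
            txn.abort();
            return new LinkedHashMap<>();
        }
        return txn.commit();
    }
}
```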

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1321,11 +1322,22 @@ int process(final int maxNumRecords, final Time time) {
                 totalProcessed += processed;
                 task.recordProcessBatchTime(now - then);
             }
+            successfullyProcessed.add(task);
         }
-
+        successfullyProcessed.clear();

Review comment:
       nit: if we just clear it at the beginning of `#process`, then we only need to clear it in one place, rather than clearing it both here and in `#commitSuccessfullyProcessedTasks`.
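The following sketch shows why a single clear point is more robust. It assumes simulated task processing, and `ClearPlacementSketch` and its methods are hypothetical. With clear-at-end, a failed pass leaves entries behind, so every error path must also clear the list (presumably why it also has to be cleared in `#commitSuccessfullyProcessedTasks`). With clear-at-start, each pass begins with an empty list regardless of how the previous one ended:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch contrasting the two places the list could be cleared.
// Processing is simulated: the task id equal to failOn throws.
class ClearPlacementSketch {
    final List<String> successfullyProcessed = new ArrayList<>();

    // Variant in the PR: clear only after a fully successful pass.
    void processClearAtEnd(final List<String> taskIds, final String failOn) {
        for (final String id : taskIds) {
            simulateProcess(id, failOn);
            successfullyProcessed.add(id);
        }
        successfullyProcessed.clear();
    }

    // Variant suggested in review: reset once, at the start of every pass.
    void processClearAtStart(final List<String> taskIds, final String failOn) {
        successfullyProcessed.clear();
        for (final String id : taskIds) {
            simulateProcess(id, failOn);
            successfullyProcessed.add(id);
        }
    }

    private static void simulateProcess(final String id, final String failOn) {
        if (id.equals(failOn)) {
            throw new IllegalStateException("failed to process " + id);
        }
    }
}
```

For example, suppose a pass over `a, b, c` fails on `c` and is followed by a pass over `d, e` that fails on `e`, with no commit in between. Clear-at-end then leaves `[a, b, d]`, while clear-at-start leaves only `[d]`.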




-- 
This is an automated message from the Apache Git Service.