Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/10 02:07:24 UTC

[GitHub] [kafka] ableegoldman commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

ableegoldman commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1018583064


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -175,14 +175,14 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
 
         final Set<TaskId> corruptedTasks = new HashSet<>();
 
-        if (!offsetsPerTask.isEmpty()) {
-            if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
-                for (final Map.Entry<Task, Map<TopicPartition, OffsetAndMetadata>> taskToCommit : offsetsPerTask.entrySet()) {
-                    final Task task = taskToCommit.getKey();
+        if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
+            for (final Task task : taskManager.activeTaskIterable()) {
+                final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = offsetsPerTask.get(task);
+                if (offsetsToCommit != null || taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
                     try {
                         taskManager.streamsProducerForTask(task.id())
-                            .commitTransaction(taskToCommit.getValue(), taskManager.mainConsumer().groupMetadata());
-                        updateTaskCommitMetadata(taskToCommit.getValue());
+                            .commitTransaction(offsetsToCommit, taskManager.consumerGroupMetadata());

Review Comment:
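   Ah, yes: when a task has a transaction in flight but no entry in `offsetsPerTask`, `offsetsToCommit` is `null` at this call. We should pass in an empty map, not `null`.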
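
For illustration only, here is a minimal, self-contained sketch of the null-to-empty-map substitution suggested above. It does not use the Kafka classes: `CommitOffsetsSketch`, `offsetsForCommit`, the `String` task ids, and the `Map<String, Long>` offsets are hypothetical stand-ins for `Task`, `TopicPartition`, and `OffsetAndMetadata`, and the `transactionInFlight` flag stands in for the producer check shown in the diff. The actual fix in the PR may look different.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the commit decision in the diff above; not Kafka code.
class CommitOffsetsSketch {

    // Returns Optional.empty() when the commit should be skipped, otherwise
    // the (never null) offsets map to hand to the commit call.
    static Optional<Map<String, Long>> offsetsForCommit(
            final Map<String, Map<String, Long>> offsetsPerTask,
            final String taskId,
            final boolean transactionInFlight) {
        final Map<String, Long> offsetsToCommit = offsetsPerTask.get(taskId);

        // Same condition as in the diff: skip only if there is nothing to
        // commit AND no transaction is in flight.
        if (offsetsToCommit == null && !transactionInFlight) {
            return Optional.empty();
        }

        // The point of the review comment: substitute an empty map for null
        // so the callee never sees a null offsets argument.
        return Optional.of(offsetsToCommit == null
                ? Collections.<String, Long>emptyMap()
                : offsetsToCommit);
    }

    public static void main(final String[] args) {
        final Map<String, Map<String, Long>> offsetsPerTask = new HashMap<>();
        offsetsPerTask.put("0_0", Collections.singletonMap("topic-0", 42L));

        // Task with offsets: commit with those offsets.
        System.out.println(offsetsForCommit(offsetsPerTask, "0_0", false));
        // Task without offsets but with a transaction in flight: commit with an empty map.
        System.out.println(offsetsForCommit(offsetsPerTask, "0_1", true));
        // Task without offsets and no transaction in flight: skip.
        System.out.println(offsetsForCommit(offsetsPerTask, "0_2", false));
    }
}
```

Keeping the `null` check for the skip decision and substituting the empty map only at the call site leaves the condition from the diff unchanged while avoiding a `null` argument.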


