You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/09 01:57:32 UTC

[GitHub] [kafka] ableegoldman opened a new pull request, #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

ableegoldman opened a new pull request, #12835:
URL: https://github.com/apache/kafka/pull/12835

   Add a new `#transactionInFlight` API to the StreamsProducer to expose the flag of the same name, then check whether there is an open transaction when we determine whether or not to perform a commit in TaskExecutor. This is to avoid unnecessarily dropping out of the group on transaction timeout in the case a transaction was begun outside of regular processing, eg when a punctuator forwards records but there are no newly consumer records and thus no new offsets to commit
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ableegoldman commented on pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on PR #12835:
URL: https://github.com/apache/kafka/pull/12835#issuecomment-1312298957

   All tests passing now, except for a handful of unrelated/flaky tests
   
   Filed https://issues.apache.org/jira/browse/KAFKA-14384
   
   @cadonna I'll wait for a final pass/approval from you before merging, or feel free to merge it yourself of course. I wonder if it's worth backporting to earlier versions? Maybe at least to 3.3 since we haven't done the bugfix release yet?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12835:
URL: https://github.com/apache/kafka/pull/12835#issuecomment-1313290404

   Build failures are not related:
   
   ```
   Build / JDK 11 and Scala 2.13 / kafka.api.ProducerIdExpirationTest.testProducerIdExpirationWithNoTransactions(String).quorum=zk
   Build / JDK 11 and Scala 2.13 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff
   Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=zk
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ableegoldman commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1018583064


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -175,14 +175,14 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
 
         final Set<TaskId> corruptedTasks = new HashSet<>();
 
-        if (!offsetsPerTask.isEmpty()) {
-            if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
-                for (final Map.Entry<Task, Map<TopicPartition, OffsetAndMetadata>> taskToCommit : offsetsPerTask.entrySet()) {
-                    final Task task = taskToCommit.getKey();
+        if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
+            for (final Task task : taskManager.activeTaskIterable()) {
+                final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = offsetsPerTask.get(task);
+                if (offsetsToCommit != null || taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
                     try {
                         taskManager.streamsProducerForTask(task.id())
-                            .commitTransaction(taskToCommit.getValue(), taskManager.mainConsumer().groupMetadata());
-                        updateTaskCommitMetadata(taskToCommit.getValue());
+                            .commitTransaction(offsetsToCommit, taskManager.consumerGroupMetadata());

Review Comment:
   ah, yes, we should pass in an empty map not `null`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1018815885


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java:
##########
@@ -35,4 +45,47 @@ public void testPunctuateWithPause() {
         taskExecutor.punctuate();
         verify(tasks).activeTasks();
     }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV2() {
+        final Tasks tasks = mock(Tasks.class);
+        final TaskManager taskManager = mock(TaskManager.class);
+        final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
+        when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
+
+        final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
+        when(taskManager.threadProducer()).thenReturn(producer);
+        when(producer.transactionInFlight()).thenReturn(true);
+
+        final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext());
+        taskExecutor.commitOffsetsOrTransaction(Collections.emptyMap());
+
+        verify(producer).commitTransaction(Collections.emptyMap(), groupMetadata);
+    }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV1() {
+        final TaskId taskId = new TaskId(0, 0);
+        final Task task = mock(Task.class);
+        when(task.id()).thenReturn(taskId);

Review Comment:
   Do you have any concerns regarding using this helper methods to setup mocks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna merged pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12835:
URL: https://github.com/apache/kafka/pull/12835


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1018813558


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -191,43 +191,49 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
                         corruptedTasks.add(task.id());
                     }
                 }
-            } else {
-                final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
-                    .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            }
+        } else {
+            final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
+                .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+                if (!offsetsPerTask.isEmpty() || taskManager.threadProducer().transactionInFlight()) {

Review Comment:
   Yes, moving `allOffsets` inside the branch is fine with me.
   However, I actually proposed to extract each branch to a separate method, so that you have something like:
   
   ```java
   if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
       commitForEosv1(...);
   } else if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
       commitForEosv2(...);
   } else {
       commitForAlos(...);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ableegoldman commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1022442137


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -191,43 +191,49 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
                         corruptedTasks.add(task.id());
                     }
                 }
-            } else {
-                final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
-                    .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            }
+        } else {
+            final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
+                .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+                if (!offsetsPerTask.isEmpty() || taskManager.threadProducer().transactionInFlight()) {

Review Comment:
   Ah I see. Will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ableegoldman commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1018584439


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -191,43 +191,49 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
                         corruptedTasks.add(task.id());
                     }
                 }
-            } else {
-                final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
-                    .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            }
+        } else {
+            final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
+                .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+                if (!offsetsPerTask.isEmpty() || taskManager.threadProducer().transactionInFlight()) {

Review Comment:
   While breaking up each processing mode into its own separate branch as you requested, I decided to move the `allOffsets` map inside this if condition -- ie it still uses the `offsetsPerTask` map for the emptiness check. I'm assuming this is fine since it's just a nit but lmk if you still prefer to do the check on the `allOffsets` map instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1017658640


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -175,14 +175,14 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
 
         final Set<TaskId> corruptedTasks = new HashSet<>();
 
-        if (!offsetsPerTask.isEmpty()) {
-            if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
-                for (final Map.Entry<Task, Map<TopicPartition, OffsetAndMetadata>> taskToCommit : offsetsPerTask.entrySet()) {
-                    final Task task = taskToCommit.getKey();
+        if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
+            for (final Task task : taskManager.activeTaskIterable()) {
+                final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = offsetsPerTask.get(task);
+                if (offsetsToCommit != null || taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
                     try {
                         taskManager.streamsProducerForTask(task.id())
-                            .commitTransaction(taskToCommit.getValue(), taskManager.mainConsumer().groupMetadata());
-                        updateTaskCommitMetadata(taskToCommit.getValue());
+                            .commitTransaction(offsetsToCommit, taskManager.consumerGroupMetadata());

Review Comment:
   Wouldn't this lead to an NPE if there are no offsets to commit but a transaction is in flight?
   See https://github.com/apache/kafka/blob/7387a1162533c52c83fb8899a31c505c42e0fe10/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L752



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -191,43 +191,49 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
                         corruptedTasks.add(task.id());
                     }
                 }
-            } else {
-                final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
-                    .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            }
+        } else {
+            final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
+                .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+                if (!offsetsPerTask.isEmpty() || taskManager.threadProducer().transactionInFlight()) {

Review Comment:
   nit: 
   ```suggestion
                   if (!allOffsets.isEmpty() || taskManager.threadProducer().transactionInFlight()) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -175,14 +175,14 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
 
         final Set<TaskId> corruptedTasks = new HashSet<>();
 
-        if (!offsetsPerTask.isEmpty()) {
-            if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
-                for (final Map.Entry<Task, Map<TopicPartition, OffsetAndMetadata>> taskToCommit : offsetsPerTask.entrySet()) {
-                    final Task task = taskToCommit.getKey();
+        if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {

Review Comment:
   nit:
   Could you extract each branch to its own method so that for each processing mode we have a method that commits offsets or transaction. I think that would improve readability. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java:
##########
@@ -35,4 +45,47 @@ public void testPunctuateWithPause() {
         taskExecutor.punctuate();
         verify(tasks).activeTasks();
     }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV2() {
+        final Tasks tasks = mock(Tasks.class);
+        final TaskManager taskManager = mock(TaskManager.class);
+        final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
+        when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
+
+        final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
+        when(taskManager.threadProducer()).thenReturn(producer);
+        when(producer.transactionInFlight()).thenReturn(true);
+
+        final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext());
+        taskExecutor.commitOffsetsOrTransaction(Collections.emptyMap());
+
+        verify(producer).commitTransaction(Collections.emptyMap(), groupMetadata);
+    }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV1() {
+        final TaskId taskId = new TaskId(0, 0);
+        final Task task = mock(Task.class);
+        when(task.id()).thenReturn(taskId);

Review Comment:
   ```suggestion
           final StreamTask task = statelessTask(new TaskId(0, 0)).build();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java:
##########
@@ -35,4 +45,47 @@ public void testPunctuateWithPause() {
         taskExecutor.punctuate();
         verify(tasks).activeTasks();
     }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV2() {
+        final Tasks tasks = mock(Tasks.class);
+        final TaskManager taskManager = mock(TaskManager.class);
+        final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
+        when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
+
+        final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
+        when(taskManager.threadProducer()).thenReturn(producer);
+        when(producer.transactionInFlight()).thenReturn(true);
+
+        final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext());
+        taskExecutor.commitOffsetsOrTransaction(Collections.emptyMap());
+
+        verify(producer).commitTransaction(Collections.emptyMap(), groupMetadata);
+    }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV1() {
+        final TaskId taskId = new TaskId(0, 0);
+        final Task task = mock(Task.class);
+        when(task.id()).thenReturn(taskId);
+
+        final Tasks tasks = mock(Tasks.class);
+        final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
+        final TaskManager taskManager = mock(TaskManager.class);
+        when(taskManager.activeTaskIterable()).thenReturn(Collections.singletonList(task));
+        when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
+
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+        when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_ALPHA);
+        when(taskManager.streamsProducerForTask(taskId)).thenReturn(producer);
+        when(producer.transactionInFlight()).thenReturn(true);
+
+        final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext());
+        taskExecutor.commitOffsetsOrTransaction(Collections.emptyMap());
+
+        verify(producer).commitTransaction(Mockito.isNull(), eq(groupMetadata));

Review Comment:
   If my comment about the NPE above is correct, this verification needs to be changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1018816579


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4450,6 +4448,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         task01.setCommittableOffsetsAndMetadata(offsetsT01);
         final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true);
 
+        final Tasks tasks = new Tasks(new LogContext());
+        tasks.addActiveTasks(asList(task00, task01, task02));
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);

Review Comment:
   I would use a mock for `Tasks`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ableegoldman commented on a diff in pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on code in PR #12835:
URL: https://github.com/apache/kafka/pull/12835#discussion_r1022441487


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java:
##########
@@ -35,4 +45,47 @@ public void testPunctuateWithPause() {
         taskExecutor.punctuate();
         verify(tasks).activeTasks();
     }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV2() {
+        final Tasks tasks = mock(Tasks.class);
+        final TaskManager taskManager = mock(TaskManager.class);
+        final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
+        when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
+
+        final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
+        when(taskManager.threadProducer()).thenReturn(producer);
+        when(producer.transactionInFlight()).thenReturn(true);
+
+        final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext());
+        taskExecutor.commitOffsetsOrTransaction(Collections.emptyMap());
+
+        verify(producer).commitTransaction(Collections.emptyMap(), groupMetadata);
+    }
+
+    @Test
+    public void testCommitWithOpenTransactionButNoOffsetsEOSV1() {
+        final TaskId taskId = new TaskId(0, 0);
+        final Task task = mock(Task.class);
+        when(task.id()).thenReturn(taskId);

Review Comment:
   Nope, wasn't aware of this helper method's existence -- thanks for the tip



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ableegoldman commented on pull request #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on PR #12835:
URL: https://github.com/apache/kafka/pull/12835#issuecomment-1314909856

   Sorry Bruno! I did miss those two comments it seems, apologies for that. I'll go back over your feedback and open a quick followup PR to make sure everything is addressed. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org