Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/30 21:38:35 UTC

[GitHub] [kafka] mjsax opened a new pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

mjsax opened a new pull request #9800:
URL: https://github.com/apache/kafka/pull/9800


   If EOS is enabled and the TX commit fails with a timeout,
   we should not process more messages (which is OK for non-EOS),
   because we don't really know the status of the TX.
   If the commit was in fact successful, there is no open TX,
   and calling send() would fail with a fatal error.
   
   Instead, we should retry the (idempotent) commit of the TX,
   and start a new TX afterwards.
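The retry strategy described above can be sketched as a small, self-contained model. This is an illustration under stated assumptions, not the Kafka Streams implementation: `EosCommitSketch`, `TxProducer`, `FakeProducer`, `EosTask`, and `CommitTimeout` are hypothetical names. `TxProducer` only loosely mimics the transactional methods of the real `KafkaProducer` (`beginTransaction()`, `send()`, `commitTransaction()`), `CommitTimeout` stands in for Kafka's `TimeoutException`, and the fake producer applies each commit but "loses" the first acknowledgement to simulate a commit that succeeded yet timed out on the client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of EOS commit-timeout handling; not Kafka Streams code.
class EosCommitSketch {

    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class CommitTimeout extends RuntimeException {
        CommitTimeout(String message) { super(message); }
    }

    // Loosely mimics the transactional methods of KafkaProducer.
    interface TxProducer {
        void beginTransaction();
        void send(String record);
        void commitTransaction();
    }

    // Applies every commit, but "loses" the first N acknowledgements,
    // so the caller sees a timeout even though the TX was committed.
    static class FakeProducer implements TxProducer {
        final List<String> committed = new ArrayList<>();
        private final List<String> uncommitted = new ArrayList<>();
        private boolean txOpen = false;
        private int lostAcks;

        FakeProducer(int lostAcks) { this.lostAcks = lostAcks; }

        public void beginTransaction() { txOpen = true; }

        public void send(String record) {
            if (!txOpen) {
                throw new IllegalStateException("fatal: send() without an open TX");
            }
            uncommitted.add(record);
        }

        // Idempotent: retrying after the TX is already committed is a no-op.
        public void commitTransaction() {
            if (txOpen) {
                committed.addAll(uncommitted);
                uncommitted.clear();
                txOpen = false;
            }
            if (lostAcks > 0) {
                lostAcks--;
                throw new CommitTimeout("commit acknowledgement not received in time");
            }
        }
    }

    static class EosTask {
        private final TxProducer producer;
        private boolean pendingTxCommit = false;

        EosTask(TxProducer producer) {
            this.producer = producer;
            producer.beginTransaction();
        }

        // Refuses to send while a timed-out commit is unresolved.
        boolean process(String record) {
            if (pendingTxCommit) {
                return false;
            }
            producer.send(record);
            return true;
        }

        // Returns true only once the commit is known to have succeeded.
        boolean commit() {
            try {
                producer.commitTransaction();
            } catch (CommitTimeout e) {
                pendingTxCommit = true; // TX status unknown: retry later
                return false;
            }
            pendingTxCommit = false;
            producer.beginTransaction(); // start the next TX only after success
            return true;
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer(1);
        EosTask task = new EosTask(producer);
        task.process("a");
        System.out.println(task.commit());      // false: ack lost, TX state unknown
        System.out.println(task.process("b"));  // false: processing paused
        System.out.println(task.commit());      // true: retry confirms the commit
        System.out.println(task.process("b"));  // true: sent in the new TX
        System.out.println(producer.committed); // [a]
    }
}
```

Without the pause, sending "b" right after the timed-out commit would hit the `IllegalStateException` in `FakeProducer.send()`, which models the fatal error the description mentions.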
   
   Call for review @vvcephei @abbccdda @guozhangwang @hachikuji @bob-barrett 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9800:
URL: https://github.com/apache/kafka/pull/9800


   





[GitHub] [kafka] mjsax commented on pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9800:
URL: https://github.com/apache/kafka/pull/9800#issuecomment-754980147


   > which part of the change is triggering a retry of TX commit?
   
   The main processing loop does, i.e., we don't need any change to get a retry. In each loop iteration we go over all tasks, and if a task's "needCommit" flag is set to `true`, the loop retries committing that task once commit-interval-ms has passed (note that we don't reset the flag to `false` if we hit a timeout). I thought it's OK to just "pause" processing for a full commit interval in this case; we could of course also set `commitRequested=true` to retry the commit directly in the next iteration instead of waiting for the next commit interval to pass. Thoughts?
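The two retry options can be compared with a toy simulation. It is not `StreamThread` code: `MainLoopRetrySketch`, `Task`, `tryCommit()`, and `runLoop()` are hypothetical, time advances by 1 ms per loop iteration, and the first commit attempt is assumed to time out. It contrasts the current behaviour (the flag stays set, so the retry waits for the next commit interval) with the `commitRequested=true` alternative (retry on the very next iteration).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation of retrying a timed-out commit from the main loop.
class MainLoopRetrySketch {

    static class Task {
        boolean needsCommit = false;     // set once the task has processed data
        boolean commitRequested = false; // ask for a commit in the next iteration
        final List<Long> attempts = new ArrayList<>();
        private int timeoutsLeft;        // commit attempts that will time out

        Task(int timeoutsLeft) { this.timeoutsLeft = timeoutsLeft; }

        boolean tryCommit(long nowMs) {
            attempts.add(nowMs);
            if (timeoutsLeft > 0) {
                timeoutsLeft--;
                return false; // timeout: needsCommit is deliberately NOT reset
            }
            needsCommit = false;
            commitRequested = false;
            return true;
        }
    }

    // One loop iteration per simulated millisecond.
    static void runLoop(List<Task> tasks, long commitIntervalMs, int iterations,
                        boolean retryNextIteration) {
        long lastCommitCheckMs = 0;
        for (long nowMs = 1; nowMs <= iterations; nowMs++) {
            boolean intervalElapsed = nowMs - lastCommitCheckMs >= commitIntervalMs;
            for (Task task : tasks) {
                if (task.needsCommit && (intervalElapsed || task.commitRequested)) {
                    boolean ok = task.tryCommit(nowMs);
                    if (!ok && retryNextIteration) {
                        task.commitRequested = true;
                    }
                }
            }
            if (intervalElapsed) {
                lastCommitCheckMs = nowMs;
            }
        }
    }

    public static void main(String[] args) {
        Task waiting = new Task(1);
        waiting.needsCommit = true;
        runLoop(Arrays.asList(waiting), 100, 250, false);
        System.out.println(waiting.attempts); // [100, 200]

        Task eager = new Task(1);
        eager.needsCommit = true;
        runLoop(Arrays.asList(eager), 100, 250, true);
        System.out.println(eager.attempts);   // [100, 101]
    }
}
```

With a 100 ms interval, the first variant retries at 200 ms and the second at 101 ms; in this model the only difference is how long processing stays paused.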
   
   > I assume isProcessable will return false once we are already within a transaction
   
   No. Before this change, `isProcessable()` returned `true` if there was data available in our internal buffer. It does not know anything about TXs.
   
   > how could we break out of this check
   
   I'm not sure I can follow. Could you clarify?





[GitHub] [kafka] mjsax commented on a change in pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9800:
URL: https://github.com/apache/kafka/pull/9800#discussion_r552274256



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -633,6 +636,10 @@ public boolean isProcessable(final long wallClockTime) {
             return false;
         }
 
+        if (hasPendingTransaction) {

Review comment:
       Sure. Just realized that the variable name is not ideal: it's actually about a "pending TX _commit_"...
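One way the guard and the explanatory comment requested in review might read, sketched on a simplified stand-in class. `ProcessableSketch`, `commitTimedOut()`, and `commitSucceeded()` are hypothetical; the field is called `hasPendingTxCommit` following the remark about the variable name, and the real `isProcessable(wallClockTime)` has further checks that are omitted here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical stand-in for StreamTask; only the new guard is modelled.
class ProcessableSketch {
    private final Deque<String> buffer = new ArrayDeque<>();
    private boolean hasPendingTxCommit = false;

    void addRecord(String record) { buffer.add(record); }
    void commitTimedOut() { hasPendingTxCommit = true; }
    void commitSucceeded() { hasPendingTxCommit = false; }

    boolean isProcessable() {
        // Under EOS, a commit that timed out leaves the TX status unknown. If the
        // commit actually went through, no TX is open and the next send() would
        // fail with a fatal error, so we stop processing until the (idempotent)
        // commit is retried successfully and a new TX is started.
        if (hasPendingTxCommit) {
            return false;
        }
        // The real method has further checks; here only buffered data counts.
        return !buffer.isEmpty();
    }

    public static void main(String[] args) {
        ProcessableSketch task = new ProcessableSketch();
        task.addRecord("r1");
        System.out.println(task.isProcessable()); // true
        task.commitTimedOut();
        System.out.println(task.isProcessable()); // false
        task.commitSucceeded();
        System.out.println(task.isProcessable()); // true
    }
}
```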







[GitHub] [kafka] mjsax commented on a change in pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9800:
URL: https://github.com/apache/kafka/pull/9800#discussion_r552275624



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -408,9 +417,74 @@ public void shouldProcessInOrder() {
         assertEquals(3, source2.numReceived);
     }
 
+    @Test
+    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertTrue(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+
+        assertFalse(task.process(time.milliseconds()));
+    }
+
+    @Test
+    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() {
+        task = createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE, "0"), StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertFalse(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+        assertTrue(task.process(time.milliseconds()));

Review comment:
       We need to call `process()` twice here: we have 3 input records and want to confirm that all three are processed. If we don't call `process()` twice here, the `process()` call asserted in L460 below would not return `false`.
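To see why two `process()` calls are needed, here is a toy model that mirrors the assertion sequence of the two tests above: one record per `process()` call, and under EOS no processing between `prepareCommit()` and `postCommit()`. `ProcessSequenceSketch` is a hypothetical stand-in, not the `StreamTask` test fixture.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical stand-in that replays the test's process()/commit sequence.
class ProcessSequenceSketch {
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private final boolean eosEnabled;
    private boolean commitInFlight = false;
    int processed = 0;

    ProcessSequenceSketch(boolean eosEnabled) { this.eosEnabled = eosEnabled; }

    void addRecords(Integer... offsets) { buffer.addAll(Arrays.asList(offsets)); }

    void prepareCommit() { commitInFlight = true; }
    void postCommit() { commitInFlight = false; }

    // Processes at most one record; returns whether a record was processed.
    boolean process() {
        if (eosEnabled && commitInFlight) {
            return false; // EOS: wait until the commit is confirmed
        }
        if (buffer.isEmpty()) {
            return false;
        }
        buffer.poll();
        processed++;
        return true;
    }

    public static void main(String[] args) {
        ProcessSequenceSketch task = new ProcessSequenceSketch(true);
        task.addRecords(10, 20, 30);
        System.out.println(task.process()); // true  (record 10)
        task.prepareCommit();
        System.out.println(task.process()); // false (blocked under EOS)
        task.postCommit();
        System.out.println(task.process()); // true  (record 20)
        System.out.println(task.process()); // true  (record 30)
        System.out.println(task.process()); // false (buffer drained)
    }
}
```

Only after both remaining records are drained does the final `process()` return `false`, which is what the last assertion checks.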







[GitHub] [kafka] mjsax commented on a change in pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9800:
URL: https://github.com/apache/kafka/pull/9800#discussion_r552319865



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -408,9 +417,74 @@ public void shouldProcessInOrder() {
         assertEquals(3, source2.numReceived);
     }
 
+    @Test
+    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);

Review comment:
       Thinking about it, the common test logic is so small; wouldn't extracting it make the tests harder to read?







[GitHub] [kafka] abbccdda commented on a change in pull request #9800: KAFKA-9274: Fix commit-TimeoutException handling for EOS

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9800:
URL: https://github.com/apache/kafka/pull/9800#discussion_r551462812



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -633,6 +636,10 @@ public boolean isProcessable(final long wallClockTime) {
             return false;
         }
 
+        if (hasPendingTransaction) {

Review comment:
       Could we add a comment here to explain why a task is not processable under a pending transaction?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -408,9 +417,74 @@ public void shouldProcessInOrder() {
         assertEquals(3, source2.numReceived);
     }
 
+    @Test
+    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertTrue(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+
+        assertFalse(task.process(time.milliseconds()));
+    }
+
+    @Test
+    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() {
+        task = createStatelessTask(createConfig(StreamsConfig.EXACTLY_ONCE, "0"), StreamsConfig.METRICS_LATEST);
+
+        assertFalse(task.process(time.milliseconds()));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 10),
+            getConsumerRecord(partition1, 20),
+            getConsumerRecord(partition1, 30)
+        ));
+
+        assertTrue(task.process(time.milliseconds()));
+        task.prepareCommit();
+        assertFalse(task.process(time.milliseconds()));
+        task.postCommit(false);
+        assertTrue(task.process(time.milliseconds()));
+        assertTrue(task.process(time.milliseconds()));

Review comment:
       Why do we need to assert twice?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -408,9 +417,74 @@ public void shouldProcessInOrder() {
         assertEquals(3, source2.numReceived);
     }
 
+    @Test
+    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
+        task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);

Review comment:
       nit: we could extract the common test logic so that all 3 record-processing tests can reuse it.



