Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/29 19:33:58 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12920: KAFKA-14339 : Do not perform producerCommit on serializationError when trying offsetWriter flush

C0urante commented on code in PR #12920:
URL: https://github.com/apache/kafka/pull/12920#discussion_r1035151360


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -282,16 +282,18 @@ private void commitTransaction() {
 
         // Commit the transaction
         // Blocks until all outstanding records have been sent and ack'd
-        try {
-            producer.commitTransaction();
-        } catch (Throwable t) {
-            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
-            flushError.compareAndSet(null, t);
+        Throwable error = flushError.get();
+        if (error == null) {
+            try {
+                producer.commitTransaction();
+            } catch (Throwable t) {
+                log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
+                flushError.compareAndSet(null, t);
+            }
         }
-
         transactionOpen = false;

Review Comment:
   Nit: can/should we move `transactionOpen = false` into the `if (error == null)` block above, after the call to `producer.commitTransaction()`? It shouldn't have much effect right now, but that may change if we decide to add error tolerance for offset commits with exactly-once support enabled.
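
For illustration, here is a minimal, self-contained sketch of one reading of this suggestion. `CommitSketch` and its nested `Producer` interface are hypothetical stand-ins, not the real `ExactlyOnceWorkerSourceTask` or `KafkaProducer`, and the placement of `transactionOpen = false` after the try/catch (rather than inside the `try`) is an assumption:

```java
import java.util.concurrent.atomic.AtomicReference;

class CommitSketch {
    /** Hypothetical stand-in for the transactional producer. */
    interface Producer {
        void commitTransaction();
    }

    final AtomicReference<Throwable> flushError = new AtomicReference<>();
    boolean transactionOpen = true;
    private final Producer producer;

    CommitSketch(Producer producer) {
        this.producer = producer;
    }

    void commitTransaction() {
        Throwable error = flushError.get();
        // Skip the commit entirely if the offset flush has already failed
        if (error == null) {
            try {
                producer.commitTransaction();
            } catch (Throwable t) {
                // Record the first failure only
                flushError.compareAndSet(null, t);
            }
            // Assumed placement: the flag is cleared only when a commit was attempted
            transactionOpen = false;
        }
    }
}
```

With this arrangement, a task whose offset flush failed keeps `transactionOpen` set, which only matters if later code decides to tolerate the error and keep using the transaction.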



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -282,16 +282,18 @@ private void commitTransaction() {
 
         // Commit the transaction
         // Blocks until all outstanding records have been sent and ack'd

Review Comment:
   This comment should probably be moved to sit directly above the call to `producer::commitTransaction`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -687,7 +687,26 @@ public void testCommitFlushCallbackFailure() throws Exception {
             callback.onCompletion(failure, null);
             return null;
         });
-        testCommitFailure(failure);
+        testCommitFailure(failure, false);
+    }
+
+    @Test
+    public void testCommitFlushAsyncCallbackFailure() throws Exception {

Review Comment:
   Should we rename `testCommitFlushCallbackFailure` to `testCommitFlushSyncCallbackFailure` to match this?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -687,7 +687,26 @@ public void testCommitFlushCallbackFailure() throws Exception {
             callback.onCompletion(failure, null);
             return null;
         });
-        testCommitFailure(failure);
+        testCommitFailure(failure, false);
+    }
+
+    @Test
+    public void testCommitFlushAsyncCallbackFailure() throws Exception {
+        Exception failure = new RecordTooLargeException();
+        when(offsetWriter.willFlush()).thenReturn(true);
+        when(offsetWriter.beginFlush()).thenReturn(true);
+        // doFlush delegates it's callback to the producer,
+        // which delays completing the callback until commitTransaction
+        AtomicReference<Callback<Void>> callback = new AtomicReference<>();
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            callback.set(invocation.getArgument(0));
+            return null;
+        });
+        doAnswer(invocation -> {
+            callback.get().onCompletion(failure, null);
+            throw failure;

Review Comment:
   We already have test coverage for the case where `producer::commitTransaction` throws an exception. Just to be sure that we're testing the offset flush callback instead of the try/catch around the transaction commit, we shouldn't throw anything from here.
   
   ```suggestion
               return null;
   ```
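
The scenario this suggestion isolates can be sketched without Mockito. The names below (`DeferredCallbackSketch`, the nested `Callback` interface, `run`) are hypothetical and only loosely modelled on the test under review. The flush callback is captured, completed with a failure during the simulated commit, and the commit itself returns normally:

```java
import java.util.concurrent.atomic.AtomicReference;

class DeferredCallbackSketch {
    /** Hypothetical callback type, loosely modelled on Connect's Callback<V>. */
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    /** Runs the scenario and returns the error observed through the callback, if any. */
    static Throwable run(RuntimeException failure) {
        AtomicReference<Throwable> flushError = new AtomicReference<>();
        AtomicReference<Callback<Void>> pending = new AtomicReference<>();

        // Simulated doFlush: capture the callback instead of completing it right away
        Callback<Void> flushCallback = (error, result) -> {
            if (error != null) {
                flushError.compareAndSet(null, error);
            }
        };
        pending.set(flushCallback);

        // Simulated commitTransaction: complete the captured callback with the
        // failure, then return normally instead of throwing
        Runnable commit = () -> pending.get().onCompletion(failure, null);
        commit.run();

        return flushError.get();
    }
}
```

Because nothing is thrown from the simulated commit, any recorded error can only have arrived through the callback, which is the path the new test is meant to cover.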



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -687,7 +687,26 @@ public void testCommitFlushCallbackFailure() throws Exception {
             callback.onCompletion(failure, null);
             return null;
         });
-        testCommitFailure(failure);
+        testCommitFailure(failure, false);
+    }
+
+    @Test
+    public void testCommitFlushAsyncCallbackFailure() throws Exception {
+        Exception failure = new RecordTooLargeException();
+        when(offsetWriter.willFlush()).thenReturn(true);
+        when(offsetWriter.beginFlush()).thenReturn(true);
+        // doFlush delegates it's callback to the producer,

Review Comment:
   Nit:
   ```suggestion
           // doFlush delegates its callback to the producer,
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -282,16 +282,18 @@ private void commitTransaction() {
 
         // Commit the transaction
         // Blocks until all outstanding records have been sent and ack'd
-        try {
-            producer.commitTransaction();
-        } catch (Throwable t) {
-            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
-            flushError.compareAndSet(null, t);
+        Throwable error = flushError.get();

Review Comment:
   ```suggestion
           Throwable error = flushError.get();
           // Only commit the transaction if we were able to serialize the offsets; otherwise, we may commit source records without committing their offsets
   ```


