You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/16 23:57:42 UTC

[kafka] branch 3.4 updated: KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new c8d0168b1fb KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
c8d0168b1fb is described below

commit c8d0168b1fba07f1f782ba862058765df9df6797
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Thu Feb 16 15:51:34 2023 -0800

    KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../connect/runtime/AbstractWorkerSourceTask.java  |  4 +-
 .../runtime/ExactlyOnceWorkerSourceTask.java       | 20 ++++++---
 .../kafka/connect/storage/OffsetStorageWriter.java |  7 ---
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   | 51 +++++++++-------------
 4 files changed, 39 insertions(+), 43 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index ff15f631a73..fb3c04be6cf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
-                if (toSend == null)
+                if (toSend == null) {
+                    batchDispatched();
                     continue;
+                }
                 log.trace("{} About to send {} records to Kafka", this, toSend.size());
                 if (sendRecords()) {
                     batchDispatched();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 2642ae776ac..21f6bd4f59d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
 
         long started = time.milliseconds();
 
-        // We might have just aborted a transaction, in which case we'll have to begin a new one
-        // in order to commit offsets
-        maybeBeginTransaction();
-
         AtomicReference<Throwable> flushError = new AtomicReference<>();
         boolean shouldFlush = false;
         try {
@@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
         } catch (Throwable e) {
             flushError.compareAndSet(null, e);
         }
+        if (flushError.get() == null && !transactionOpen && !shouldFlush) {
+            // There is nothing on the framework side to commit, so skip the offset flush and producer commit
+            long durationMillis = time.milliseconds() - started;
+            recordCommitSuccess(durationMillis);
+            log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+            commitSourceTask();
+            return;
+        }
+
+        // We might have just aborted a transaction, in which case we'll have to begin a new one
+        // in order to commit offsets
+        maybeBeginTransaction();
+
         if (shouldFlush) {
             // Now we can actually write the offsets to the internal topic.
             // No need to track the flush future here since it's guaranteed to complete by the time
@@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
         }
 
         private void maybeCommitTransaction(boolean shouldCommit) {
-            if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) {
+            if (shouldCommit) {
                 try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) {
                     commitTransaction();
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index 692669e7544..cb944034db1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -150,13 +150,6 @@ public class OffsetStorageWriter {
         }
     }
 
-    /**
-     * @return whether there's anything to flush right now.
-     */
-    public synchronized boolean willFlush() {
-        return !data.isEmpty();
-    }
-
     /**
      * Flush the current offsets and clear them from this writer. This is non-blocking: it
      * moves the current set of offsets out of the way, serializes the data, and asynchronously
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 632f2d8f15f..617a644be1f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -216,7 +216,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
         verify(sourceTask, MockitoUtils.anyTimes()).commitRecord(any(), any());
 
         verify(offsetWriter, MockitoUtils.anyTimes()).offset(PARTITION, OFFSET);
-        verify(offsetWriter, MockitoUtils.anyTimes()).willFlush();
         verify(offsetWriter, MockitoUtils.anyTimes()).beginFlush();
         verify(offsetWriter, MockitoUtils.anyTimes()).doFlush(any());
 
@@ -297,9 +296,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
             return null;
         }).when(statusListener).onPause(eq(taskId));
 
-        // The task checks to see if there are offsets to commit before pausing
-        when(offsetWriter.willFlush()).thenReturn(false);
-
         startTaskThread();
 
         assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
@@ -319,7 +315,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         expectSuccessfulSends();
         expectSuccessfulFlushes();
-        when(offsetWriter.willFlush()).thenReturn(true);
 
         final CountDownLatch pauseLatch = new CountDownLatch(1);
         doAnswer(invocation -> {
@@ -342,7 +337,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         verify(statusListener).onPause(taskId);
         // Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit
-        verifyTransactions(pollCount() + 2);
+        verifyTransactions(pollCount() + 2, pollCount() + 2);
         verifySends();
         verifyPossibleTopicCreation();
         // make sure we didn't poll again after triggering shutdown
@@ -421,7 +416,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         expectSuccessfulSends();
         expectSuccessfulFlushes();
-        when(offsetWriter.willFlush()).thenReturn(true);
 
         startTaskThread();
 
@@ -430,7 +424,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         awaitShutdown();
 
         // Task should have flushed offsets for every record poll and for end-of-life offset commit
-        verifyTransactions(pollCount() + 1);
+        verifyTransactions(pollCount() + 1, pollCount() + 1);
         verifySends();
         verifyPossibleTopicCreation();
 
@@ -498,7 +492,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
         final CountDownLatch workerStopLatch = new CountDownLatch(1);
         final RuntimeException exception = new RuntimeException();
         // We check if there are offsets that need to be committed before shutting down the task
-        when(offsetWriter.willFlush()).thenReturn(false);
         when(sourceTask.poll()).thenAnswer(invocation -> {
             pollLatch.countDown();
             ConcurrencyUtils.awaitLatch(workerStopLatch, "task was not stopped in time");
@@ -512,7 +505,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         workerStopLatch.countDown();
         awaitShutdown(false);
 
-        verifyTransactions(0);
+        verifyTransactions(0, 0);
 
         verifyPreflight();
         verifyStartup();
@@ -525,7 +518,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         // Test that the task handles an empty list of records
         createWorkerTask();
 
-        when(offsetWriter.willFlush()).thenReturn(false);
+        when(offsetWriter.beginFlush()).thenReturn(false);
 
         startTaskThread();
 
@@ -533,7 +526,8 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         awaitShutdown();
 
-        verifyTransactions(0);
+        // task commits for each empty poll, plus the final commit
+        verifyTransactions(pollCount() + 1, 0);
 
         verifyPreflight();
         verifyStartup();
@@ -550,7 +544,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         expectSuccessfulSends();
         expectSuccessfulFlushes();
-        when(offsetWriter.willFlush()).thenReturn(true);
 
         startTaskThread();
 
@@ -559,7 +552,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         awaitShutdown();
 
         // Task should have flushed offsets for every record poll, and for end-of-life offset commit
-        verifyTransactions(pollCount() + 1);
+        verifyTransactions(pollCount() + 1, pollCount() + 1);
         verifySends();
         verifyPossibleTopicCreation();
 
@@ -583,7 +576,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         expectSuccessfulSends();
         expectSuccessfulFlushes();
-        when(offsetWriter.willFlush()).thenReturn(true);
 
         startTaskThread();
 
@@ -601,7 +593,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         awaitShutdown();
 
         // Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit
-        verifyTransactions(3);
+        verifyTransactions(3, 3);
         verifySends();
         verifyPossibleTopicCreation();
 
@@ -639,7 +631,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
 
         expectSuccessfulSends();
         expectSuccessfulFlushes();
-        when(offsetWriter.willFlush()).thenReturn(true);
 
         TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext();
 
@@ -681,7 +672,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
     @Test
     public void testCommitFlushSyncCallbackFailure() throws Exception {
         Exception failure = new RecordTooLargeException();
-        when(offsetWriter.willFlush()).thenReturn(true);
         when(offsetWriter.beginFlush()).thenReturn(true);
         when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
             Callback<Void> callback = invocation.getArgument(0);
@@ -694,7 +684,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
     @Test
     public void testCommitFlushAsyncCallbackFailure() throws Exception {
         Exception failure = new RecordTooLargeException();
-        when(offsetWriter.willFlush()).thenReturn(true);
         when(offsetWriter.beginFlush()).thenReturn(true);
         // doFlush delegates its callback to the producer,
         // which delays completing the callback until commitTransaction
@@ -713,7 +702,6 @@ public class ExactlyOnceWorkerSourceTaskTest {
     @Test
     public void testCommitTransactionFailure() throws Exception {
         Exception failure = new RecordTooLargeException();
-        when(offsetWriter.willFlush()).thenReturn(true);
         when(offsetWriter.beginFlush()).thenReturn(true);
         doThrow(failure).when(producer).commitTransaction();
         testCommitFailure(failure, true);
@@ -829,8 +817,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
             return null;
         }).when(sourceTask).start(eq(TASK_PROPS));
 
-        when(offsetWriter.willFlush()).thenReturn(false);
-
+        when(offsetWriter.beginFlush()).thenReturn(false);
         startTaskThread();
 
         // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
@@ -842,7 +829,8 @@ public class ExactlyOnceWorkerSourceTaskTest {
         awaitShutdown(false);
 
         verify(sourceTask, never()).poll();
-        verifyTransactions(0);
+        // task commit called on shutdown
+        verifyTransactions(1, 0);
         verifySends(0);
 
         verifyPreflight();
@@ -1037,13 +1025,16 @@ public class ExactlyOnceWorkerSourceTaskTest {
         verify(producer, times(count)).send(any(), any());
     }
 
-    private void verifyTransactions(int numBatches) throws InterruptedException, TimeoutException {
-        VerificationMode mode = times(numBatches);
-        verify(producer, mode).beginTransaction();
-        verify(producer, mode).commitTransaction();
-        verify(offsetWriter, mode).beginFlush();
-        verify(offsetWriter, mode).doFlush(any());
-        verify(sourceTask, mode).commit();
+    private void verifyTransactions(int numTaskCommits, int numProducerCommits) throws InterruptedException {
+        // these operations happen on every commit opportunity
+        VerificationMode commitOpportunities = times(numTaskCommits);
+        verify(offsetWriter, commitOpportunities).beginFlush();
+        verify(sourceTask, commitOpportunities).commit();
+        // these operations only happen on non-empty commits
+        VerificationMode commits = times(numProducerCommits);
+        verify(producer, commits).beginTransaction();
+        verify(producer, commits).commitTransaction();
+        verify(offsetWriter, commits).doFlush(any());
     }
 
     private void assertTransactionMetrics(int minimumMaxSizeExpected) {