Posted to commits@kafka.apache.org by rh...@apache.org on 2022/07/26 23:30:29 UTC

[kafka] branch 3.3 updated: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 64ebc76df9 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415)
64ebc76df9 is described below

commit 64ebc76df96cdc9cea419c0ab09cfa82d0ca743c
Author: Christopher L. Shannon <ch...@gmail.com>
AuthorDate: Mon Jul 18 18:07:20 2022 -0400

    KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415)
    
    Make sure to ack all records whose produce attempt failed when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task continues to commit subsequent record offsets properly and removes the failed records from internal tracking, preventing a memory leak.
    
    (cherry picked and slightly modified from commit 63e06aafd0cf37f8488c3830946051b3a30db2a0)
    
    Reviewers: Chris Egerton <fe...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 .../connect/runtime/AbstractWorkerSourceTask.java   |  5 +++++
 .../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++++++++++++++++++---
 2 files changed, 23 insertions(+), 3 deletions(-)
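
For context, the behavior this commit changes is opted into via the connector-level `errors.tolerance` property. Below is a minimal sketch of a source connector configuration using that mode; the connector name and class are hypothetical placeholders, and only `errors.tolerance` is the property this commit concerns:

    import java.util.HashMap;
    import java.util.Map;

    public class ErrorsToleranceExample {
        public static void main(String[] args) {
            Map<String, String> connectorConfig = new HashMap<>();
            connectorConfig.put("name", "example-source");                       // hypothetical name
            connectorConfig.put("connector.class", "org.example.ExampleSource"); // hypothetical class
            // "all" tells the Connect runtime to tolerate (skip) failed records
            // instead of failing the task; the default is "none".
            connectorConfig.put("errors.tolerance", "all");
            System.out.println(connectorConfig);
        }
    }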

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d89f577688..407f5fd828 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -38,6 +38,7 @@ import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
@@ -406,6 +407,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                             }
                             log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord);
                             producerSendFailed(false, producerRecord, preTransformRecord, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
+                                counter.skipRecord();
+                                submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
+                            }
                         } else {
                             counter.completeRecord();
                             log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 5ce0e44f3e..2d2cd00cf5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -696,24 +696,39 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         createWorkerTaskWithErrorToleration();
         expectTopicCreation(TOPIC);
 
+        //Use different offsets for each record so we can verify all were committed
+        final Map<String, Object> offset2 = Collections.singletonMap("key", 13);
+
         // send two records
         // record 1 will succeed
         // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
         // and no ConnectException will be thrown
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
+        SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        expectOffsetFlush(true);
         expectSendRecordOnce();
         expectSendRecordProducerCallbackFail();
         sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
-        EasyMock.expectLastCall();
+
+        //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored)
+        //Only the last offset will be passed to the method as everything up to that point is committed
+        //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked
+        offsetWriter.offset(PARTITION, offset2);
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
+        //Send records and then commit offsets and verify both were committed and no exception
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
         Whitebox.invokeMethod(workerTask, "sendRecords");
+        Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+        workerTask.commitOffsets();
 
         PowerMock.verifyAll();
+
+        //Double check to make sure all submitted records were cleared
+        assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask,
+                "submittedRecords")).records.size());
     }
 
     @Test
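
To make the test's before/after comment concrete, here is a small driver for the model sketched earlier; the offsets 12 and 13 mirror the test's `OFFSET` and `offset2` values, per its comments:

    // Uses the SubmittedRecordsModel sketched above.
    public class Kafka14079Demo {
        public static void main(String[] args) {
            SubmittedRecordsModel tracker = new SubmittedRecordsModel();
            SubmittedRecordsModel.Record r1 = tracker.submit(12); // record 1: produce succeeds
            SubmittedRecordsModel.Record r2 = tracker.submit(13); // record 2: produce fails, tolerated

            r1.ack(); // the success path has always acked
            r2.ack(); // the fix: tolerated failures now ack as well

            // Prints 13. Without the r2.ack() call (the pre-fix behavior) this
            // would print 12, and r2 would linger in the deque indefinitely.
            System.out.println(tracker.committableOffset());
        }
    }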