You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2022/07/18 22:06:59 UTC
[kafka] branch 3.2 updated: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12412)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new ca674d9e17 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12412)
ca674d9e17 is described below
commit ca674d9e17e249ddc422b54449f847332dc03e97
Author: Christopher L. Shannon <ch...@gmail.com>
AuthorDate: Mon Jul 18 18:06:45 2022 -0400
KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12412)
Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak.
Reviewers: Chris Egerton <fe...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
.../kafka/connect/runtime/WorkerSourceTask.java | 4 ++++
.../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++++++++++++++++++---
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9ce2b8dbb8..a3d9b036c2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -375,6 +375,10 @@ class WorkerSourceTask extends WorkerTask {
// executeFailed here allows the use of existing logging infrastructure/configuration
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
preTransformRecord, e);
+
+ //Ack the record so it will be skipped and offsets are committed
+ submittedRecord.ack();
+ counter.skipRecord();
commitTaskRecord(preTransformRecord, null);
} else {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 78db83c7ee..41df088c2a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -835,25 +835,40 @@ public class WorkerSourceTaskTest extends ThreadedTest {
createWorkerTaskWithErrorToleration();
expectTopicCreation(TOPIC);
+ //Use different offsets for each record so we can verify all were committed
+ final Map<String, Object> offset2 = Collections.singletonMap("key", 13);
+
// send two records
// record 1 will succeed
// record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
// and no ConnectException will be thrown
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
+ SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ expectOffsetFlush(true);
expectSendRecordOnce();
expectSendRecordProducerCallbackFail();
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
- EasyMock.expectLastCall();
+
+ //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored)
+ //Only the last offset will be passed to the method as everything up to that point is committed
+ //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked
+ offsetWriter.offset(PARTITION, offset2);
+ PowerMock.expectLastCall();
PowerMock.replayAll();
+ //Send records and then commit offsets and verify both were committed and no exception
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
Whitebox.invokeMethod(workerTask, "sendRecords");
+ Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+ workerTask.commitOffsets();
PowerMock.verifyAll();
+
+ //Double check to make sure all submitted records were cleared
+ assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask,
+ "submittedRecords")).records.size());
}
@Test