Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/26 11:44:30 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

mimaison commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r792539420



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {

Review comment:
       We can use `==` to compare enums.
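
A minimal sketch of why `==` suffices for enum constants, assuming a stand-in `ToleranceType` with the constants `NONE` and `ALL` (a simplified mirror of the Connect enum, not the real class). Each enum constant is a singleton, so identity comparison is exact, and unlike `equals()` it cannot throw on a null left-hand side.

```java
// Hypothetical stand-in for Kafka Connect's ToleranceType (assumed constants NONE and ALL).
enum ToleranceType { NONE, ALL }

class EnumCompareDemo {
    // == is safe for enums: every constant is a singleton, and a null
    // argument simply yields false instead of throwing.
    static boolean toleratesAll(ToleranceType type) {
        return type == ToleranceType.ALL;
    }

    // equals() gives the same answer for non-null values, but throws
    // NullPointerException when 'type' is null.
    static boolean toleratesAllWithEquals(ToleranceType type) {
        return type.equals(ToleranceType.ALL);
    }

    public static void main(String[] args) {
        System.out.println(toleratesAll(ToleranceType.ALL));   // true
        System.out.println(toleratesAll(ToleranceType.NONE));  // false
        System.out.println(toleratesAll(null));                // false, no NPE
    }
}
```

A further benefit: comparing an enum to an unrelated type with `==` fails at compile time, whereas the equivalent `equals()` call compiles and silently returns false.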

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
         }
     }
 
+    // For source connectors that want to skip kafka producer errors.
+    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
+    // to write to kafka.
+    public synchronized ToleranceType getErrorToleranceType() {

Review comment:
       Does this need to be `synchronized`?
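
A sketch of the reasoning behind the question, assuming (as this simplified class does, without confirming it against the real `RetryWithToleranceOperator`) that the tolerance type is held in a `final` field assigned in the constructor. Final fields are safely visible to other threads once construction completes, so a getter that only reads one needs no lock; the counter, which is mutated concurrently, still does. `ToleranceHolder` and its members are hypothetical.

```java
// Hypothetical stand-in for Kafka Connect's ToleranceType.
enum ToleranceType { NONE, ALL }

// Simplified operator whose tolerance type is fixed at construction (assumption).
class ToleranceHolder {
    // A final field assigned in the constructor is guaranteed visible to other
    // threads after construction (final-field semantics), so reads need no lock.
    private final ToleranceType errorToleranceType;

    // Mutable state shared between threads still needs synchronization.
    private long totalFailures = 0;

    ToleranceHolder(ToleranceType errorToleranceType) {
        this.errorToleranceType = errorToleranceType;
    }

    // Not synchronized: it only reads an immutable final field.
    ToleranceType getErrorToleranceType() {
        return errorToleranceType;
    }

    // Synchronized: read-modify-write on a shared counter.
    synchronized void markAsFailed() {
        totalFailures++;
    }

    synchronized long failureCount() {
        return totalFailures;
    }

    public static void main(String[] args) throws InterruptedException {
        ToleranceHolder holder = new ToleranceHolder(ToleranceType.ALL);
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    holder.markAsFailed();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        System.out.println(holder.getErrorToleranceType() + " " + holder.failureCount()); // ALL 4000
    }
}
```

If the real field turns out to be non-final or reassignable, the `synchronized` modifier (or a `volatile` field) would still be warranted.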

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -222,6 +222,13 @@ private void createWorkerTask() {
         createWorkerTask(TargetState.STARTED);
     }
 
+    private void createWorkerTaskWithErrorToleration() {

Review comment:
       Can we reuse the `createWorkerTask()` method just below by passing a `RetryWithToleranceOperator` argument instead of creating the `WorkerSourceTask` object here?
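
One way to read this suggestion, sketched with hypothetical stand-ins (`RetryOperator` for `RetryWithToleranceOperator`, `SourceTaskStub` for `WorkerSourceTask`, `TestFixture` for the test class): the existing overloads delegate to a single factory that takes the operator as an argument, so the tolerant variant becomes a one-line call rather than a second copy of the construction code. The default `NOOP` operator is an assumption of this sketch.

```java
// Hypothetical stand-ins for the test fixture's collaborators.
enum TargetState { STARTED, PAUSED }

class RetryOperator { // stand-in for RetryWithToleranceOperator
    static final RetryOperator NOOP = new RetryOperator(false);
    final boolean toleratesAll;

    RetryOperator(boolean toleratesAll) {
        this.toleratesAll = toleratesAll;
    }
}

class SourceTaskStub { // stand-in for WorkerSourceTask
    final TargetState initialState;
    final RetryOperator operator;

    SourceTaskStub(TargetState initialState, RetryOperator operator) {
        this.initialState = initialState;
        this.operator = operator;
    }
}

class TestFixture {
    SourceTaskStub workerTask;

    // Existing entry points keep their signatures and delegate downwards.
    void createWorkerTask() {
        createWorkerTask(TargetState.STARTED);
    }

    void createWorkerTask(TargetState initialState) {
        createWorkerTask(initialState, RetryOperator.NOOP);
    }

    // The single place that actually builds the task.
    void createWorkerTask(TargetState initialState, RetryOperator operator) {
        workerTask = new SourceTaskStub(initialState, operator);
    }

    // The tolerant variant just passes a different operator.
    void createWorkerTaskWithErrorToleration() {
        createWorkerTask(TargetState.STARTED, new RetryOperator(true));
    }

    public static void main(String[] args) {
        TestFixture fixture = new TestFixture();
        fixture.createWorkerTaskWithErrorToleration();
        System.out.println(fixture.workerTask.operator.toleratesAll); // true
    }
}
```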

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       Should we have a debug/trace log in this path?
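
A simplified model of the callback branch with the suggested low-level logging added. It is a sketch, not the PR's code: it uses `java.lang.System.Logger` (Java 9+) as a stand-in for the SLF4J logger in `WorkerSourceTask`, omits the `executeFailed(...)` reporting call, and records "commit with null metadata" in a list instead of calling `commitTaskRecord`. `SendCallbackSketch` and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Kafka Connect's ToleranceType.
enum ToleranceType { NONE, ALL }

// Simplified model of the producer send-callback failure branch.
class SendCallbackSketch {
    private static final System.Logger LOG =
            System.getLogger(SendCallbackSketch.class.getName());

    final ToleranceType toleranceType;
    final AtomicReference<Throwable> producerSendException = new AtomicReference<>();
    final List<String> committedWithNullMetadata = new ArrayList<>();

    SendCallbackSketch(ToleranceType toleranceType) {
        this.toleranceType = toleranceType;
    }

    // Called when the producer reports a failed send of 'record' to 'topic'.
    void onSendFailure(String record, String topic, Exception e) {
        if (toleranceType == ToleranceType.ALL) {
            // The suggested debug/trace logging: the error is tolerated, but
            // operators can still see which records were skipped.
            LOG.log(System.Logger.Level.DEBUG,
                    "Ignoring failed send of record {0} to topic {1}", record, topic);
            LOG.log(System.Logger.Level.TRACE, "Failed record: {0}", record);
            // Model of committing the record with null metadata.
            committedWithNullMetadata.add(record);
        } else {
            LOG.log(System.Logger.Level.ERROR, "Failed to send record to " + topic, e);
            // Keep only the first failure; the task loop surfaces it later.
            producerSendException.compareAndSet(null, e);
        }
    }
}
```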

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -111,6 +111,23 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
         return errantRecordFuture;
     }
 
+    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+                                                   SourceRecord sourceRecord,
+                                                   Throwable error) {
+
+        markAsFailed();
+        context.sourceRecord(sourceRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        errorHandlingMetrics.recordFailure();
+        Future<Void> errantRecordFuture = context.report();
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordError();
+            throw new ConnectException("Tolerance exceeded in error handler", error);

Review comment:
       Now that this message can come from 2 different paths, should we add some context to the message to disambiguate them?
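
One possible way to disambiguate, sketched under assumptions: include the failing stage and the executing class in the message, since `executeFailed(...)` already receives both. `Stage` here is a hypothetical subset of Connect's stage enum, `RuntimeException` stands in for `ConnectException`, and the message wording is illustrative only.

```java
// Hypothetical subset of Kafka Connect's error-handling Stage enum.
enum Stage { TRANSFORMATION, KEY_CONVERTER, VALUE_CONVERTER, KAFKA_PRODUCE }

class ToleranceMessages {
    // Names the stage and executing class so a failure reported through
    // executeFailed() (e.g. KAFKA_PRODUCE) is distinguishable from one raised
    // while executing a stage operation.
    static String toleranceExceededMessage(Stage stage, Class<?> executingClass) {
        return "Tolerance exceeded in error handler while executing stage "
                + stage + " in " + executingClass.getSimpleName();
    }

    // RuntimeException stands in for ConnectException; the cause is preserved.
    static RuntimeException toleranceExceeded(Stage stage, Class<?> executingClass, Throwable cause) {
        return new RuntimeException(toleranceExceededMessage(stage, executingClass), cause);
    }

    public static void main(String[] args) {
        // Prints: Tolerance exceeded in error handler while executing stage KAFKA_PRODUCE in ToleranceMessages
        System.out.println(toleranceExceededMessage(Stage.KAFKA_PRODUCE, ToleranceMessages.class));
    }
}
```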

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -815,6 +822,32 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSourceTaskIgnoresProducerException() throws Exception {
+        createWorkerTaskWithErrorToleration();
+        expectTopicCreation(TOPIC);
+
+        // send two records
+        // record 1 will succeed
+        // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
+        // and no ConnectException will be thrown
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+
+        expectSendRecordOnce();
+        expectSendRecordProducerCallbackFail();
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));

Review comment:
       Instead of `EasyMock.anyObject(RecordMetadata.class)` should we use `EasyMock.isNull()` to assert we indeed pass `null` to the task in case there was a failure?
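
To our understanding, `anyObject(...)` also matches `null`, so it cannot tell the failure path apart from a successful send; an `isNull()` expectation can. The sketch below swaps EasyMock for a hand-rolled recording fake (plainly a different technique) to show the same intent without a mocking library. `RecordMeta`, `Task`, `RecordingTask`, and `CommitAssertions` are hypothetical stand-ins for `RecordMetadata`, `SourceTask`, and test helpers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RecordMetadata.
final class RecordMeta {
    final long offset;

    RecordMeta(long offset) {
        this.offset = offset;
    }
}

// Hypothetical stand-in for the SourceTask commitRecord hook.
interface Task {
    void commitRecord(String record, RecordMeta metadata);
}

// Records each commitRecord call so a test can check the exact arguments.
final class RecordingTask implements Task {
    final List<String> records = new ArrayList<>();
    final List<RecordMeta> metadata = new ArrayList<>(); // ArrayList accepts nulls

    @Override
    public void commitRecord(String record, RecordMeta meta) {
        records.add(record);
        metadata.add(meta);
    }
}

final class CommitAssertions {
    // Fails unless the call at 'index' received null metadata (the failure path).
    static void assertCommittedWithNullMetadata(RecordingTask task, int index) {
        if (task.metadata.get(index) != null) {
            throw new AssertionError("expected null metadata for " + task.records.get(index));
        }
    }

    public static void main(String[] args) {
        RecordingTask task = new RecordingTask();
        task.commitRecord("record1", new RecordMeta(0L)); // successful send
        task.commitRecord("record2", null);               // tolerated failure
        assertCommittedWithNullMetadata(task, 1);
        System.out.println("record2 committed with null metadata");
    }
}
```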




-- 
This is an automated message from the Apache Git Service.