Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/05 20:18:05 UTC

[GitHub] [kafka] TheKnowles opened a new pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

TheKnowles opened a new pull request #11382:
URL: https://github.com/apache/kafka/pull/11382


   This change gives Source Tasks the option to receive Producer Exceptions and decide what to do with them. Currently, the task is simply killed if the producer reports an exception via its callback. Subclasses of SourceTask can override ignoreNonRetriableProducerException(), returning true if Connect should ignore the failed record and continue processing. This is helpful in cases where we may want to write off a portion of the record/metadata (if it is too large to write) or log and continue processing rather than dying.
   
   A unit test has been added to validate that the producer's failure callback is invoked. In the test, the source task ignores the exception and the task is not killed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-1022663365


   > Thanks @TheKnowles for the PR. I've made a first pass and left a few comments.
   
   @mimaison Thank you for reviewing. I've replied to each comment above and pushed changes.





[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-937864412


   Unrelated tests appear flaky both locally and in Jenkins. All tests related to this change pass deterministically.





[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r753523689



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
                         if (e != null) {
                             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);

Review comment:
       Now that this could be a tolerated error, it makes sense to have it respect the errors.log.enable configuration. However, the log line would then be duplicated: written unconditionally when the error is not tolerated, and behind a config check when it is.
   
   Are you envisioning something like this?
   
   ```
   if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
       if (errorLogEnabled) { // get this value from the config in some manner
           log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
           log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
       }
       commitTaskRecord(preTransformRecord, null);
   } else {
       log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
       log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
       producerSendException.compareAndSet(null, e);
   }
   ```
   
   I would need to look more closely at the other layers of objects on top of the SourceTask. enableErrorLog() is available in the ConnectorConfig, but only the SinkConnectorConfig makes use of it. I would need to spin up some additional infrastructure. Not sure if I would want to add WorkerErrantRecordReporter to WorkerSourceTask or have the configuration pass down in some other manner.
   







[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104975



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -111,6 +111,23 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
         return errantRecordFuture;
     }
 
+    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+                                                   SourceRecord sourceRecord,
+                                                   Throwable error) {
+
+        markAsFailed();
+        context.sourceRecord(sourceRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        errorHandlingMetrics.recordFailure();
+        Future<Void> errantRecordFuture = context.report();
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordError();
+            throw new ConnectException("Tolerance exceeded in error handler", error);

Review comment:
       I added some context to the string error message denoting it was a Source Worker. I am open to suggestions on how verbose this message should be.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -222,6 +222,13 @@ private void createWorkerTask() {
         createWorkerTask(TargetState.STARTED);
     }
 
+    private void createWorkerTaskWithErrorToleration() {

Review comment:
       +1 I have refactored the constructors to be cleaner with various parameter lists.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -815,6 +822,32 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSourceTaskIgnoresProducerException() throws Exception {
+        createWorkerTaskWithErrorToleration();
+        expectTopicCreation(TOPIC);
+
+        // send two records
+        // record 1 will succeed
+        // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
+        // and no ConnectException will be thrown
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+
+        expectSendRecordOnce();
+        expectSendRecordProducerCallbackFail();
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));

Review comment:
       +1







[GitHub] [kafka] mimaison merged pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #11382:
URL: https://github.com/apache/kafka/pull/11382


   





[GitHub] [kafka] TheKnowles edited a comment on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles edited a comment on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-974141478


   Rebased, squashed, and force pushed for merging post KIP vote. Tests related to this change pass locally. There are a handful of unrelated nondeterministic test failures.
   
   edit: Fixed a related test that was missed until CI picked it up. lastSendFailed state was removed in WorkerSourceTask.





[GitHub] [kafka] mimaison commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-1023171704


   @TheKnowles Don't worry about squashing everything, it's done automatically when we merge PRs.
   
   Thanks for the quick update, I'll take another look.





[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793529301



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       My misunderstanding, thank you both for the feedback. Update made.







[GitHub] [kafka] C0urante commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r753311712



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
                         if (e != null) {
                             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);

Review comment:
       Actually, that may complicate things by causing records to be given to `SourceTask::commitRecord` out of order (a record that caused a producer failure may be committed after a record that was dispatched to the producer after it). So probably best to keep the error-handling logic here, but I do still wonder if we can respect the logging-related configuration properties.







[GitHub] [kafka] C0urante commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r754408473



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
                         if (e != null) {
                             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);

Review comment:
       Yes, I was thinking the behavior could be something like that code snippet, although we'd also want to respect the [errors.log.include.messages](https://kafka.apache.org/30/documentation.html#sourceconnectorconfigs_errors.log.include.messages) property and would probably want the format of the error messages to be similar to the error messages we emit in other places where messages are tolerated (such as when conversion or transformation fails).







[GitHub] [kafka] mimaison commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r792539420



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {

Review comment:
       We can use `==` to compare enums.
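
Editorial sketch of the point above: enum constants are singletons, so `==` is a valid comparison and, unlike `equals()`, it does not throw when the left-hand value is null. The `ToleranceType` enum below is a local stand-in defined only for this example, not the class from the Kafka Connect runtime, and the method names are hypothetical.

```java
// Minimal sketch: comparing enum values with == versus equals().
class EnumCompareDemo {
    // Stand-in enum for illustration only.
    enum ToleranceType { NONE, ALL }

    // Identity comparison: well-defined even when 'type' is null.
    static boolean toleratesAllWithIdentity(ToleranceType type) {
        return type == ToleranceType.ALL;
    }

    // equals() is invoked on 'type', so a null argument throws NullPointerException.
    static boolean toleratesAllWithEquals(ToleranceType type) {
        return type.equals(ToleranceType.ALL);
    }

    public static void main(String[] args) {
        System.out.println(toleratesAllWithIdentity(ToleranceType.ALL));
        System.out.println(toleratesAllWithIdentity(null));
        try {
            toleratesAllWithEquals(null);
        } catch (NullPointerException e) {
            System.out.println("equals() on a null reference throws");
        }
    }
}
```

Both forms give the same answer for non-null values; `==` is simply shorter and null-safe.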

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
         }
     }
 
+    // For source connectors that want to skip kafka producer errors.
+    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
+    // to write to kafka.
+    public synchronized ToleranceType getErrorToleranceType() {

Review comment:
       Does this need to be `synchronized`?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -222,6 +222,13 @@ private void createWorkerTask() {
         createWorkerTask(TargetState.STARTED);
     }
 
+    private void createWorkerTaskWithErrorToleration() {

Review comment:
       Can we reuse the `createWorkerTask()` method just below by passing a `RetryWithToleranceOperator` argument instead of creating the `WorkerSourceTask` object here?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       Should we have a debug/trace log in this path?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -111,6 +111,23 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
         return errantRecordFuture;
     }
 
+    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+                                                   SourceRecord sourceRecord,
+                                                   Throwable error) {
+
+        markAsFailed();
+        context.sourceRecord(sourceRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        errorHandlingMetrics.recordFailure();
+        Future<Void> errantRecordFuture = context.report();
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordError();
+            throw new ConnectException("Tolerance exceeded in error handler", error);

Review comment:
       Now that this message can come from 2 different paths, should we add some context to the message to disambiguate them?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -815,6 +822,32 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSourceTaskIgnoresProducerException() throws Exception {
+        createWorkerTaskWithErrorToleration();
+        expectTopicCreation(TOPIC);
+
+        // send two records
+        // record 1 will succeed
+        // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
+        // and no ConnectException will be thrown
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+
+        expectSendRecordOnce();
+        expectSendRecordProducerCallbackFail();
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));

Review comment:
       Instead of `EasyMock.anyObject(RecordMetadata.class)` should we use `EasyMock.isNull()` to assert we indeed pass `null` to the task in case there was a failure?







[GitHub] [kafka] C0urante commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793112244



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       We should not be logging at `ERROR` level for every single record if we aren't failing the task unless the user has explicitly enabled this by setting `errors.log.enable` to `true` in their connector config.
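
Editorial sketch of the connector settings this comment refers to, written as a Java map so it can be run as-is. `errors.tolerance`, `errors.log.enable`, and `errors.log.include.messages` are existing Kafka Connect connector properties; the connector name and class are hypothetical placeholders, and the chosen values are only one possible combination.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch: a source connector config that tolerates errors and opts in to error logging.
class TolerantSourceConnectorConfig {
    static Map<String, String> build() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("name", "example-source");                                 // hypothetical name
        props.put("connector.class", "com.example.ExampleSourceConnector");  // hypothetical class
        props.put("errors.tolerance", "all");             // default is "none": any error fails the task
        props.put("errors.log.enable", "true");           // default is "false": tolerated errors are not logged
        props.put("errors.log.include.messages", "false"); // keep record contents out of the log
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With `errors.log.enable` left at its default, a tolerated failure would be skipped without a per-record error line, which is the behavior the comment asks for.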







[GitHub] [kafka] mimaison commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-1023514264


   Thanks @TheKnowles for this contribution! Sorry it took so long between getting votes on the KIP and reviews on your PR. This feature will be in the next minor release, Kafka 3.2.0.





[GitHub] [kafka] C0urante commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r753308642



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
                         if (e != null) {
                             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);

Review comment:
       Should we modify this line to respect the [errors.log.enable](https://kafka.apache.org/30/documentation.html#sourceconnectorconfigs_errors.log.enable) (and possibly [errors.log.include.messages](https://kafka.apache.org/30/documentation.html#sourceconnectorconfigs_errors.log.include.messages)) properties?
   
   I wonder if it might be useful to still unconditionally set `producerSendException` (or perhaps even convert that field from an `AtomicReference<Throwable>` to some kind of list, and append to it here) and then modify the contents (and possibly also name) of `maybeThrowProducerSendException` to have our error-handling logic. Thoughts?







[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-974141478


   Rebased, squashed, and force pushed for merging post KIP vote. Tests related to this change pass locally. There are a handful of unrelated nondeterministic test failures.





[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-1023136203


   Happy to squash and force push once everyone is pleased with the changes. 





[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104719



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {

Review comment:
       +1







[GitHub] [kafka] mimaison commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793489157



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       Let's keep the existing `trace` and `error` log lines in the `else` block.
   My suggestion is to add a line at the debug or trace level in the `if` block so users can know if an error is ignored.
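
Editorial sketch of the branch structure suggested here, using simplified stand-ins rather than the actual `WorkerSourceTask` code. The class name, the string records, and the two lists that replace the logger and `commitTaskRecord` are all assumptions made so the example runs on its own; only the `if`/`else` shape and the `compareAndSet` call follow the diff under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Minimal single-threaded sketch of a producer send callback with error tolerance.
class SendCallbackSketch {
    enum ToleranceType { NONE, ALL } // stand-in enum for illustration

    final ToleranceType toleranceType;
    final AtomicReference<Throwable> producerSendException = new AtomicReference<>();
    final List<String> logLines = new ArrayList<>();  // stand-in for the logger
    final List<String> committed = new ArrayList<>(); // stand-in for commitTaskRecord(...)

    SendCallbackSketch(ToleranceType toleranceType) {
        this.toleranceType = toleranceType;
    }

    void onCompletion(String record, Throwable e) {
        if (e == null) {
            committed.add(record); // successful send
            return;
        }
        if (toleranceType == ToleranceType.ALL) {
            // Low-level log line so users can see that an error was ignored.
            logLines.add("TRACE ignoring send failure for " + record + ": " + e.getMessage());
            committed.add(record); // the reviewed diff passes null metadata here to signal the failure
        } else {
            // Existing behavior: log, then remember the first failure so the task fails later.
            logLines.add("ERROR failed to send record: " + e.getMessage());
            logLines.add("TRACE failed record: " + record);
            producerSendException.compareAndSet(null, e);
        }
    }

    public static void main(String[] args) {
        SendCallbackSketch tolerant = new SendCallbackSketch(ToleranceType.ALL);
        tolerant.onCompletion("record-1", null);
        tolerant.onCompletion("record-2", new RuntimeException("record too large"));
        tolerant.logLines.forEach(System.out::println);
    }
}
```

In the tolerant branch the stored exception stays null, so nothing later kills the task; in the other branch the first failure is retained.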







[GitHub] [kafka] mimaison commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-1023461101


   @C0urante Do you have further comments, or shall I merge?





[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104806



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       It was previously suggested that the tolerance operator handle this through its error-logging reporter. I would personally find it useful to have the failure in the Connect log regardless of the tolerance error-logging configuration, so I've moved the error/debug log lines above the tolerance check so that they log in all cases.
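
As a rough illustration of the ordering described in this comment, here is a minimal, self-contained sketch. It does not use the real Connect classes: `SendCallbackSketch`, its in-memory `log` and `committed` lists, and the simplified `ToleranceType` enum are hypothetical stand-ins, and the actual `WorkerSourceTask` callback also calls `retryWithToleranceOperator.executeFailed(...)`, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the Connect tolerance setting; not the real enum.
enum ToleranceType { NONE, ALL }

// Hypothetical stand-in for the producer callback inside sendRecords().
class SendCallbackSketch {
    final List<String> log = new ArrayList<>();       // stands in for the Connect log
    final List<String> committed = new ArrayList<>(); // records handed to commitRecord
    final AtomicReference<Exception> producerSendException = new AtomicReference<>();
    private final ToleranceType tolerance;

    SendCallbackSketch(ToleranceType tolerance) {
        this.tolerance = tolerance;
    }

    // Same shape as the producer callback: a null exception means success.
    void onCompletion(String record, Exception e) {
        if (e == null) {
            committed.add(record);
            return;
        }
        // Logged before the tolerance check, so these lines appear in every case.
        log.add("ERROR failed to send record: " + e.getMessage());
        log.add("TRACE failed record: " + record);
        if (tolerance == ToleranceType.ALL) {
            // Tolerated: skip the failed record but still commit it.
            committed.add(record);
        } else {
            // Not tolerated: remember the first failure so the task can fail.
            producerSendException.compareAndSet(null, e);
        }
    }
}
```

With `ToleranceType.ALL` the failure is logged and the record is still committed; with `ToleranceType.NONE` the same two log lines are written and the exception is stored instead.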

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
         }
     }
 
+    // For source connectors that want to skip kafka producer errors.
+    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
+    // to write to kafka.
+    public synchronized ToleranceType getErrorToleranceType() {

Review comment:
       It does not. The type is immutable and thread safe. I dug through the ticket that retroactively made this class thread safe, and at the time it seemed like a good idea to add `synchronized` to match the rest of the class, but it is not necessary at all. Removed.
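
To make the reasoning concrete, here is a minimal sketch, assuming the tolerance type is held in a `final` field as this comment implies. `ToleranceOperatorSketch` and its members are hypothetical stand-ins, not the real `RetryWithToleranceOperator`: the mutable failure counter keeps its `synchronized` guards, while the getter for the immutable enum value needs none.

```java
// Hypothetical stand-in for the Connect tolerance setting; not the real enum.
enum ToleranceType { NONE, ALL }

// Hypothetical stand-in for the operator; only the locking pattern matters here.
class ToleranceOperatorSketch {
    // Final and set once in the constructor, so it is safe to read from any
    // thread once the object has been constructed and published.
    private final ToleranceType errorToleranceType;

    // Mutable state, guarded by the object's monitor.
    private long totalFailures = 0;

    ToleranceOperatorSketch(ToleranceType errorToleranceType) {
        this.errorToleranceType = errorToleranceType;
    }

    synchronized void markAsFailed() {
        totalFailures++;
    }

    synchronized boolean withinToleranceLimits() {
        return errorToleranceType == ToleranceType.ALL || totalFailures == 0;
    }

    // No synchronized needed: the field never changes and enum constants are immutable.
    ToleranceType getErrorToleranceType() {
        return errorToleranceType;
    }
}
```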







[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-937864412


   Some unrelated tests appear flaky, both locally and in Jenkins. All tests related to this change pass deterministically.





[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

Posted by GitBox <gi...@apache.org>.
TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r763062207



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
                         if (e != null) {
                             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);

Review comment:
       The error retry handling infrastructure predominantly concerns itself with the sink side of the house, to the point that any refactoring I would want to do would probably necessitate a KIP of its own. To that end, I have added an additional executeFailed() function to RetryWithToleranceOperator so that the source worker can handle error logging with all of the existing infrastructure/configuration that exists for sink tasks.
   
   I toyed with the idea of having the new executeFailed() fire without a tolerance type check. This would work for failing/ignoring as expected, but would leave no mechanism to then decide whether we should call commitRecord(). We could block on the future from executeFailed() and then check withinToleranceLimits(), but that introduces nondeterminism from interrupt/execution exceptions.
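
The trade-off described above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `BlockingAlternativeSketch` and both of its methods are hypothetical, and a plain `Future<Void>` stands in for whatever executeFailed() returns.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper contrasting the two ways of deciding whether to commit.
class BlockingAlternativeSketch {

    // Alternative that was considered: wait for the error report to finish,
    // then consult the tolerance limits.
    static boolean shouldCommitAfterReport(Future<Void> report, boolean withinLimits) {
        try {
            report.get(); // blocks the thread running the producer callback
            return withinLimits;
        } catch (InterruptedException ie) {
            // The outcome now depends on whether this thread was interrupted.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException ee) {
            // The outcome now depends on whether the reporting itself failed.
            return false;
        }
    }

    // Approach taken instead: decide from the configured tolerance type alone,
    // which is known up front and does not depend on any asynchronous work.
    static boolean shouldCommitUpFront(boolean toleranceIsAll) {
        return toleranceIsAll;
    }
}
```

In the blocking variant the commit decision has two extra failure paths that have nothing to do with the record itself, which is the nondeterminism this comment refers to.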



