Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/07 22:58:14 UTC

[GitHub] [kafka] aakashnshah opened a new pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

aakashnshah opened a new pull request #8829:
URL: https://github.com/apache/kafka/pull/8829


   Currently, the errant record reporter doesn't take into account the value of `errors.tolerance`. This PR adds a check for whether the reporter is within the tolerance limits; if not, a `ConnectException` is thrown. This is essentially what is done in other parts of the `RetryWithToleranceOperator`.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] aakashnshah edited a comment on pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
aakashnshah edited a comment on pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#issuecomment-642299655


   I've added a condition to `WorkerSinkTask` in `deliverMessages()` to throw a `ConnectException` if the Errant Record Reporter has recorded that one must be thrown. This accounts for the case where the developer swallows the exception thrown from `executeFailed(...)` within `put(...)`. Right now, I am storing this information (whether an exception must be thrown and the exception to throw) within `WorkerErrantRecordReporter`. Let me know if you think this is a good idea or if it would be better done within `RetryWithToleranceOperator`. @rhauch @wicknicks








[GitHub] [kafka] rhauch commented on pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#issuecomment-642345842


   Docs should be updated in a subsequent PR, but it's probably good to update the KIP now to reflect this implementation detail (this feature adheres to the existing DLQ `all` vs. `none` tolerance), and to update the JavaDoc to include a `@throws` statement.





[GitHub] [kafka] rhauch commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r436411235



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);
+        }
+

Review comment:
       This marks the failure when we get a new error but the previous error already put us over the limit. IOW, if this is the first error that is reported, then `totalFailures` will be 0 when this method is called and thus the `withinToleranceLimits()` will return `true` (i.e., we haven't recorded any errors yet) and we will *not* enter the if block due to the negation.
   
   Seems like we should actually do this check *after* we record the error. That would be something like:
   ```
           markAsFailed();
           context.consumerRecord(consumerRecord);
           context.currentContext(stage, executingClass);
           context.error(error);
           errorHandlingMetrics.recordError();
           if (!withinToleranceLimits()) {
               errorHandlingMetrics.recordFailure();
               throw new ConnectException("Tolerance exceeded in error handler", error);
           }
           return context.report();
   ```
   Note that I added the `markAsFailed()` call since that's what increments the `totalFailures` field (and calls `errorHandlingMetrics.recordErrorTimestamp()`). 
   
   BTW, I'm not sure whether we should call `errorHandlingMetrics.recordError()` or `errorHandlingMetrics.recordFailure()` or both.
   
   IIUC, then when we get to the if-block on the first error being reported, the `markAsFailed()` method will have incremented the `totalFailures` (we were not doing that in this method before this PR), and if `errors.tolerance=NONE` is used we will fail on the *first* error -- which is what we want.
   
   I also think that if we add other error tolerance policies in the future, this logic will work correctly, as long as `withinToleranceLimits()` is implemented to return `false` when we should fail rather than report.
   
   Also, it'd be great to have unit tests that verify this behavior.
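
The ordering argument above can be checked with a small stand-alone sketch. Everything in it is hypothetical: `ToleranceOrderingSketch`, its `Tolerance` enum, and `IllegalStateException` (standing in for `ConnectException`) are simplified stand-ins rather than the real Kafka Connect classes, and `withinToleranceLimits()` is assumed to return `false` under `NONE` as soon as one failure has been counted.

```java
// Hypothetical stand-in for the tolerance bookkeeping discussed above.
// None of these names are the real Kafka Connect classes.
class ToleranceOrderingSketch {
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    private int totalFailures = 0;

    ToleranceOrderingSketch(Tolerance tolerance) {
        this.tolerance = tolerance;
    }

    // Assumed semantics: ALL tolerates everything, NONE tolerates no counted failure.
    boolean withinToleranceLimits() {
        return tolerance == Tolerance.ALL || totalFailures == 0;
    }

    void markAsFailed() {
        totalFailures++;
    }

    // Order in the original diff: check first, count only inside the if-block.
    void reportCheckFirst(Throwable error) {
        if (!withinToleranceLimits()) {
            markAsFailed();
            throw new IllegalStateException("Tolerance exceeded", error);
        }
        // Within limits: the error would be handed to the reporters here.
    }

    // Suggested order: count the failure, then check.
    void reportCountFirst(Throwable error) {
        markAsFailed();
        if (!withinToleranceLimits()) {
            throw new IllegalStateException("Tolerance exceeded", error);
        }
        // Within limits: the error would be handed to the reporters here.
    }

    // Returns how many errors are accepted before one is rejected (at most `attempts`).
    static int acceptedBeforeFailure(boolean countFirst, Tolerance tolerance, int attempts) {
        ToleranceOrderingSketch sketch = new ToleranceOrderingSketch(tolerance);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            Throwable error = new RuntimeException("bad record " + i);
            try {
                if (countFirst) {
                    sketch.reportCountFirst(error);
                } else {
                    sketch.reportCheckFirst(error);
                }
                accepted++;
            } catch (IllegalStateException e) {
                break;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedBeforeFailure(false, Tolerance.NONE, 3)); // prints 3
        System.out.println(acceptedBeforeFailure(true, Tolerance.NONE, 3));  // prints 0
        System.out.println(acceptedBeforeFailure(true, Tolerance.ALL, 3));   // prints 3
    }
}
```

With the check placed first, the counter in this sketch never moves, so `NONE` never rejects anything; counting first rejects the very first error, which is the behavior asked for above.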







[GitHub] [kafka] wicknicks commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
wicknicks commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r436843713



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);
+        }
+

Review comment:
       If we attempt an operation and it fails, `recordFailure` will be incremented, but `recordError` only tracks the cases where we encounter a problem that the framework cannot retry or skip. In the first case, we may still be able to retry or skip the record. In the `executeFailed` scenario, we should call `recordFailure()` every time, and `recordError()` only when we have to fail the task.
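
A minimal sketch of the metric split suggested here, under stated assumptions: `MetricsSplitSketch` and its two public counters are hypothetical stand-ins for `errorHandlingMetrics.recordFailure()` and `errorHandlingMetrics.recordError()`, and the tolerance check is reduced to an `ALL` vs. `NONE` toggle. It only illustrates this one suggestion, not what the PR finally does.

```java
// Hypothetical sketch: count a "failure" for every reported record,
// but an "error" only when the task has to be failed.
class MetricsSplitSketch {
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    private int totalFailures = 0;

    int failuresRecorded = 0; // stand-in for recordFailure()
    int errorsRecorded = 0;   // stand-in for recordError()

    MetricsSplitSketch(Tolerance tolerance) {
        this.tolerance = tolerance;
    }

    private boolean withinToleranceLimits() {
        return tolerance == Tolerance.ALL || totalFailures == 0;
    }

    // Returns true if the record can be reported and skipped,
    // false if the task must fail.
    boolean executeFailed(Throwable error) {
        totalFailures++;
        failuresRecorded++;          // every failed operation is counted
        if (!withinToleranceLimits()) {
            errorsRecorded++;        // only counted when we cannot skip
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        MetricsSplitSketch tolerant = new MetricsSplitSketch(Tolerance.ALL);
        for (int i = 0; i < 3; i++) {
            tolerant.executeFailed(new RuntimeException("bad record " + i));
        }
        System.out.println(tolerant.failuresRecorded + " " + tolerant.errorsRecorded); // prints 3 0

        MetricsSplitSketch strict = new MetricsSplitSketch(Tolerance.NONE);
        strict.executeFailed(new RuntimeException("bad record"));
        System.out.println(strict.failuresRecorded + " " + strict.errorsRecorded); // prints 1 1
    }
}
```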

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);

Review comment:
       Since this is called from the task, is it enough to just raise an exception? The exception may be swallowed by the task, which could then continue processing.







[GitHub] [kafka] aakashnshah commented on pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#issuecomment-642418469


   I've updated the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors @rhauch @wicknicks 





[GitHub] [kafka] wicknicks commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
wicknicks commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r438452151



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -556,6 +556,9 @@ private void deliverMessages() {
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
+            if (workerErrantRecordReporter != null && workerErrantRecordReporter.mustThrowException()) {
+                throw workerErrantRecordReporter.getExceptionToThrow();
+            }

Review comment:
       instead, you can just check: 
   
   ```
           if (retryWithToleranceOperator.failed()) {
               throw retryWithToleranceOperator.error();
           }
   ```
   
   This works because we are already storing the error in the processing context; you can expose it through the operator.
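
As a rough illustration of "expose that through the operator", the sketch below delegates `failed()` and `error()` to a context object. The nested `Context` and `Operator` classes are hypothetical stand-ins written for this example; the real `ProcessingContext` and `RetryWithToleranceOperator` have different constructors and more state.

```java
// Hypothetical sketch: the operator forwards failed()/error() to the
// context that already stores the last error, so no extra fields are needed.
class ExposeErrorSketch {

    static class Context {
        private Throwable error;

        void error(Throwable error) {
            this.error = error;
        }

        Throwable error() {
            return error;
        }

        boolean failed() {
            return error != null;
        }
    }

    static class Operator {
        private final Context context = new Context();

        // Stand-in for executeFailed(...): just remembers the error.
        void executeFailed(Throwable error) {
            context.error(error);
        }

        // Thin accessors that expose what the context already knows.
        boolean failed() {
            return context.failed();
        }

        Throwable error() {
            return context.error();
        }
    }

    public static void main(String[] args) {
        Operator operator = new Operator();
        System.out.println(operator.failed()); // prints false

        operator.executeFailed(new IllegalArgumentException("bad record"));
        if (operator.failed()) {
            System.out.println(operator.error().getMessage()); // prints bad record
        }
    }
}
```

The caller then needs only the operator reference, which is why the separate `mustThrowException` and `exceptionToThrow` fields become redundant.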

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -99,8 +102,15 @@ public WorkerErrantRecordReporter(
                 valLength, key, value, headers);
         }
 
-        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-            SinkTask.class, consumerRecord, error);
+        Future<Void> future;
+        try {
+            future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+                SinkTask.class, consumerRecord, error);
+        } catch (ConnectException e) {
+            mustThrowException = true;
+            exceptionToThrow = e;
+            throw e;
+        }

Review comment:
       We don't need these variables; the errors are already stored in the `ProcessingContext`. See the comment above.










[GitHub] [kafka] wicknicks commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
wicknicks commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r438475355



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -237,6 +239,7 @@ public void testErrantRecordReporter() throws Exception {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(ERRORS_TOLERANCE_CONFIG, ToleranceType.ALL.value());

Review comment:
       Minor: we should move this test to `ErrorHandlingIntegrationTest`. `ExampleConnectIntegrationTest` was meant to be an example of how to do integration tests.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -556,6 +556,10 @@ private void deliverMessages() {
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
+            if (retryWithToleranceOperator.failed() && !retryWithToleranceOperator.withinToleranceLimits()) {

Review comment:
       let's add a small comment saying why we need to do this: specifically, that if the errors raised from the operator were swallowed by the task implementation, then here we need to kill the task, and if they were not swallowed, we would not get here.
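
The scenario this comment should describe can be sketched end to end. All names below (`SwallowedErrorSketch`, `Operator`, `Task`, `deliver`) are hypothetical, `IllegalStateException` stands in for `ConnectException`, and the tolerance logic is reduced to `ALL` vs. `NONE`; it is a sketch of the idea, not the actual `WorkerSinkTask` code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a task swallows the exception raised by the operator,
// so the framework re-checks the operator after put() returns.
class SwallowedErrorSketch {
    enum Tolerance { NONE, ALL }

    static class Operator {
        private final Tolerance tolerance;
        private int totalFailures = 0;
        private Throwable lastError;

        Operator(Tolerance tolerance) {
            this.tolerance = tolerance;
        }

        boolean withinToleranceLimits() {
            return tolerance == Tolerance.ALL || totalFailures == 0;
        }

        boolean failed() {
            return lastError != null;
        }

        Throwable error() {
            return lastError;
        }

        void executeFailed(Throwable error) {
            totalFailures++;
            lastError = error;
            if (!withinToleranceLimits()) {
                throw new IllegalStateException("Tolerance exceeded in error handler", error);
            }
        }
    }

    interface Task {
        void put(List<String> records);
    }

    // Stand-in for the delivery step: call the task, then re-check the operator.
    static void deliver(Task task, Operator operator, List<String> batch) {
        task.put(batch);
        // If the task swallowed the exception raised by the operator, we only
        // find out here and must fail the task ourselves. If the task did not
        // swallow it, put() would have thrown and we would not reach this line.
        if (operator.failed() && !operator.withinToleranceLimits()) {
            throw new IllegalStateException("Tolerance exceeded in error handler", operator.error());
        }
    }

    // Runs a task that reports every record as bad and swallows any exception.
    static boolean deliveryFails(Tolerance tolerance) {
        Operator operator = new Operator(tolerance);
        Task swallowingTask = records -> {
            for (String record : records) {
                try {
                    operator.executeFailed(new IllegalArgumentException("bad record: " + record));
                } catch (RuntimeException ignored) {
                    // The task implementation swallows the exception.
                }
            }
        };
        try {
            deliver(swallowingTask, operator, Arrays.asList("r1", "r2"));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveryFails(Tolerance.NONE)); // prints true
        System.out.println(deliveryFails(Tolerance.ALL));  // prints false
    }
}
```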







[GitHub] [kafka] aakashnshah commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r436437989



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);
+        }
+

Review comment:
       Thanks @rhauch for the comments, I agree with what you're saying. I think we should call both `errorHandlingMetrics.recordError()` and `errorHandlingMetrics.recordFailure()`. It seems like `recordError()` and `recordFailure()` represent basically the same thing (failed operations) and they're both called at some point when `execute(...)` is called by the `RetryWithToleranceOperator`, albeit at different times.
   
   Additionally, it looks as if in other circumstances, the error is reported to the various reporters even if the error tolerance has been exceeded, so I'll adjust the order of operations accordingly.







[GitHub] [kafka] rhauch merged pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8829:
URL: https://github.com/apache/kafka/pull/8829


   




