Posted to jira@kafka.apache.org by "vamossagar12 (via GitHub)" <gi...@apache.org> on 2023/05/17 12:33:47 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request, #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

vamossagar12 opened a new pull request, #13726:
URL: https://github.com/apache/kafka/pull/13726

   AbstractWorkerSourceTask does not enforce the errors.retry.timeout and errors.retry.delay.max.ms parameters when a RetriableException is thrown during task.poll(). [KIP-298](https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect) explicitly mentions that `poll()` in SourceTask will retry in case of a RetriableException.
   The current behaviour is that, in case of a RetriableException, null is returned immediately.
   This PR aims to add that retry support.
   
   Note that I think SinkTask#put doesn't honour these parameters either, which can be taken up in a follow-up PR?
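To make the intended behaviour concrete, here is a minimal, self-contained sketch of a retry loop bounded by a timeout and a maximum delay. It is not the code in this PR and not Connect's `RetryWithToleranceOperator`: the names `SketchRetriableException`, `PollRetrySketch` and `callWithRetry` are hypothetical, and the exponential backoff starting at 1 ms is an assumption made for illustration.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for Connect's RetriableException so the sketch compiles on its own.
class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) {
        super(message);
    }
}

final class PollRetrySketch {
    // Calls `operation` until it succeeds or `retryTimeoutMs` has elapsed.
    // Waits between attempts with exponential backoff capped at `maxDelayMs`.
    static <T> T callWithRetry(Supplier<T> operation, long retryTimeoutMs, long maxDelayMs) {
        long deadline = System.currentTimeMillis() + retryTimeoutMs;
        long delayMs = 1; // assumed starting delay, not taken from Connect
        while (true) {
            try {
                return operation.get();
            } catch (SketchRetriableException e) {
                long remainingMs = deadline - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    throw e; // deadline passed: surface the failure instead of returning null
                }
                sleep(Math.min(Math.min(delayMs, maxDelayMs), remainingMs));
                delayMs = Math.min(delayMs * 2, Math.max(maxDelayMs, 1));
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }
}
```

A caller would wrap the poll in it, for example `PollRetrySketch.callWithRetry(() -> pollOnce(), 30_000L, 1_000L)`, where `pollOnce` is a placeholder for whatever performs a single poll.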


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218190316


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Coming back to this, this is the [method](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L173) which eventually gets invoked for any retry operation. And in this case, the only way to get out of a loop of RetriableExceptions is for the errors.tolerance to be set to 0.
   
   I don't see how having errors.tolerance set to none has any impact in this case. This holds true for other retry operations as well, such as header converters.
   Also, if you change the tolerance type to NONE in this method https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java#L227 and run any of the testExecute** tests that invoke that method, the tests pass, which is in line with what I said above.
   
   It appears to me that the none and all behaviour is applicable only to non-retriable exceptions. RetriableExceptions would be retried nonetheless. Should we improve the documentation for this?





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218215790


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Also, for task.poll() there is no notion of skipped records that is applicable. We either get records or we don't, so there is no question of skipping them. The rest of the metrics, IMHO, still point to operation-level failures and are usable as is. Let me know what you think.





[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214147751


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
         assertPollMetrics(0);
     }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   Shouldn't we use `MockTime` here so that we can advance the time programmatically on each call to poll and ensure the retries occur as per the deadline?
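As an illustration of why a controllable clock helps here, the sketch below uses a hand-rolled manual clock so that the number of retries before the deadline is deterministic. It does not use Kafka's `MockTime`; `SketchClock`, `ManualClock` and `DeadlineRetrySketch` are hypothetical names, and the fixed delay between attempts is an assumption. The values 30 and 15 mirror the timeout and maximum delay passed to the constructor in the test above.

```java
import java.util.function.BooleanSupplier;

// Hypothetical minimal clock abstraction; it does not reproduce Kafka's own Time/MockTime.
interface SketchClock {
    long milliseconds();

    void sleep(long ms);
}

// A clock that only moves when sleep() is called, so tests never block on real time.
final class ManualClock implements SketchClock {
    private long nowMs = 0;

    @Override
    public long milliseconds() {
        return nowMs;
    }

    @Override
    public void sleep(long ms) {
        nowMs += ms;
    }
}

final class DeadlineRetrySketch {
    // Returns how many attempts were made before success or before the deadline passed.
    static int attempts(SketchClock clock, long timeoutMs, long delayMs, BooleanSupplier attempt) {
        if (delayMs <= 0) {
            throw new IllegalArgumentException("delayMs must be positive");
        }
        long deadline = clock.milliseconds() + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean() || clock.milliseconds() >= deadline) {
                return attempts;
            }
            // Never sleep past the deadline.
            clock.sleep(Math.min(delayMs, deadline - clock.milliseconds()));
        }
    }
}
```

With a 30 ms timeout, a 15 ms delay and an operation that always fails, attempts happen at 0, 15 and 30 ms on the manual clock, so a test can assert an exact count without waiting.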



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Looks like this currently only handles `org.apache.kafka.connect.errors.RetriableException`, should we also add `org.apache.kafka.common.errors.RetriableException`? Looks like there's some history here - https://github.com/apache/kafka/pull/6675
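The removed multi-catch lists both exception types side by side, which only compiles when neither type extends the other, so a retry helper has to check for each explicitly. A minimal sketch of such a check follows; the two nested classes are hypothetical stand-ins for the Connect and client `RetriableException` types, and `isRetriable` is an illustrative name, not an existing Connect method.

```java
final class RetriableCheckSketch {
    // Stand-in for org.apache.kafka.connect.errors.RetriableException (hypothetical copy).
    static class ConnectRetriableException extends RuntimeException {
        ConnectRetriableException(String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.common.errors.RetriableException (hypothetical copy).
    static class ClientRetriableException extends RuntimeException {
        ClientRetriableException(String message) {
            super(message);
        }
    }

    // Treats either type as retriable; anything else is a hard failure.
    static boolean isRetriable(Throwable t) {
        return t instanceof ConnectRetriableException || t instanceof ClientRetriableException;
    }
}
```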



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   It looks like the `RetryWithToleranceOperator` currently expects to only be used for per-record kind of operations (conversion, transformation etc.) - [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217), [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233) and [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L183) for instance. I think we'll need to make a number of updates there before it can be used in operations such as `SourceTask::poll` and `SinkTask::put`. 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Should we be retrying on `RetriableException`s even if `errors.tolerance` is set to `none`? Looks like that's what is happening here?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Furthermore, we aren't propagating any exceptions after retry exhaustion (due to timeout) so the task will never be marked as failed and we'll just keep calling `poll` in the execution loop for the worker source task. That definitely doesn't seem like the right thing to do as it defeats the purpose of configuring `errors.retry.timeout`, `errors.retry.delay.max.ms`, `errors.tolerance`.
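The concern can be illustrated with a toy execution loop. This is only a sketch under assumptions: `ExecutionLoopSketch`, its `State` enum and the bounded iteration count are hypothetical and do not mirror the real worker source task loop. The point is that a poll which swallows the failure and returns null leaves the loop running, while a poll that throws lets the loop mark the task as failed.

```java
import java.util.function.Supplier;

final class ExecutionLoopSketch {
    enum State { RUNNING, FAILED }

    // Runs up to `maxIterations` of a hypothetical task loop.
    // An exception escaping poll marks the task FAILED; a null result just means
    // "no records this time", so the loop polls again.
    static State run(Supplier<Object> poll, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            try {
                poll.get();
            } catch (RuntimeException e) {
                return State.FAILED;
            }
        }
        return State.RUNNING;
    }
}
```

In this toy loop, `run(() -> null, 5)` stays `RUNNING` however often the underlying operation failed, whereas a poll that rethrows after its retries are exhausted ends in `FAILED`.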





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218190316


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Coming back to this, this is the [method](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L173) which eventually gets invoked for any retry operation. And in this case, the only way to get out of a loop of RetriableExceptions is for the deadline to be exceeded (assuming no infinite retries).
   
   I don't see how having errors.tolerance set to none has any impact in this case. This holds true for other retry operations as well, such as header converters.
   Also, if you change the tolerance type to NONE in this method https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java#L227 and run any of the testExecute** tests that invoke this method, the tests pass, which is in line with what I said above.
   
   It appears to me that the none and all behaviour is applicable only to non-retriable exceptions. RetriableExceptions would be retried nonetheless. Should we improve the documentation for this?
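If that reading is right, the decision could be sketched roughly as below. This is an illustration of the comment's interpretation, not a copy of `RetryWithToleranceOperator`: `ToleranceSketch`, `Action` and `decide` are hypothetical names, and the assumption that the tolerance setting takes over once the retry deadline is exceeded is mine rather than something stated in the comment.

```java
final class ToleranceSketch {
    enum Tolerance { NONE, ALL }

    enum Action { RETRY, SKIP, FAIL }

    // Retriable errors are retried until the deadline, regardless of tolerance.
    // Assumption: once retrying is no longer possible, tolerance decides the outcome.
    static Action decide(boolean retriable, boolean deadlineExceeded, Tolerance tolerance) {
        if (retriable && !deadlineExceeded) {
            return Action.RETRY;
        }
        return tolerance == Tolerance.ALL ? Action.SKIP : Action.FAIL;
    }
}
```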





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218188816


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Coming back to this, this is the [method](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L173) which eventually gets invoked for any retry operation. And in this case, the only way to get out of a loop of RetriableExceptions is for the errors.tolerance to be set to 0.
   
   I don't see how having errors.tolerance set to none has any impact in this case. This holds true for other retry operations as well, such as header converters.
   Also, if you change the tolerance type to `NONE` in this method https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java#L227 and run any of the `testExecute**` tests that invoke that method, the tests pass, which is in line with what I said above.
   
   It appears to me that the `none` and `all` behaviour is applicable only to non-retriable exceptions. RetriableExceptions would be retried nonetheless. Should we improve the documentation for this?
    





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214192966


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Oh, I just went by what the KIP says in the Proposed Approach section. I would assume this change would need an amendment to the KIP.





[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214227660


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
         assertPollMetrics(0);
     }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   That's probably due to https://github.com/apache/kafka/pull/13726#discussion_r1214152577 (i.e. poll being called repeatedly in the task execution loop)? 





[GitHub] [kafka] blacktooth commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "blacktooth (via GitHub)" <gi...@apache.org>.
blacktooth commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1318930687


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Wondering if [RetryUtil](https://github.com/apache/kafka/blob/0029bc4897e603614a49e0b0f1e623abbe650c61/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java#L55) is a better alternative to use to avoid publishing these metrics. What do you think?





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214203205


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
         assertPollMetrics(0);
     }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   I am using the existing mechanism of running the test with a CountDownLatch. I could get enough invocations that way, but I will think about it.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214197359


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   I think only `total-records-skipped` is a problem, since it is defined as `Total number of records skipped by this task.` `total-record-failures` is actually defined as the total number of failures seen, not the total number of records which have failed. I would assume it should be easy to update the number of records in poll(). Why do you say we need to make a number of updates?





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214199833


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Yeah, but I didn't think I really needed to handle that case; the existing methods should be taking care of it, going by the example above with converters.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218215790


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Also, for task.poll() there is no notion of skipped records that is applicable. We either get records or we don't, so there is no question of skipping them. That condition needs to be handled because today, if the context fails, record skipped is invoked irrespective of the operation, like [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217) and [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233). The rest of the metrics, IMHO, still point to operation-level failures and are usable as is. Let me know what you think.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214198459


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   That's a good point. I assumed it would be taken care of, because the transformations don't make any such checks either; see [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L482)





[GitHub] [kafka] vamossagar12 commented on pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13726:
URL: https://github.com/apache/kafka/pull/13726#issuecomment-1552416862

   Test failures seem unrelated. A couple of failing Connect-related tests are being fixed via separate JIRA tickets.




[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1216322429


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   The way I understood it, it is the number of operations that have failed, which is independent of the number of records in the batch. That's what the subtle difference in naming seems to suggest to me:
   
   ```
   recordProcessingFailures = createTemplate("total-record-failures", TASK_ERROR_HANDLING_GROUP_NAME,
                   "The number of record processing failures in this task.", taskErrorHandlingTags);
   recordProcessingErrors = createTemplate("total-record-errors", TASK_ERROR_HANDLING_GROUP_NAME,
                   "The number of record processing errors in this task. ", taskErrorHandlingTags);
   recordsSkipped = createTemplate("total-records-skipped", TASK_ERROR_HANDLING_GROUP_NAME,
                   "The number of records skipped due to errors.", taskErrorHandlingTags);
   ```
   i.e. "The number of `records` ..." vs. "The number of `record processing` ...". Does that make sense, or am I reading too much between the lines?
   





[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214227126


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   `total-record-failures` is defined as the number of "record processing failures" in a task - https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L366-L367. I don't think a failed call to `SourceTask::poll` counts as a record processing failure?





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218211612


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Not sure why the task would not fail. As already explained [here](https://github.com/apache/kafka/pull/13726#discussion_r1218190316), if a non-retriable exception is thrown, then based on this condition

   https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L225-L233

   either a ConnectException is thrown or a null value is returned. It depends on whether the exception thrown from the specific operation is assignable or not, and also on the tolerance limits. So, if the tolerance limit is none, then the task fails.

   Also, if it's a RetriableException and the deadline is exceeded, then a null value is returned. The current code [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L356) already handles null values returned from poll().
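
   As a rough illustration of the behaviour described above, here is a simplified sketch. It is not the actual `RetryWithToleranceOperator` code: the class, exception, and enum names are hypothetical stand-ins, and it assumes the tolerance check is applied once the retries are exhausted.

   ```java
   import java.util.concurrent.Callable;

   class RetryToleranceSketch {
       // Hypothetical stand-in for Connect's RetriableException.
       static class RetriableError extends RuntimeException {
           RetriableError(String message) { super(message); }
       }

       // Hypothetical stand-in for the errors.tolerance setting.
       enum Tolerance { NONE, ALL }

       // Runs the operation, retrying retriable failures with capped
       // exponential backoff until the retry deadline passes.
       static <V> V execute(Callable<V> operation, long retryTimeoutMs,
                            long maxDelayMs, Tolerance tolerance) {
           long deadline = System.currentTimeMillis() + retryTimeoutMs;
           long delayMs = 1;
           Exception failure;
           while (true) {
               try {
                   return operation.call();
               } catch (RetriableError e) {
                   failure = e;
                   if (System.currentTimeMillis() >= deadline) {
                       break; // retry budget used up
                   }
                   try {
                       Thread.sleep(Math.min(delayMs, maxDelayMs));
                   } catch (InterruptedException ie) {
                       Thread.currentThread().interrupt();
                       break;
                   }
                   delayMs = Math.min(delayMs * 2, maxDelayMs);
               } catch (Exception e) {
                   failure = e; // non-retriable: no further attempts
                   break;
               }
           }
           // Assumption: with no tolerance the failure is rethrown (the task fails);
           // otherwise the caller receives null, as it would for an empty poll.
           if (tolerance == Tolerance.NONE) {
               throw new RuntimeException("Tolerance exceeded", failure);
           }
           return null;
       }
   }
   ```

   In this sketch, a caller that already treats a null result as "no records this time" needs no extra handling for the exhausted-retries case.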





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218214268


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
         assertPollMetrics(0);
     }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   Yeah, I guess that's expected, as already explained [here](https://github.com/apache/kafka/pull/13726#discussion_r1218190316)?





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1218215790


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Also, for task.poll() there is no records-skipped metric that is applicable. We either get records or we don't, so there is no question of skipping them. That condition needs to be handled, because today, if the context fails, the record-skipped metric is recorded irrespective of the operation, like [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217) and [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233). The rest of the metrics, IMHO, still point to operation-level failures and are usable as is. Let me know what you think. One of my tests needs to change as well if we go ahead with the above approach.
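
   One possible shape for that handling is sketched below. It is only an illustration under assumptions: `SkipMetricSketch`, its `Metrics` holder, and `recordFailure` are hypothetical names, not the existing `ErrorHandlingMetrics` API.

   ```java
   class SkipMetricSketch {
       // Hypothetical subset of the stages an operation can run in.
       enum Stage { TASK_POLL, TRANSFORMATION }

       // Hypothetical counters standing in for the real metrics.
       static class Metrics {
           int operationFailures;
           int recordsSkipped;
       }

       // Counts every failed operation, but counts a skipped record only
       // when the failed stage actually had a record to skip.
       static void recordFailure(Stage stage, Metrics metrics) {
           metrics.operationFailures++;
           if (stage != Stage.TASK_POLL) {
               metrics.recordsSkipped++;
           }
       }
   }
   ```

   With a guard like this, a failed poll would still show up as an operation-level failure without inflating the skipped-records count.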





[GitHub] [kafka] vamossagar12 commented on pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13726:
URL: https://github.com/apache/kafka/pull/13726#issuecomment-1575441251

   @yashmayya, FWIW I found this very old PR, which aimed to do something similar (but was closed): https://github.com/apache/kafka/pull/7857/




[GitHub] [kafka] blacktooth commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

Posted by "blacktooth (via GitHub)" <gi...@apache.org>.
blacktooth commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1318930687


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   @vamossagar12 @yashmayya Wondering if [RetryUtil](https://github.com/apache/kafka/blob/0029bc4897e603614a49e0b0f1e623abbe650c61/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java#L55) would be a better alternative here, to avoid publishing these metrics. What do you think?





Re: [PR] KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13726:
URL: https://github.com/apache/kafka/pull/13726#issuecomment-1844200113

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

