Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/05 23:45:07 UTC

[GitHub] [kafka] mjsax opened a new pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

mjsax opened a new pull request #10072:
URL: https://github.com/apache/kafka/pull/10072


   Part of KIP-572: follow-up work to PR #9800. It's not safe to retry a TX
   commit after a timeout, because it's unclear whether the commit was
   successful, and thus on retry we might get an
   IllegalStateException. Instead, we will throw a TaskCorruptedException
   to retry the TX if the commit failed.
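
The idea in the description can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `TxCommitter`, `TaskCorruptedSketchException`, and `commitOrMarkCorrupted` are hypothetical names, and `java.util.concurrent.TimeoutException` stands in for Kafka's own `TimeoutException` so that the snippet compiles without the Kafka libraries.

```java
import java.util.Set;
import java.util.concurrent.TimeoutException;

public class CommitTimeoutSketch {

    /** Hypothetical stand-in for Kafka Streams' TaskCorruptedException. */
    static class TaskCorruptedSketchException extends RuntimeException {
        final Set<String> corruptedTaskIds;

        TaskCorruptedSketchException(Set<String> corruptedTaskIds, Throwable cause) {
            super("Tasks " + corruptedTaskIds + " must be reset", cause);
            this.corruptedTaskIds = corruptedTaskIds;
        }
    }

    /** Hypothetical abstraction over the transactional commit call. */
    interface TxCommitter {
        void commit() throws TimeoutException;
    }

    /**
     * Attempts the commit exactly once. On a timeout the outcome is unknown,
     * so the commit is not retried; the affected tasks are reported as
     * corrupted instead, and the caller is expected to reset and redo them.
     */
    static void commitOrMarkCorrupted(TxCommitter committer, Set<String> taskIds) {
        try {
            committer.commit();
        } catch (TimeoutException e) {
            throw new TaskCorruptedSketchException(taskIds, e);
        }
    }
}
```

The point of the sketch is only the control flow: the timeout is translated into a "reset the task" signal rather than a second call to commit.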
   
   Call for review @ableegoldman @vvcephei @abbccdda @guozhangwang 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r574219892



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -213,27 +214,26 @@ private void recordSendError(final String topic, final Exception exception, fina
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
-            // TODO: KIP-572 handle `TimeoutException extends RetriableException`
-            // is seems inappropriate to pass `TimeoutException` into the `ProductionExceptionHander`
-            // -> should we add `TimeoutException` as `isFatalException` above (maybe not) ?
-            // -> maybe we should try to reset the task by throwing a `TaskCorruptedException` (including triggering `task.timeout.ms`) ?
             if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
                     "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
-            }
-
-            if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
-                errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
-                sendException.set(new StreamsException(errorMessage, exception));
+                sendException.set(

Review comment:
       Correct. Before this change, we would fail if the producer stopped retrying and gave us a retryable exception.
   
   And if we can retry, we can throw a TCE. Thus, I would not say "we handle a retryable exception as timeout"; I would say that we stop failing and instead throw a TCE on any retryable error (which is an orthogonal fix). However, because a timeout is a retryable error, we no longer need any timeout-specific handling.
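
A rough sketch of the decision described in this comment, under stated assumptions: `RetriableSketchException`, `Outcome`, `Handler`, and `classify` are hypothetical names, and the non-retryable branch (still consulting the production exception handler) is inferred from the lines removed in the hunk above, not confirmed by the final code.

```java
public class SendErrorSketch {

    /** Hypothetical stand-in for Kafka's RetriableException (timeouts included). */
    static class RetriableSketchException extends RuntimeException {
        RetriableSketchException(String message) {
            super(message);
        }
    }

    /** Possible results of handling a send error in this sketch. */
    enum Outcome { TASK_CORRUPTED, FAIL, CONTINUE }

    /** Hypothetical stand-in for the user-supplied production exception handler. */
    interface Handler {
        boolean shouldFail(Exception exception);
    }

    /**
     * Any retryable error, a timeout being one special case, leads to a task
     * reset. Only non-retryable errors are passed to the handler (assumption).
     */
    static Outcome classify(Exception exception, Handler handler) {
        if (exception instanceof RetriableSketchException) {
            return Outcome.TASK_CORRUPTED;
        }
        return handler.shouldFail(exception) ? Outcome.FAIL : Outcome.CONTINUE;
    }
}
```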







[GitHub] [kafka] mjsax commented on pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#issuecomment-781840391


   Updated.





[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r578937359



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       Passing the task into the `RecordCollector` also introduced a cyclic dependency, as we pass the collector into the task. But getting the changelog partitions within `handleCorrupted` makes sense -- it's actually even cleaner to begin with.







[GitHub] [kafka] mjsax merged pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10072:
URL: https://github.com/apache/kafka/pull/10072


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r574778749



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       Well, we also use `TaskId` all throughout the StreamsPartitionAssignor, and it definitely does not include a reference to a real `Task` object there (I think the `TaskId` ultimately comes from the `PartitionGrouper`)







[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r571331549



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       I was not happy about it either... Any good suggestions how to do better? I could not come up with a better solution quickly unfortunately. :(
   
   We could add it to the constructor and make it mandatory, but the "splash radius" would be quite large...







[GitHub] [kafka] abbccdda commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r573065903



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -213,27 +214,26 @@ private void recordSendError(final String topic, final Exception exception, fina
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
-            // TODO: KIP-572 handle `TimeoutException extends RetriableException`
-            // is seems inappropriate to pass `TimeoutException` into the `ProductionExceptionHander`
-            // -> should we add `TimeoutException` as `isFatalException` above (maybe not) ?
-            // -> maybe we should try to reset the task by throwing a `TaskCorruptedException` (including triggering `task.timeout.ms`) ?
             if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
                     "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
-            }
-
-            if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
-                errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
-                sendException.set(new StreamsException(errorMessage, exception));
+                sendException.set(

Review comment:
       Are we assuming that any retriable exception should be handled as a timeout exception?







[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r574781150



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       This is only used in the RecordCollectorImpl, right? Can't we just pass the actual `Task` object through to the `RecordCollectorImpl`? I.e., modify the constructor to accept a `Task` object instead of a `TaskId`, since we can get the `TaskId` from the `Task`. Going the other way just doesn't make any sense.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r579430512



##########
File path: docs/streams/upgrade-guide.html
##########
@@ -127,6 +127,15 @@ <h3><a id="streams_api_changes_280" href="#streams_api_changes_280">Streams API
         into the constructor, it is no longer required to set mandatory configuration parameters
         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument">KIP-680</a>).
     </p>
+    <p>
+        Kafka Streams is now handling <code>TimeoutException</code> thrown by the consumer, producer, and admin client.
+        If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed
+        task in the next iteration.
+        To bound how long Kafka Streams retries a task, you can set <code>task.timeout.ms</code> (default is 5 minutes).
+        If a task does not make progress within the specified task timeout (the timeout it tracked on a per-task basis)

Review comment:
       nit: the timeout _is_ tracked on a per-task basis

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -127,6 +127,15 @@ <h3><a id="streams_api_changes_280" href="#streams_api_changes_280">Streams API
         into the constructor, it is no longer required to set mandatory configuration parameters
         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument">KIP-680</a>).
     </p>
+    <p>
+        Kafka Streams is now handling <code>TimeoutException</code> thrown by the consumer, producer, and admin client.
+        If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed
+        task in the next iteration.
+        To bound how long Kafka Streams retries a task, you can set <code>task.timeout.ms</code> (default is 5 minutes).
+        If a task does not make progress within the specified task timeout (the timeout it tracked on a per-task basis)

Review comment:
       or better yet
   ```suggestion
           If a task does not make progress within the specified task timeout, which is tracked on a per-task basis,
   ```
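
For illustration, the setting mentioned in the doc text could be applied as in the sketch below. Only the config name `task.timeout.ms` and its 5-minute default come from the text above; the class name, the helper `withTaskTimeout`, and the 10-minute value are assumptions made for the example. The config key is written as a plain string so the snippet compiles without the Kafka libraries.

```java
import java.time.Duration;
import java.util.Properties;

public class TaskTimeoutConfigSketch {

    /** Builds properties that bound per-task retries to the given duration. */
    static Properties withTaskTimeout(Duration timeout) {
        Properties props = new Properties();
        // The value is expressed in milliseconds, as the ".ms" suffix indicates.
        props.put("task.timeout.ms", Long.toString(timeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // Example value only: raise the bound from the 5-minute default to 10 minutes.
        Properties props = withTaskTimeout(Duration.ofMinutes(10));
        System.out.println("task.timeout.ms=" + props.getProperty("task.timeout.ms"));
    }
}
```

A real application would merge this into its full Streams configuration, which also needs at least `application.id` and `bootstrap.servers`.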







[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r574221154



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       Thinking about it more: Atm we only do this for active tasks, but we set the reference in `ActiveTaskCreator` -- thus, the `task` reference should never be `null` for active tasks?
   
   We can close the `null` issue by doing the same thing in `StandbyTaskCreator` if we think it's worth it?







[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r571317046



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       I have to say, it makes me a little uncomfortable to stick the actual `Task` object inside the basic `TaskId` container class. Especially if 99% of the time it will be null, given that we use `TaskId` all over the place and only call `setTask` a handful of times. It will only get increasingly difficult to reason about whether it's safe to assume a given `TaskId` object has an actual non-null `Task` or not, and therefore is safe to use 😬 







[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r574782591



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##########
@@ -32,6 +33,7 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    public Task task;

Review comment:
       Not sure how big the "blast radius" of that would be. I guess another possible option is to just provide the `TaskId` in the TaskCorruptedException, since we only need the `Task` to get the changelogPartitions, and we ultimately handle the TaskCorruptedException inside the TaskManager, which should be able to look up the changelogPartitions for a given TaskId right?
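
The option proposed here could look roughly like the sketch below. It is a simplified model, not the Kafka Streams implementation: `CorruptedTasksSketchException`, `ManagerSketch`, and `register` are hypothetical, task IDs and changelog partitions are plain strings, and only the method name `handleCorrupted` is taken from the discussion in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ChangelogLookupSketch {

    /** Hypothetical exception that carries only task IDs, never Task objects. */
    static class CorruptedTasksSketchException extends RuntimeException {
        final Set<String> taskIds;

        CorruptedTasksSketchException(Set<String> taskIds) {
            super("Corrupted tasks: " + taskIds);
            this.taskIds = taskIds;
        }
    }

    /** Hypothetical manager that owns the task ID to changelog partition mapping. */
    static class ManagerSketch {
        private final Map<String, Set<String>> changelogsByTask = new HashMap<>();

        void register(String taskId, Set<String> changelogPartitions) {
            changelogsByTask.put(taskId, changelogPartitions);
        }

        /**
         * Resolves the changelog partitions at handling time, so the thrower
         * (e.g. the record collector) never needs a reference to the task.
         */
        Map<String, Set<String>> handleCorrupted(CorruptedTasksSketchException e) {
            Map<String, Set<String>> toReset = new HashMap<>();
            for (String taskId : e.taskIds) {
                toReset.put(taskId,
                    changelogsByTask.getOrDefault(taskId, Collections.emptySet()));
            }
            return toReset;
        }
    }
}
```

Because the lookup happens where the exception is handled, `TaskId` can stay a plain value class and no cyclic reference between task and collector is required.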



