Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/09 17:23:58 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

abbccdda commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r573065903



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -213,27 +214,26 @@ private void recordSendError(final String topic, final Exception exception, fina
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
-            // TODO: KIP-572 handle `TimeoutException extends RetriableException`
-            // is seems inappropriate to pass `TimeoutException` into the `ProductionExceptionHander`
-            // -> should we add `TimeoutException` as `isFatalException` above (maybe not) ?
-            // -> maybe we should try to reset the task by throwing a `TaskCorruptedException` (including triggering `task.timeout.ms`) ?
             if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
                     "or the connection to broker was interrupted sending the request or receiving the response. " +
                     "\nConsider overwriting `max.block.ms` and /or " +
                     "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
-            }
-
-            if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
-                errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
-                sendException.set(new StreamsException(errorMessage, exception));
+                sendException.set(

Review comment:
       Are we assuming that every retriable exception should be handled as a timeout exception?
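
To make the question concrete, here is a minimal sketch of the alternative: only a timeout resets the task, and every other retriable error still goes to the production exception handler. The exception classes below are local stand-ins that mirror the Kafka hierarchy (`TimeoutException extends RetriableException`) so the example compiles on its own; `SendErrorClassifier`, `Action`, and `classify` are hypothetical names and not part of `RecordCollectorImpl` or this PR.

```java
public class SendErrorClassifier {

    // Stand-in for org.apache.kafka.common.errors.RetriableException (assumption: simplified).
    static class RetriableException extends RuntimeException {
        RetriableException(final String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.common.errors.TimeoutException, a RetriableException subtype.
    static class TimeoutException extends RetriableException {
        TimeoutException(final String message) {
            super(message);
        }
    }

    // Stand-in for another retriable error that is not a timeout.
    static class NotEnoughReplicasException extends RetriableException {
        NotEnoughReplicasException(final String message) {
            super(message);
        }
    }

    // Hypothetical outcomes: reset the task, or defer to the exception handler.
    enum Action { RESET_TASK, DELEGATE_TO_HANDLER }

    // Narrow check: match the timeout subtype, not the whole retriable family.
    static Action classify(final Exception exception) {
        if (exception instanceof TimeoutException) {
            return Action.RESET_TASK;
        }
        return Action.DELEGATE_TO_HANDLER;
    }

    public static void main(final String[] args) {
        System.out.println(classify(new TimeoutException("TX commit timed out")));
        System.out.println(classify(new NotEnoughReplicasException("not enough in-sync replicas")));
    }
}
```

With an `instanceof RetriableException` check instead, both calls in `main` would take the reset path, which is the behavior this comment is asking about.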



