Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2023/04/10 22:10:00 UTC

[jira] [Assigned] (KAFKA-14054) Unexpected client shutdown as TimeoutException is thrown as IllegalStateException

     [ https://issues.apache.org/jira/browse/KAFKA-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-14054:
---------------------------------------

    Assignee: Matthias J. Sax

> Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-14054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0, 3.2.0, 3.1.1
>            Reporter: Donald
>            Assignee: Matthias J. Sax
>            Priority: Major
>
>  Re: https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2
> 1) A TimeoutException is rethrown as an IllegalStateException in {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}, which causes the client to shut down in {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}.
> 2) Should a timeout be a recoverable error that the user is expected to handle?
> 3) This issue is exposed by the change in KAFKA-12887, which was introduced in kafka-streams 3.1.0.
> *code referenced*
> {code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
> public boolean commitNeeded() {
>     if (commitNeeded) {
>         return true;
>     } else {
>         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>             final TopicPartition partition = entry.getKey();
>             try {
>                 final long offset = mainConsumer.position(partition);
>                 if (offset > entry.getValue() + 1) {
>                     commitNeeded = true;
>                     entry.setValue(offset - 1);
>                 }
>             } catch (final TimeoutException error) {
>                 // the `consumer.position()` call should never block, because we know that we did process data
>                 // for the requested partition and thus the consumer should have a valid local position
>                 // that it can return immediately
>                 // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
>                 throw new IllegalStateException(error);
>             } catch (final KafkaException fatal) {
>                 throw new StreamsException(fatal);
>             }
>         }
>         return commitNeeded;
>     }
> }
> {code}
> {code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
> private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
>                                                                                             final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
>     final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
>     if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
>         action = SHUTDOWN_CLIENT;
>     } else {
>         action = streamsUncaughtExceptionHandler.handle(throwable);
>     }
>     return action;
> }
> private void handleStreamsUncaughtException(final Throwable throwable,
>                                             final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
>                                             final boolean skipThreadReplacement) {
>     final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
>     if (oldHandler) {
>         log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
>                 "The old handler will be ignored as long as a new handler is set.");
>     }
>     switch (action) {
>         case REPLACE_THREAD:
>             if (!skipThreadReplacement) {
>                 log.error("Replacing thread in the streams uncaught exception handler", throwable);
>                 replaceStreamThread(throwable);
>             } else {
>                 log.debug("Skipping thread replacement for recoverable error");
>             }
>             break;
>         case SHUTDOWN_CLIENT:
>             log.error("Encountered the following exception during processing " +
>                     "and Kafka Streams opted to " + action + "." +
>                     " The streams client is going to shut down now. ", throwable);
>             closeToError();
>             break;
> {code}
>  *Stacktrace*
> {code:java|title=error log kafka-streams v. 3.1.0}
> 2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
> Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
>         at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
>         ... 1 common frames omitted
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
> 2022-06-22 13:58:35,796  INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
> {code}
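
The interaction described in points 1) and 2) can be modeled without Kafka on the classpath. The following is a minimal sketch, not the Kafka Streams implementation: the class ShutdownDecisionSketch, the stand-in exception types, and the contents of NOT_HANDLED_BY_USERS are assumptions made for illustration, as is the choice to inspect only the direct cause of the throwable. It shows why wrapping the timeout in an IllegalStateException bypasses the user's handler, while the same timeout without that wrapper would reach it.

```java
import java.util.List;
import java.util.function.Function;

final class ShutdownDecisionSketch {

    // Simplified stand-in for StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Hypothetical stand-ins for the Kafka exception types named in the report.
    static class TimeoutException extends RuntimeException {
        TimeoutException(final String message) { super(message); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(final Throwable cause) { super(cause); }
    }

    // Assumption: the exception types that are never passed to the user handler.
    static final List<Class<? extends Throwable>> NOT_HANDLED_BY_USERS =
        List.of(IllegalStateException.class, IllegalArgumentException.class);

    // Assumption: only the direct cause of the throwable is inspected.
    static boolean wrappedExceptionIsIn(final Throwable throwable,
                                        final List<Class<? extends Throwable>> types) {
        final Throwable cause = throwable.getCause();
        return cause != null && types.contains(cause.getClass());
    }

    // Mirrors the shape of getActionForThrowable quoted in the report.
    static Response actionFor(final Throwable throwable,
                              final Function<Throwable, Response> userHandler) {
        if (wrappedExceptionIsIn(throwable, NOT_HANDLED_BY_USERS)) {
            return Response.SHUTDOWN_CLIENT; // the user handler is never consulted
        }
        return userHandler.apply(throwable);
    }

    public static void main(final String[] args) {
        // A user handler that would prefer to replace the failed thread.
        final Function<Throwable, Response> userHandler = t -> Response.REPLACE_THREAD;
        final Throwable timeout = new TimeoutException("Timeout of 60000ms expired");

        // Same wrapping order as the reported stack trace.
        final Throwable asReported = new StreamsException(new IllegalStateException(timeout));
        // The same timeout without the IllegalStateException wrapper.
        final Throwable unwrapped = new StreamsException(timeout);

        System.out.println(actionFor(asReported, userHandler)); // prints SHUTDOWN_CLIENT
        System.out.println(actionFor(unwrapped, userHandler));  // prints REPLACE_THREAD
    }
}
```

In this model the user's preference for REPLACE_THREAD only takes effect when the timeout is not wrapped, which is the behavior question 2) asks about.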


