Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2023/04/10 22:10:00 UTC
[jira] [Assigned] (KAFKA-14054) Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-14054:
---------------------------------------
Assignee: Matthias J. Sax
> Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-14054
> URL: https://issues.apache.org/jira/browse/KAFKA-14054
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.2.0, 3.1.1
> Reporter: Donald
> Assignee: Matthias J. Sax
> Priority: Major
>
> Re: https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2
> 1) A TimeoutException is rethrown as an IllegalStateException in {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}, which causes {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_} to shut down the client.
> 2) Should a timeout be treated as a recoverable error that the user is expected to handle?
> 3) This issue was exposed by KAFKA-12887, which was introduced in kafka-streams 3.1.0.
> *code referenced*
> {code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
> public boolean commitNeeded() {
>     if (commitNeeded) {
>         return true;
>     } else {
>         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>             final TopicPartition partition = entry.getKey();
>             try {
>                 final long offset = mainConsumer.position(partition);
>                 if (offset > entry.getValue() + 1) {
>                     commitNeeded = true;
>                     entry.setValue(offset - 1);
>                 }
>             } catch (final TimeoutException error) {
>                 // the `consumer.position()` call should never block, because we know that we did process data
>                 // for the requested partition and thus the consumer should have a valid local position
>                 // that it can return immediately
>                 // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
>                 throw new IllegalStateException(error);
>             } catch (final KafkaException fatal) {
>                 throw new StreamsException(fatal);
>             }
>         }
>         return commitNeeded;
>     }
> }
> {code}
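The offset check in `commitNeeded` can be illustrated with a small stdlib-only model. This is a sketch, not Kafka code. Partitions are plain strings, and the hypothetical `position` function stands in for `mainConsumer.position(partition)`, which returns the offset of the next record the consumer would fetch. `consumedOffsets` holds the offset of the last processed record. A position greater than that offset plus one therefore means the consumer has moved past offsets that never reached the task, so a commit is needed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Stdlib-only model of the offset check in StreamTask#commitNeeded (a sketch, not Kafka code).
// Partitions are plain Strings; `position` is a hypothetical stand-in for
// mainConsumer.position(partition), i.e. the offset of the next record to fetch.
final class CommitNeededModel {

    private boolean commitNeeded = false; // as if the task had just committed
    // last processed offset per partition
    private final Map<String, Long> consumedOffsets = new HashMap<>();
    private final ToLongFunction<String> position;

    CommitNeededModel(final Map<String, Long> lastProcessed, final ToLongFunction<String> position) {
        this.consumedOffsets.putAll(lastProcessed);
        this.position = position;
    }

    boolean commitNeeded() {
        if (commitNeeded) {
            return true;
        }
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            final long offset = position.applyAsLong(entry.getKey());
            // With no skipped offsets, position == last processed + 1. A larger position
            // means the consumer moved past offsets the task never saw, so the recorded
            // last-processed offset is advanced to position - 1 and a commit is requested.
            if (offset > entry.getValue() + 1) {
                commitNeeded = true;
                entry.setValue(offset - 1);
            }
        }
        return commitNeeded;
    }

    long consumedOffset(final String partition) {
        return consumedOffsets.get(partition);
    }

    public static void main(final String[] args) {
        final Map<String, Long> lastProcessed = new HashMap<>();
        lastProcessed.put("topic-0", 10L);
        lastProcessed.put("topic-1", 12L);
        final Map<String, Long> positions = new HashMap<>();
        positions.put("topic-0", 11L); // no gap: 11 == 10 + 1
        positions.put("topic-1", 15L); // gap: offsets 13 and 14 were skipped
        final CommitNeededModel task = new CommitNeededModel(lastProcessed, p -> positions.get(p));
        System.out.println(task.commitNeeded());            // true
        System.out.println(task.consumedOffset("topic-1")); // 14
    }
}
```

In the reported failure, the real `mainConsumer.position()` call at this step blocked and threw a `TimeoutException` after 60000 ms. The catch block above then rethrew that exception as an `IllegalStateException`.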
> {code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
> private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(
>         final Throwable throwable,
>         final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
>     final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
>     if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
>         action = SHUTDOWN_CLIENT;
>     } else {
>         action = streamsUncaughtExceptionHandler.handle(throwable);
>     }
>     return action;
> }
>
> private void handleStreamsUncaughtException(final Throwable throwable,
>                                             final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
>                                             final boolean skipThreadReplacement) {
>     final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
>     if (oldHandler) {
>         log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
>             "The old handler will be ignored as long as a new handler is set.");
>     }
>     switch (action) {
>         case REPLACE_THREAD:
>             if (!skipThreadReplacement) {
>                 log.error("Replacing thread in the streams uncaught exception handler", throwable);
>                 replaceStreamThread(throwable);
>             } else {
>                 log.debug("Skipping thread replacement for recoverable error");
>             }
>             break;
>         case SHUTDOWN_CLIENT:
>             log.error("Encountered the following exception during processing " +
>                 "and Kafka Streams opted to " + action + "." +
>                 " The streams client is going to shut down now. ", throwable);
>             closeToError();
>             break;
>         // ... (remainder of the method omitted in this excerpt)
>     }
> }
> {code}
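The following stdlib-only sketch models why a user-registered handler never sees this error. It makes two assumptions. First, as the report implies, `EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS` contains `IllegalStateException`. Second, `wrappedExceptionIsIn` inspects only the direct cause of the reported exception, which matches the `StreamsException` wrapping `IllegalStateException` seen in the trace. `FakeStreamsException`, `FakeTimeoutException` and the `Response` enum are hypothetical local stand-ins, not the Kafka types.

```java
import java.util.Set;
import java.util.function.Function;

// Stdlib-only model of KafkaStreams#getActionForThrowable (a sketch, not Kafka code).
// Assumptions: the "not handled by users" set contains IllegalStateException, and only
// the direct cause of the reported exception is inspected.
final class ActionModel {

    // Local stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Hypothetical stand-ins for StreamsException and Kafka's TimeoutException.
    static final class FakeStreamsException extends RuntimeException {
        FakeStreamsException(final Throwable cause) {
            super(cause);
        }
    }

    static final class FakeTimeoutException extends RuntimeException {
        FakeTimeoutException(final String message) {
            super(message);
        }
    }

    // Assumed contents; the report implies IllegalStateException is in this set.
    static final Set<String> NOT_HANDLED_BY_USERS = Set.of(IllegalStateException.class.getName());

    static boolean wrappedExceptionIsIn(final Throwable throwable, final Set<String> classNames) {
        return throwable.getCause() != null
            && classNames.contains(throwable.getCause().getClass().getName());
    }

    static Response actionFor(final Throwable throwable, final Function<Throwable, Response> userHandler) {
        if (wrappedExceptionIsIn(throwable, NOT_HANDLED_BY_USERS)) {
            return Response.SHUTDOWN_CLIENT; // the user handler is never consulted
        }
        return userHandler.apply(throwable);
    }

    public static void main(final String[] args) {
        // A user handler that would prefer to retry timeouts on a fresh thread.
        final Function<Throwable, Response> retryTimeouts = t ->
            t.getCause() instanceof FakeTimeoutException ? Response.REPLACE_THREAD : Response.SHUTDOWN_CLIENT;
        final FakeTimeoutException timeout = new FakeTimeoutException("position could not be determined");

        // Timeout wrapped only in StreamsException: the user handler decides.
        System.out.println(actionFor(new FakeStreamsException(timeout), retryTimeouts)); // REPLACE_THREAD
        // Timeout first rethrown as IllegalStateException, as in StreamTask#commitNeeded.
        System.out.println(actionFor(new FakeStreamsException(new IllegalStateException(timeout)), retryTimeouts)); // SHUTDOWN_CLIENT
    }
}
```

With the real client, a handler is registered via `KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)`. Under the stated assumptions, the model shows that such a handler cannot keep the thread alive once the timeout has been rethrown as an `IllegalStateException`.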
> *Stacktrace*
> {code:java|title=error log kafka-streams v. 3.1.0}
> 2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
> Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
> at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
> at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
> at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
> ... 1 common frames omitted
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
> 2022-06-22 13:58:35,796 INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
> {code}
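The trace nests three exceptions:

* the `StreamsException` reported by the stream thread,
* the `IllegalStateException` thrown by `StreamTask#commitNeeded`, and
* the original `TimeoutException` from the consumer.

A small helper that walks `Throwable#getCause()` makes this structure explicit when triaging similar logs. In this sketch, `RuntimeException` and `java.util.concurrent.TimeoutException` stand in for the Kafka exception classes, and `CauseChain` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;

final class CauseChain {

    // Collects `t` (must be non-null) and all of its causes, outermost first. The identity
    // set stops the walk if a malformed cause chain ever loops back on itself.
    static List<Throwable> chain(final Throwable t) {
        final List<Throwable> result = new ArrayList<>();
        final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable current = t; current != null && seen.add(current); current = current.getCause()) {
            result.add(current);
        }
        return result;
    }

    // The innermost cause, i.e. the exception that started the chain.
    static Throwable rootCause(final Throwable t) {
        final List<Throwable> all = chain(t);
        return all.get(all.size() - 1);
    }

    public static void main(final String[] args) {
        final TimeoutException timeout = new TimeoutException(
            "Timeout of 60000ms expired before the position for partition "
                + "com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined");
        // RuntimeException stands in for StreamsException in this model.
        final Throwable reported = new RuntimeException(new IllegalStateException(timeout));
        for (final Throwable link : chain(reported)) {
            System.out.println(link.getClass().getSimpleName());
        }
        System.out.println(rootCause(reported) == timeout); // true
    }
}
```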
--
This message was sent by Atlassian Jira
(v8.20.10#820010)