You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2020/07/09 16:12:00 UTC

[jira] [Commented] (KAFKA-10240) Sink tasks should not throw WakeupException on shutdown

    [ https://issues.apache.org/jira/browse/KAFKA-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154702#comment-17154702 ] 

Chris Egerton commented on KAFKA-10240:
---------------------------------------

For some background: this was actually discussed years ago on [https://github.com/apache/kafka/pull/1511], but never implemented.

> Sink tasks should not throw WakeupException on shutdown
> -------------------------------------------------------
>
>                 Key: KAFKA-10240
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10240
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> * When a task is scheduled for shutdown, the framework [wakes up the consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159] for that task.
>  * As is noted in the [Javadocs for that method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348], “If no thread is blocking in a method which can throw {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a method will raise it instead.”
>  * It just so happens that, if the framework isn’t in the middle of a call to the consumer and then the task gets stopped, the next call the framework will make on the consumer may be to commit offsets, which will immediately throw a {{WakeupException}}.
>  * Currently, the framework handles this by [immediately retrying the offset commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339] until it either throws a different exception or succeeds, and then throwing the original {{WakeupException}}. If this synchronous commit of offsets occurs during task shutdown (as opposed to in response to a consumer rebalance), it's unnecessary to throw the {{WakeupException}} back to the caller, and can cause alarming {{ERROR}}-level messages to get logged by the worker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)