Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2020/06/19 18:34:00 UTC

[jira] [Created] (KAFKA-10188) Sink task preCommit method gets called after task is stopped

Chris Egerton created KAFKA-10188:
-------------------------------------

             Summary: Sink task preCommit method gets called after task is stopped
                 Key: KAFKA-10188
                 URL: https://issues.apache.org/jira/browse/KAFKA-10188
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Chris Egerton
            Assignee: Chris Egerton


When the [final cleanup for a sink task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L191] is initiated, the framework [first calls stop() on the task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L167] and then [closes the consumer for the task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L171]. Closing the consumer has the side effect of triggering [the onPartitionsRevoked method|https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked-java.util.Collection-] of the consumer's {{ConsumerRebalanceListener}}. That callback [causes the framework to call WorkerSinkTask::closePartitions|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L694], which [calls WorkerSinkTask::commitOffsets|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L618], which in turn [calls SinkTask::preCommit|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L386].
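
To make the ordering above concrete, here is a minimal, self-contained Java sketch that only records the order of calls. It is a hypothetical model, not framework code: {{ShutdownOrderModel}} and its methods are illustrative stand-ins for {{WorkerSinkTask}}, {{SinkTask}} and the consumer, and the consumer's revocation callback is simulated by a direct method call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the shutdown path described above. These
// are NOT the real Kafka Connect classes; each step only records its name so
// that the order of invocations can be inspected.
class ShutdownOrderModel {
    private final List<String> calls = new ArrayList<>();
    private boolean taskStopped = false;

    // Stand-in for SinkTask::preCommit; marks calls that arrive after stop().
    private void preCommit() {
        calls.add(taskStopped ? "preCommit (after stop)" : "preCommit");
    }

    // Stand-in for WorkerSinkTask::commitOffsets, which calls SinkTask::preCommit.
    private void commitOffsets() { preCommit(); }

    // Stand-in for WorkerSinkTask::closePartitions, which commits offsets.
    private void closePartitions() { commitOffsets(); }

    // Stand-in for ConsumerRebalanceListener::onPartitionsRevoked.
    private void onPartitionsRevoked() { closePartitions(); }

    // Closing the consumer triggers the rebalance listener's revocation callback.
    private void closeConsumer() {
        calls.add("consumer.close");
        onPartitionsRevoked();
    }

    // Final cleanup in the order described in the issue: stop the task first,
    // then close the consumer.
    List<String> close() {
        taskStopped = true;
        calls.add("stop");
        closeConsumer();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(new ShutdownOrderModel().close());
    }
}
```

Running {{main}} prints {{[stop, consumer.close, preCommit (after stop)]}}: in this model, the task's {{preCommit}} is reached only after {{stop}} has already run.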

Calling {{SinkTask::preCommit}} after {{SinkTask::stop}} is likely to cause errors in tasks, since tasks are expected to release their resources in {{stop}}. In addition, the [current documentation on the SinkTask lifecycle|https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/sink/SinkTask.html] makes no mention of any method being invoked on a task after it has been stopped.

The framework already [ensures that offsets are committed|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] for tasks after the last call to {{SinkTask::put}} has been made. The additional offset commit that occurs after {{SinkTask::stop}} has been invoked can and should therefore be removed, without weakening any of the delivery guarantees the framework currently provides.
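
One way the proposed change could look, sketched with the same kind of call-recording model: the final commit happens after the last {{put}}, and the revocation callback triggered by closing the consumer checks a hypothetical {{taskStopped}} flag and skips its offset commit once the task has been stopped. The flag and all class and method names here are assumptions for illustration, not the framework's actual implementation or fix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed behavior in a simplified model; names
// are illustrative stand-ins, not the real Kafka Connect classes.
class GuardedShutdownModel {
    private final List<String> calls = new ArrayList<>();
    // Hypothetical flag set once the task has been stopped.
    private boolean taskStopped = false;

    // Stand-in for SinkTask::put.
    private void put() { calls.add("put"); }

    // Stand-in for SinkTask::preCommit; marks calls that arrive after stop().
    private void preCommit() {
        calls.add(taskStopped ? "preCommit (after stop)" : "preCommit");
    }

    // Stand-in for WorkerSinkTask::commitOffsets.
    private void commitOffsets() { preCommit(); }

    // Stand-in for ConsumerRebalanceListener::onPartitionsRevoked.
    private void onPartitionsRevoked() {
        if (taskStopped) {
            // Offsets were already committed after the last put(), so skip
            // this commit and never call into the task after stop().
            calls.add("revocation commit skipped");
            return;
        }
        commitOffsets();
    }

    List<String> runAndShutdown() {
        put();                 // last batch delivered to the task
        commitOffsets();       // final commit after the last put()
        taskStopped = true;
        calls.add("stop");
        calls.add("consumer.close");
        onPartitionsRevoked(); // simulates the callback fired by closing the consumer
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(new GuardedShutdownModel().runAndShutdown());
    }
}
```

Running {{main}} prints {{[put, preCommit, stop, consumer.close, revocation commit skipped]}}: the last commit still follows the last {{put}}, but no {{preCommit}} call reaches the task after {{stop}}.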
