Posted to jira@kafka.apache.org by "Robert Yokota (JIRA)" <ji...@apache.org> on 2018/05/11 18:05:00 UTC

[jira] [Comment Edited] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

    [ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374 ] 

Robert Yokota edited comment on KAFKA-6566 at 5/11/18 6:04 PM:
---------------------------------------------------------------

I've just started looking at this, but it looks like the correct place to put the {{task.stop()}} call is in {{WorkerSourceTask.close()}}. This would mirror the call to {{task.stop()}} in {{WorkerSinkTask.close()}}. {{close()}} is called in a finally block in {{WorkerTask.doRun()}} here:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176
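
A minimal sketch of the control flow described above, assuming simplified stand-in types: {{SketchTask}}, {{SketchWorker}} and {{runFailingTask()}} are hypothetical names for illustration and are not the real Connect runtime classes. It only shows why a {{stop()}} call placed in {{close()}} still runs when {{poll()}} throws, given that {{close()}} is invoked from a finally block.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; NOT the real Kafka Connect classes.
class StopInCloseSketch {

    /** Minimal stand-in for a source task. */
    interface SketchTask {
        void poll() throws Exception;
        void stop();
    }

    /** Minimal stand-in for the worker that drives a task. */
    static class SketchWorker {
        private final SketchTask task;

        SketchWorker(SketchTask task) {
            this.task = task;
        }

        void doRun() throws Exception {
            try {
                task.poll();
            } finally {
                // Runs whether poll() returned normally or threw.
                close();
            }
        }

        void close() {
            // The proposed placement: stop the task as part of close().
            task.stop();
        }
    }

    /** Runs a task whose poll() always fails and records the order of events. */
    static List<String> runFailingTask() {
        List<String> events = new ArrayList<>();
        SketchTask failing = new SketchTask() {
            @Override
            public void poll() {
                events.add("poll");
                throw new RuntimeException("poll failed");
            }

            @Override
            public void stop() {
                events.add("stop");
            }
        };
        try {
            new SketchWorker(failing).doRun();
        } catch (Exception e) {
            events.add("caught: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        // Prints [poll, stop, caught: poll failed]
        System.out.println(runFailingTask());
    }
}
```

In this sketch {{stop()}} is recorded before the exception reaches the caller, which is the behaviour the issue asks for.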

Another possible change I am looking at is to put a call to {{task.stop()}} in {{WorkerSinkTask.stop()}}. This would mirror the call to {{task.stop()}} in {{WorkerSourceTask.stop()}}.

Ideally the source and sink would be symmetrical, which would make them easier to reason about, especially for Connect developers. The above changes assume that {{task.stop()}} is idempotent for both the source and sink.
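
To illustrate the idempotency assumption, here is one way a task could make {{stop()}} safe to call more than once. This is only a sketch: {{GuardedTask}}, {{releaseCount()}} and {{stopTwice()}} are hypothetical names, and the counter merely stands in for releasing a real resource such as a database connection.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example; not taken from the Kafka code base.
class IdempotentStopSketch {

    /** A task whose stop() releases its resource at most once. */
    static class GuardedTask {
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private final AtomicInteger releases = new AtomicInteger();

        void stop() {
            // compareAndSet succeeds only for the first caller, so the
            // cleanup below runs once even if stop() is invoked repeatedly.
            if (stopped.compareAndSet(false, true)) {
                releases.incrementAndGet(); // stands in for closing a connection
            }
        }

        int releaseCount() {
            return releases.get();
        }
    }

    /** Calls stop() twice, as could happen if both stop() and close() invoke it. */
    static int stopTwice() {
        GuardedTask task = new GuardedTask();
        task.stop();
        task.stop();
        return task.releaseCount();
    }

    public static void main(String[] args) {
        // Prints "resources released: 1"
        System.out.println("resources released: " + stopTwice());
    }
}
```

A guard of this kind would live in each connector's task, so the framework changes above would still rely on task authors following that convention.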











> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that {{SourceTask#stop()}} will be called by the Kafka Connect framework in case an exception has been raised in {{poll()}}. That's not the case, though. As an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful action to take, as it'll allow the task to clean up any resources such as releasing any database connections, right after that failure and not only once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}


