You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Damian Guy (JIRA)" <ji...@apache.org> on 2016/10/05 08:30:20 UTC

[jira] [Assigned] (KAFKA-4253) Fix Kafka Stream thread shutting down process ordering

     [ https://issues.apache.org/jira/browse/KAFKA-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy reassigned KAFKA-4253:
---------------------------------

    Assignee: Damian Guy

> Fix Kafka Stream thread shutting down process ordering
> ------------------------------------------------------
>
>                 Key: KAFKA-4253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Guozhang Wang
>            Assignee: Damian Guy
>
> Currently we close the stream thread in the following way:
> 0. Commit all tasks.
> 1. Close producer.
> 2. Close consumer.
> 3. Close restore consumer.
> 4. For each task, close its topology processors one-by-one following the topology order by calling {{processor.close()}}.
> 5. For each task, close its state manager as well as flushing and closing all its associated state stores.
> We choose to close the producer / consumer clients before shutting down the tasks because we need to make sure all sent records has been acked so that we have the right log-end-offset when closing the store and checkpointing the offset of the changelog. However there is also an issue with this ordering, in which users choose to write more records in their {{processor.close()}} calls, this will cause RTE since the producers has already been closed, and no changelog records will be able to write.
> Thinking about this issue, a more appropriate ordering will be:
> 1. For each task, close their topology processors following the topology order by calling {{processor.close()}}.
> 2. For each task, commit its state by calling {{task.commit()}}. At this time all sent records should be acked since {{producer.flush()}} is called.
> 3. For each task, close their {{ProcessorStateManager}}.
> 4. Close all embedded clients, i.e. producer / consumer / restore consumer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)