You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2016/10/04 23:51:20 UTC

[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback

    [ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547089#comment-15547089 ] 

Guozhang Wang commented on KAFKA-3559:
--------------------------------------

Here are some more thoughts on this issue and how we can improve the situation:

Currently with Kafka Streams each rebalance is expensive, even if it is only "partial" (i.e. only a few of the non-leader members in the consumer group has decided to re-join, which will not trigger a full rebalance but only will cause the coordinator to send back the assignment again), since anyways {{onPartitionRevoked}} and {{onPartitionAssigned}} will be triggered, closing and (re-)constructing the tasks. For example, on my local (a very small) laptop, with a complex topology containing 10+ stores and 15+ internal topics, with 3 threads on rebalance could take up to 20 seconds.

On the other hand, we want to close the tasks in {{onPartitionRevoked}} before the synchronization barrier only because threads may hold some file locks related to these tasks. And since tasks are all committed right before closing, I think it is safe to delay the destruction of tasks so that we may be able to save the time of closing / reconstructing such tasks. More specifically:

1. In {{onPartitionRevoked}}, instead of closing the tasks, we only need to commit the tasks and "pause" them by calling their topology processor's newly added {{flush}} calls, releasing the corresponding file locks of the tasks: in fact, it is automatically done since we will not process any messages during the rebalance anyways.
2. Then in {{onPartitionAssigned}}, we can if there are any tasks that have really been migrated out of the thread; for those tasks, closing them (and note that since these tasks are already committed in {{onPartitionRevoked}}, closing them will only involve calling the topology processor's {{close}} function, as well as closing the state stores), otherwise "resume" processing.

We need to think through some minor issues such as the above mentioned file locks for persistent state stores, how clean-up will work without introducing deadlocks, etc. But I think in general this solution should work.

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
>                 Key: KAFKA-3559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3559
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly assigned partitions in rebalance callback function {code} onPartitionAssigned {code}, which involves initialization of the processor state stores as well (including opening the rocksDB, restore the store from changelog, etc, which takes time).
> With a large number of state stores, the initialization time itself could take tens of seconds, which usually is larger than the consumer session timeout. As a result, when the callback is completed, the consumer is already treated as failed by the coordinator and rebalance again.
> We need to consider if we can optimize the initialization process, or move it out of the callback function, and while initializing the stores one-by-one, use poll call to send heartbeats to avoid being kicked out by coordinator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)