Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2016/10/04 23:51:20 UTC
[jira] [Commented] (KAFKA-3559) Task creation time taking too long in rebalance callback
[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547089#comment-15547089 ]
Guozhang Wang commented on KAFKA-3559:
--------------------------------------
Here are some more thoughts on this issue and how we can improve the situation:
Currently with Kafka Streams each rebalance is expensive, even if it is only "partial" (i.e. only a few of the non-leader members in the consumer group have decided to re-join, which does not trigger a full rebalance but only causes the coordinator to send back the assignment again), since {{onPartitionRevoked}} and {{onPartitionAssigned}} are triggered anyway, closing and (re-)constructing the tasks. For example, on my local (very small) laptop, with a complex topology containing 10+ stores and 15+ internal topics and 3 threads, one rebalance could take up to 20 seconds.
On the other hand, we want to close the tasks in {{onPartitionRevoked}} before the synchronization barrier only because threads may hold some file locks related to these tasks. And since tasks are all committed right before closing, I think it is safe to delay the destruction of tasks so that we may be able to save the time of closing / reconstructing such tasks. More specifically:
1. In {{onPartitionRevoked}}, instead of closing the tasks, we only need to commit the tasks and "pause" them by calling their topology processor's newly added {{flush}} calls and releasing the corresponding file locks of the tasks; in fact, the "pause" itself happens automatically, since we will not process any messages during the rebalance anyway.
2. Then in {{onPartitionAssigned}}, we can check whether any tasks have really been migrated out of the thread; those tasks we close (note that since they were already committed in {{onPartitionRevoked}}, closing them only involves calling the topology processor's {{close}} function and closing the state stores), and for all other tasks we simply "resume" processing.
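
To make the two steps above concrete, here is a minimal sketch in plain Java. It does not use or mirror the Kafka Streams internals: {{SketchTask}}, {{LazyCloseTaskManager}}, the {{created}} counter and the use of a bare partition number as the task key are all hypothetical names and simplifications introduced for illustration, and file-lock handling is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a stream task; NOT the Kafka Streams class.
class SketchTask {
    final int partition;
    boolean paused = false;
    boolean closed = false;
    int commits = 0;

    SketchTask(int partition) { this.partition = partition; }

    void commit() { commits++; }       // would flush state and commit offsets
    void pause()  { paused = true; }   // would also release file locks
    void resume() { paused = false; }
    void close()  { closed = true; }   // would close processors and state stores
}

// Hypothetical per-thread task registry that delays task destruction.
class LazyCloseTaskManager {
    private final Map<Integer, SketchTask> tasks = new HashMap<>();
    int created = 0; // counts how many tasks had to be constructed

    // Step 1: commit and pause every task, but keep it alive.
    void onPartitionRevoked() {
        for (SketchTask task : tasks.values()) {
            task.commit();
            task.pause();
        }
    }

    // Step 2: close only tasks that migrated away, resume the retained
    // ones, and construct tasks only for newly assigned partitions.
    List<SketchTask> onPartitionAssigned(Set<Integer> assigned) {
        List<SketchTask> migrated = new ArrayList<>();
        Iterator<Map.Entry<Integer, SketchTask>> it = tasks.entrySet().iterator();
        while (it.hasNext()) {
            SketchTask task = it.next().getValue();
            if (!assigned.contains(task.partition)) {
                task.close(); // already committed in onPartitionRevoked
                migrated.add(task);
                it.remove();
            }
        }
        for (int partition : assigned) {
            SketchTask task = tasks.get(partition);
            if (task == null) {
                tasks.put(partition, new SketchTask(partition));
                created++;
            } else {
                task.resume();
            }
        }
        return migrated;
    }

    SketchTask task(int partition) { return tasks.get(partition); }
}
```

In this sketch, a rebalance that moves the thread from partitions {0, 1, 2} to {1, 2, 3} closes only the task for partition 0 and constructs only the task for partition 3; the tasks for partitions 1 and 2 are the same objects before and after, which is where the time saving would come from.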
We need to think through some minor issues such as the above mentioned file locks for persistent state stores, how clean-up will work without introducing deadlocks, etc. But I think in general this solution should work.
> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
> Key: KAFKA-3559
> URL: https://issues.apache.org/jira/browse/KAFKA-3559
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Eno Thereska
> Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly assigned partitions in the rebalance callback function {{onPartitionAssigned}}, which also involves initialization of the processor state stores (including opening RocksDB, restoring the store from the changelog, etc., which takes time).
> With a large number of state stores, the initialization itself could take tens of seconds, which is usually longer than the consumer session timeout. As a result, by the time the callback completes, the consumer has already been treated as failed by the coordinator, which triggers another rebalance.
> We need to consider whether we can optimize the initialization process or move it out of the callback function, and, while initializing the stores one by one, use the poll call to send heartbeats so that the consumer is not kicked out by the coordinator.