Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/07/10 18:28:00 UTC

[jira] [Commented] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks

    [ https://issues.apache.org/jira/browse/KAFKA-5578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080805#comment-16080805 ] 

Guozhang Wang commented on KAFKA-5578:
--------------------------------------

Hey [~damianguy], do you mean that multiple threads could have the state dir for a given task while that task is not on any thread's {{previous active tasks}} list? In the StreamPartitionAssignor we chose the priorities as follows:

1. Previous active tasks: this information is obtained from {{removeActiveTask}}, so at most one thread can ever have this task in its list.
2. Previous standby tasks: this information is collected from the state dir, so multiple threads could have this task in their lists.
3. Pick a random one with load balance in mind.

Your concern is that if there is no candidate for priority 1), we may have multiple candidates for priority 2) that actually have different restoration costs, is that correct? I wonder how often we would hit that issue in practice, given the state dir cleanup process.
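
To make the priority order above concrete, here is a rough sketch. It is a simplified model and not the actual StreamPartitionAssignor code: the class {{PrioritySketch}}, the method {{pickClient}}, and the map-based inputs are hypothetical names assumed for illustration. Picking the least loaded candidate (with ties broken by name) stands in for "random with load balance in mind" so that the example is deterministic; that is also an assumption. The point it shows is that once priority 1 has no candidate, priority 2 treats every thread with a state dir as equally good, however stale its data is.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the three priorities; not the real assignor.
final class PrioritySketch {

    // prevActive:  client -> tasks it previously ran as active
    // prevStandby: client -> tasks for which it has a state dir
    // load:        client -> number of tasks already assigned to it
    static String pickClient(String taskId,
                             Map<String, Set<String>> prevActive,
                             Map<String, Set<String>> prevStandby,
                             Map<String, Integer> load) {
        // Priority 1: at most one client lists the task as previously active.
        for (Map.Entry<String, Set<String>> e : prevActive.entrySet()) {
            if (e.getValue().contains(taskId)) {
                return e.getKey();
            }
        }
        // Priority 2: any client with a state dir for the task; several may qualify,
        // and nothing here looks at how old each state dir is.
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : prevStandby.entrySet()) {
            if (e.getValue().contains(taskId)) {
                candidates.add(e.getKey());
            }
        }
        if (!candidates.isEmpty()) {
            return leastLoaded(candidates, load);
        }
        // Priority 3: no prior state anywhere, so only balance the load.
        return leastLoaded(new ArrayList<>(load.keySet()), load);
    }

    // Least loaded client, ties broken by name (assumed, to keep the sketch deterministic).
    // Returns null when the list of clients is empty.
    private static String leastLoaded(List<String> clients, Map<String, Integer> load) {
        return clients.stream()
                .min(Comparator.comparingInt((String c) -> load.getOrDefault(c, 0))
                        .thenComparing(Comparator.<String>naturalOrder()))
                .orElse(null);
    }
}
```

In this model, two threads that both hold a state dir for the same task are distinguished only by load, which is the gap the ticket describes.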

> Streams Task Assignor should consider the staleness of state directories when allocating tasks
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5578
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5578
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Damian Guy
>
> During task assignment we use the presence of a state directory to decide which instances take precedence for a task. We first choose previous active tasks, but then fall back to the existence of a state dir. Unfortunately we don't take into account the recency of the data in the available state dirs. So in the case where a task has run on many instances, we may choose an instance that has relatively old data.
> When doing task assignment we should take into consideration the age of the data in the state dirs. We could use the data from the checkpoint files to determine which instance is most up-to-date and attempt to assign accordingly (obviously making sure that tasks are still balanced across available instances).
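
The checkpoint-based comparison proposed in the description could look roughly like the sketch below. It assumes each candidate instance reports, per task, a map from changelog partition to the offset recorded in its checkpoint file, and it treats the instance with the largest total offset as the most up-to-date. The class {{StalenessSketch}}, its methods, and the use of a simple offset sum are all hypothetical choices made for illustration; load balancing is deliberately left out.

```java
import java.util.Map;

// Hypothetical sketch: rank candidate instances by their checkpointed offsets.
final class StalenessSketch {

    // Sum of the checkpointed offsets across all changelog partitions of one task.
    static long totalOffset(Map<String, Long> checkpoint) {
        long sum = 0L;
        for (long offset : checkpoint.values()) {
            sum += offset;
        }
        return sum;
    }

    // checkpointsByClient: client -> (changelog partition -> checkpointed offset).
    // Returns the client with the highest total offset, i.e. the least data to restore
    // under this sketch's assumption; ties go to the lexicographically smaller name.
    // Returns null when there are no candidates. Offsets are assumed non-negative.
    static String mostUpToDate(Map<String, Map<String, Long>> checkpointsByClient) {
        String best = null;
        long bestTotal = -1L;
        for (Map.Entry<String, Map<String, Long>> e : checkpointsByClient.entrySet()) {
            long total = totalOffset(e.getValue());
            boolean better = total > bestTotal
                    || (total == bestTotal && best != null && e.getKey().compareTo(best) < 0);
            if (better) {
                best = e.getKey();
                bestTotal = total;
            }
        }
        return best;
    }
}
```

A real assignor would still need to weigh this ranking against balance across instances, as the description notes.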



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)