Posted to jira@kafka.apache.org by "Richard Yu (JIRA)" <ji...@apache.org> on 2018/02/18 04:30:04 UTC

[jira] [Comment Edited] (KAFKA-4696) Streams standby task assignment should be state-store aware

    [ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368437#comment-16368437 ] 

Richard Yu edited comment on KAFKA-4696 at 2/18/18 4:29 AM:
------------------------------------------------------------

[~damianguy] [~mjsax] While looking through {{StickyTaskAssignor}}, I found that {{StickyTaskAssignor#leastLoaded()}} is used to determine the next {{ClientState}} to which a task will be assigned (this approach is used mostly by {{StickyTaskAssignor#assignStandby()}}). In {{leastLoaded()}}, the main comparison is done through {{ClientState#hasMoreAvailableCapacityThan()}}, which essentially takes the number of tasks currently assigned to each {{ClientState}}, divides it by that client's capacity to get the fraction of capacity that is occupied, and reports which fraction is lower. However, this Jira would require that tasks with StateStores be distinguished from tasks without them. In essence, how would one compare two ClientStates to decide which is lighter?

One solution would be to weight the tasks (e.g. tasks with StateStores have weight 2 and tasks without have weight 1). However, that would bring about complications when dealing with the total percentage of capacity that is occupied. What are your thoughts on this approach?
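
To make the weighting idea concrete, here is a minimal sketch. It does not use the real Kafka classes: {{Client}}, {{weightedTaskCount()}}, {{weightedLoad()}} and the weight constants are hypothetical names introduced only for illustration, and the weights 2 and 1 are just the example values mentioned above.

```java
import java.util.Arrays;
import java.util.List;

public class WeightedLoadSketch {

    // Hypothetical weights, taken from the example values in the comment.
    static final int STATEFUL_WEIGHT = 2;
    static final int STATELESS_WEIGHT = 1;

    /** Hypothetical stand-in for ClientState; this is NOT the real Kafka class. */
    static final class Client {
        final String name;
        final int capacity;
        final int statefulTasks;
        final int statelessTasks;

        Client(String name, int capacity, int statefulTasks, int statelessTasks) {
            this.name = name;
            this.capacity = capacity;
            this.statefulTasks = statefulTasks;
            this.statelessTasks = statelessTasks;
        }

        /** Sum of task weights instead of a plain task count. */
        int weightedTaskCount() {
            return STATEFUL_WEIGHT * statefulTasks + STATELESS_WEIGHT * statelessTasks;
        }

        /** Weighted count divided by capacity; a relative score, not a true percentage. */
        double weightedLoad() {
            return (double) weightedTaskCount() / capacity;
        }
    }

    /** Returns the client with the lowest weighted load; the first one wins on ties. */
    static Client leastLoaded(List<Client> clients) {
        Client best = null;
        for (Client candidate : clients) {
            if (best == null || candidate.weightedLoad() < best.weightedLoad()) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Both clients hold 2 tasks with capacity 2, so a plain count sees them as equal.
        Client a = new Client("a", 2, 1, 1); // weighted count 3
        Client b = new Client("b", 2, 0, 2); // weighted count 2
        System.out.println(leastLoaded(Arrays.asList(a, b)).name); // prints "b"
    }
}
```

Note that in this sketch the weighted load of client {{a}} is 1.5, i.e. above 1, which may be one form of the complication mentioned above: once tasks are weighted, the ratio no longer reads as a fraction of occupied capacity and is only meaningful for comparing clients with each other.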


was (Author: yohan123):
[~damianguy] [~mjsax] While looking through {{StickyTaskAssignor}}, I found that {{StickyTaskAssignor#leastLoaded()}} is used to determine the next {{ClientState}} to which a task will be assigned (this approach is used mostly by {{StickyTaskAssignor#assignStandby()}}). In {{leastLoaded()}}, the main comparison is done through {{ClientState#hasMoreAvailableCapacityThan()}}, which essentially takes the number of tasks currently assigned to each {{ClientState}}, divides it by that client's capacity to get the fraction of capacity that is occupied, and reports which fraction is lower. However, this Jira would require that tasks with StateStores be distinguished from tasks without them. In essence, how would one compare two ClientStates to decide which is lighter?

One solution would be to weight the tasks (e.g. tasks with StateStores have weight 2 and tasks without have weight 1). However, that would bring about complications when dealing with the total percentage of capacity that is occupied. What are your thoughts on this approach?

> Streams standby task assignment should be state-store aware
> -----------------------------------------------------------
>
>                 Key: KAFKA-4696
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4696
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.11.0.0
>            Reporter: Damian Guy
>            Priority: Major
>
> Task assignment is currently not aware of which tasks have state stores. This can result in an uneven balance of standby tasks, because standby tasks are assigned for all tasks, but only those for tasks with state stores are ever created by {{StreamThread}}. So what seems like an optimal strategy at assignment time could be sub-optimal post-assignment.
> For example, let's say we have 4 tasks (2 with state stores), 2 clients, and numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. One of the clients may end up with both state-store tasks, while the other has none.
> Further to this, standby task configuration is currently "all or nothing". It might make sense to allow more fine-grained configuration, i.e., the ability to specify the number of standby replicas individually for each stateful operator.
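
The 4-task example in the description can be sketched as follows. This is only an illustration under assumptions: the task ids, client names and the {{createdStandbys()}} helper are hypothetical, and the assignment shown is one possible outcome (read here as both state-store standbys landing on the same client), not output from the real assignor.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class StandbyImbalanceSketch {

    /**
     * Counts, per client, the assigned standby tasks that would actually be
     * created, i.e. only those whose task has a state store.
     */
    static Map<String, Integer> createdStandbys(Map<String, List<String>> standbyAssignment,
                                                Set<String> statefulTasks) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : standbyAssignment.entrySet()) {
            int created = 0;
            for (String task : entry.getValue()) {
                if (statefulTasks.contains(task)) {
                    created++;
                }
            }
            counts.put(entry.getKey(), created);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical task ids: 0_0 and 0_1 have state stores, 1_0 and 1_1 do not.
        Set<String> stateful = new HashSet<>(Arrays.asList("0_0", "0_1"));

        // Each client is assigned 2 standby tasks, so the assignment looks balanced.
        Map<String, List<String>> standbys = new LinkedHashMap<>();
        standbys.put("clientA", Arrays.asList("0_0", "0_1"));
        standbys.put("clientB", Arrays.asList("1_0", "1_1"));

        // Only the state-store standbys are ever created.
        System.out.println(createdStandbys(standbys, stateful)); // prints {clientA=2, clientB=0}
    }
}
```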


