You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2020/06/21 04:27:00 UTC

[jira] [Commented] (KAFKA-10121) Streams Task Assignment optimization design

    [ https://issues.apache.org/jira/browse/KAFKA-10121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17141289#comment-17141289 ] 

Guozhang Wang commented on KAFKA-10121:
---------------------------------------

Thanks for filing this ticket John! Since this is a more of a research-ish exploring ticket, I'd like to point out a few other shortcomings that we may want to address as well. We do not need to get all of the known issues in one shot though, just incrementally improve the algorithm would be great.

1. when considering balancing, our categorization of tasks, especially stateful tasks are rather simple by treating them all equal. In practice, however, a stateful task with 10 state stores is definitely more expensive than a stateful task with 1 state store. For standby tasks and stateless tasks, the same argument can apply as well. On the other hand, our “capacity” definition per instance is also very simple as it is only defined by num.threads, whereas in practice e.g. when instances are running on VMs or containers one can imagine the capacity of an instance can be defined as its resource like CPU / network / etc.

So I think a finer-grained task workload and instance capacity based on collected metrics (e.g. CPU and network usage) would be a better solution here, and also the goal would not be shooting for "balancing" but to make sure each instance's capacity would not be violated (of course, the capacity can just be a soft limit) with the resulted assignment.

This also means that, even with N instances joining the group, if a subset of these instances would suffice the capacity of the tasks then we may want to only assign tasks to M < N of them to achieve the placement goal --- this may make sense for multi-tenant cloud native environment.

2. With task fusion where tasks assigned to the same instance can avoid communicating through read/write to Kafka, the network cost would also need to be considered during the placement. In this case, the goal of the placement could become two fold: find a placement that minimize a) the state migration cost (i.e. try to be stickiness and maximize the overlaps with existing assignment as the description of this ticket) and b) the inter-instance communication cost, while respecting the finer-defined capacity of each instance.

Since the cost of a) is only a one-time payment while cost of b) is a permanent recurring payment, maybe we would change our algorithm to optimize on  minimizing inter-instance networking cost and then as a secondary goal maximizing stickiness.

BTW I found this paper is particularly interesting among all replica placement works I've read so far: http://www.ninewhilenine.org/publications/debs08.pdf some of the ideas above is also inspired by it.

> Streams Task Assignment optimization design
> -------------------------------------------
>
>                 Key: KAFKA-10121
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10121
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Priority: Minor
>
> Beginning in Kafka 2.6.0, Streams has a new task assignment algorithm that reacts to cluster membership changes by starting out 100% sticky and warming up tasks in the background to eventually migrate to a 100% balanced assignment. See KIP-441 for the details.
> However, in computing the final, 100% balanced, assignment, the assignor doesn't take into account the current ownership of the tasks. Thus, when instances are added or removed, the assignor is likely to migrate large numbers of tasks. This is mitigated by the fact that the migrations happen at a trickle over time in the background, but it's still better to avoid unnecessary migrations if possible. See the example below for details.
> The solution seems to be to use some kind of optimization algorithm to find a 100% balanced assignment that also has maximum overlap with the current assignment.
> I'd formally state the optimization problem as:
> > Generate a 100% balanced assignment that has the maximum overlap with the current assignment.
>  
> Example, with additional detail:
> The main focus of the KIP-441 work was the migration mechanism that allows Streams to warm up state for new instances in the background while continuing to process tasks on the instances that previously owned them. Accordingly the assignment algorithm itself focuses on simplicity and guaranteed balance, not optimality.
> There are three kinds of balance that all have to be met for Stream to be 100% balanced:
>  # Active task balance: no member should have more active processing workload than any other
>  # Stateful task balance: no member should have more stateful tasks (either active and stateful or standby) than any other
>  # Task parallel balance: no member should have more tasks (partitions) for a single subtopology than another
> (Note: in all these cases, an instance may actually have one more task than another, if the number of members doesn't evenly divide the number of tasks. For a simple case, consider if you have two members and only one task. It can only be assigned to one of the members, and the assignment is still as balanced as it could be.)
> The current algorithm ensures all three kinds of balance thusly:
>  # sort all members by name (to ensure assignment stability)
>  # sort all tasks by subtopology first, then by partition. E.g., sorted like this: 0_0, 0_1, 0_2, 1_0, 1_1
>  # for all tasks that are stateful, iterate over both tasks and members in sorted order, assigning each task t[i] to the member m[i % num_tasks]
>  # for each standby replica we need to assign, continue looping over the sorted members, assigning each replica to the next member (assuming the member doesn't already have a replica of the task)
>  # for each stateless task, assign an active replica to the member with the least number of tasks. Since the active assignment of the member with the least number of tasks should have at most 1 task less than any other member after step 3, the assignment after step 5 is still balanced.
> To demonstrate how a more sophisticated algorithm could minimize migrations, consider the following simple assignment with two instances and six tasks:
> m1: [0_0, 0_2, 0_4]
> m2: [0_1, 0_3, 0_5]
> Adding a new member causes four of the tasks to migrate:
> m1: [0_0, 0_3]
> m2: [0_1, 0_4]
> m3: [0_2, 0_5]
> However, the following assignment is equally balanced, and only two of the tasks need to migrate:
> m1: [0_0, 0_2]
> m2: [0_1, 0_3]
> m3: [0_4, 0_5]
>  
> Of course, the full problem, including all three kinds of balance is much more complex to optimize.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)