You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Hao Li <hl...@confluent.io.INVALID> on 2023/05/09 23:03:19 UTC

[DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Hi all,

I have submitted KIP-925 to add rack awareness logic in task assignment in
Kafka Streams and would like to start a discussion:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams

-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Hi Colt,

Thanks for the feedback.

> most deployments have three racks, RF=3, and one replica for
each partition in each rack

This KIP is mainly targeting the case where the client won't always be in
the same rack as any replica. There's some proposal to make RF=2 and use
other tiered storage to do backup. If all TopicPartitions have replicas
which can be in same rack as clients, this KIP will not take effect and
rack aware assignment will be turned off.

As for setting the leader of replicas, I think this is something clients
don't have control over. Also as you mentioned, if there are multiple
clients, there's no "set the leader and fit all" solution.


On Thu, Jun 1, 2023 at 10:20 AM Colt McNealy <co...@littlehorse.io> wrote:

> Hi all,
>
> I've got a rather naive question here. The spiritual goal of this KIP is to
> reduce cross-rack traffic (normally, this manifests itself in terms of a
> higher AWS/Azure bill as cloud providers charge for cross-AZ traffic).
>
> To generalize, most deployments have three racks, RF=3, and one replica for
> each partition in each rack. Therefore, in the steady state (absent any
> cluster anomalies such as broker failure, etc) we are pretty confident that
> there should be a replica for every partition (input, changelog,
> repartition, output topic) on the same rack as a given Streams instance.
>
> Why not just let Sophie's High-Availability Task Assignor do its thing, and
> then *set the preferred leader* for each replica to a broker in the same
> rack/AZ as the Active Task? This would solve two problems:
>
> 1. The current KIP can't make any improvements in the case where a Task has
> three involved partitions (eg. input, changelog, output) and the leader for
> each partition is in a different rack. With this approach, we could get
> pretty close to having zero cross-AZ traffic in a healthy cluster.
> 2. There needs to be a lot of work done to balance availability, data
> movement, and cross-AZ traffic in the current proposal. My proposal doesn't
> actually involve any additional data movement; simply reassignment of
> partition leadership.
>
> The biggest argument against this proposal is that there could be two
> Streams apps using the same topic, which would cause some bickering.
> Secondly, some have observed that changing partition leadership can trigger
> ProducerFencedExceptions in EOS, which causes a state restoration.
>
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Thu, Jun 1, 2023 at 10:02 AM Hao Li <hl...@confluent.io.invalid> wrote:
>
> > Hi Bruno,
> >
> > dropping config rack.aware.assignment.enabled
> > and add value NONE to the enum for the possible values of config
> > rack.aware.assignment.strategy sounds good to me.
> >
> > On Thu, Jun 1, 2023 at 12:39 AM Bruno Cadonna <ca...@apache.org>
> wrote:
> >
> > > Hi Hao,
> > >
> > > Thanks for the updates!
> > >
> > > What do you think about dropping config rack.aware.assignment.enabled
> > > and add value NONE to the enum for the possible values of config
> > > rack.aware.assignment.strategy?
> > >
> > > Best,
> > > Bruno
> > >
> > > On 31.05.23 23:31, Hao Li wrote:
> > > > Hi all,
> > > >
> > > > I've updated the KIP based on the feedback. Major changes I made:
> > > > 1. Add rack aware assignment to `StickyTaskAssignor`
> > > > 2. Reject `Prefer reliability and then find optimal cost` option in
> > > standby
> > > > task assignment.
> > > >
> > > >
> > > > On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io> wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Thanks for the feedback! I will update the KIP accordingly.
> > > >>
> > > >> *For Sophie's comments:*
> > > >>
> > > >> 1 and 2. Good catch. Fixed these.
> > > >>
> > > >> 3 and 4 Yes. We can make this public config and call out the
> > > >> clientConsumer config users need to set.
> > > >>
> > > >> 5. It's ideal to take the previous assignment in HAAssignor into
> > > >> consideration when we compute our target assignment, the
> complications
> > > come
> > > >> with making sure the assignment can eventually converge and we don't
> > do
> > > >> probing rebalance infinitely. It's not only about storing the
> previous
> > > >> assignment or get it somehow. We can actually get the previous
> > > assignment
> > > >> now like we do in StickyAssignor. But the previous assignment will
> > > change
> > > >> in each round of probing rebalance. The proposal which added some
> > > weight to
> > > >> make the rack aware assignment lean towards the original HAA's
> target
> > > >> assignment will add benefits of stability in some corner cases in
> case
> > > of
> > > >> tie in cross rack traffic cost. But it's not sticky. But the bottom
> > > line is
> > > >> it won't be worse than current HAA's stickiness.
> > > >>
> > > >> 6. I'm fine with changing the assignor config to public. Actually, I
> > > think
> > > >> we can min-cost algorithm with StickyAssignor as well to mitigate
> the
> > > >> problem of 5. So we can have one public config to choose an assignor
> > and
> > > >> one public config to enable the rack aware assignment.
> > > >>
> > > >> *For Bruno's comments:*
> > > >>
> > > >> The proposal was to implement all the options and use configs to
> > choose
> > > >> them during runtime. We can make those configs public as suggested.
> > > >> 1, 2, 3, 4, 5: agree and will fix those.
> > > >> 6: subscription protocol is not changed.
> > > >> 7: yeah. Let me fix the notations.
> > > >> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`,
> `c1_3`
> > > etc.
> > > >> 9: I'm also ok with just optimizing reliability for standby tasks.
> Or
> > we
> > > >> could simply run the "balance reliability over cost" greedy
> algorithm
> > to
> > > >> see if any cost could be reduced.
> > > >> 10: Make sense. Will fix the wording.
> > > >> 11: Make sense. Will update the test part.
> > > >>
> > > >> *For Walker's comments:*
> > > >> 1. Stability for HAA is an issue. See my comments for Sophie's
> > feedback
> > > 5
> > > >> and 6. I think we could use the rack aware assignment for
> > > StickyAssignor as
> > > >> well. For HAA assignments, it's less sticky and we can only shoot
> for
> > > >> minimizing the cross rack traffic eventually when everything is
> > stable.
> > > >> 2. Yeah. This is a good point and we can also turn it on for
> > > >> StickyAssignor.
> > > >>
> > > >> Thanks,
> > > >> Hao
> > > >>
> > > >>
> > > >> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> > > >> ableegoldman@gmail.com> wrote:
> > > >>
> > > >>> Hey Hao, thanks for the KIP!
> > > >>>
> > > >>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
> > > >>> config,
> > > >>> this
> > > >>> should be internal.rack.aware.assignment.strategy.
> > > >>>
> > > >>> 2.
> > > >>>
> > > >>>>   For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > > >>> Number of
> > > >>>> edges E is T * N where T is the number of clients and N is the
> > number
> > > of
> > > >>>> Tasks. This is because a task can be assigned to any client so
> there
> > > >>> will
> > > >>>> be an edge between every task and every client. The total
> complexity
> > > >>> would
> > > >>>> be O(T * N) if we want to be more specific.
> > > >>>
> > > >>> I feel like I'm missing something here, but if E = T * N and the
> > > >>> complexity
> > > >>> is ~O(E^2), doesn't
> > > >>> this make the total complexity order of O(T^2 * N^2)?
> > > >>>
> > > >>> 3.
> > > >>>
> > > >>>> Since 3.C.I and 3.C.II have different tradeoffs and work better in
> > > >>>> different workloads etc, we
> > > >>>
> > > >>> could add an internal configuration to choose one of them at
> runtime.
> > > >>>>
> > > >>> Why only an internal configuration? Same goes for
> > > >>> internal.rack.aware.assignment.standby.strategry (which also has
> the
> > > typo)
> > > >>>
> > > >>> 4.
> > > >>>
> > > >>>>   There are no changes in public interfaces.
> > > >>>
> > > >>> I think it would be good to explicitly call out that users can
> > utilize
> > > >>> this
> > > >>> new feature by setting the
> > > >>> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> > > >>>
> > > >>> 5.
> > > >>>
> > > >>>> The idea is that if we always try to make it overlap as much with
> > > >>>> HAAssignor’s target
> > > >>>
> > > >>> assignment, at least there’s a higher chance that tasks won’t be
> > > shuffled
> > > >>> a
> > > >>>> lot if the clients
> > > >>>
> > > >>> remain the same across rebalances.
> > > >>>>
> > > >>> This line definitely gave me some pause -- if there was one major
> > > takeaway
> > > >>> I had after KIP-441,
> > > >>> one thing that most limited the feature's success, it was our
> > > assumption
> > > >>> that clients are relatively
> > > >>> stable across rebalances. This was mostly true at limited scale or
> > for
> > > >>> on-prem setups, but
> > > >>> unsurprisingly broke down in cloud environments or larger clusters.
> > Not
> > > >>> only do clients naturally
> > > >>> fall in and out of the group, autoscaling is becoming more and more
> > of
> > > a
> > > >>> thing.
> > > >>>
> > > >>> Lastly, and this is more easily solved but still worth calling out,
> > an
> > > >>> assignment is only deterministic
> > > >>> as long as the client.id is persisted. Currently in Streams, we
> only
> > > >>> write
> > > >>> the process UUID to the
> > > >>> state directory if there is one, ie if at least one persistent
> > stateful
> > > >>> task exists in the topology. This
> > > >>> made sense in the context of KIP-441, which targeted heavily
> stateful
> > > >>> deployments, but this KIP
> > > >>> presumably intends to target more than just the persistent &
> stateful
> > > >>> subset of applications. To
> > > >>> make matters even worse,  "persistent" is defined in a semantically
> > > >>> inconsistent way throughout
> > > >>> Streams.
> > > >>>
> > > >>> All this is to say, it may sound more complicated to remember the
> > > previous
> > > >>> assignment, but (a)
> > > >>> imo it only introduces a lot more complexity and shaky assumptions
> to
> > > >>> continue down this
> > > >>> path, and (b) we actually already do persist some amount of state,
> > like
> > > >>> the
> > > >>> process UUID, and
> > > >>> (c) it seems like this is the perfect opportunity to finally rid
> > > ourselves
> > > >>> of the determinism constraint
> > > >>> which has frankly caused more trouble and time lost in sum than it
> > > would
> > > >>> have taken us to just
> > > >>> write the HighAvailabilityTaskAssignor to consider the previous
> > > assignment
> > > >>> from the start in KIP-441
> > > >>>
> > > >>> 6.
> > > >>>
> > > >>>> StickyTaskAssignor  users who would like to use rack aware
> > assignment
> > > >>>> should upgrade their
> > > >>>
> > > >>> Kafka Streams version to the version in which
> > > HighAvailabilityTaskAssignor
> > > >>>> and rack awareness
> > > >>>
> > > >>> assignment are available.
> > > >>>
> > > >>> Building off of the above, the HAAssignor hasn't worked out
> perfectly
> > > for
> > > >>> everybody up until now,
> > > >>> given that we are only adding complexity to it now, on the
> flipside I
> > > >>> would
> > > >>> hesitate to try and force
> > > >>> everyone to use it if they want to upgrade. We added a "secret"
> > > backdoor
> > > >>> internal config to allow
> > > >>> users to set the task assignor back in KIP-441 for this reason.
> WDYT
> > > about
> > > >>> bumping this to a public
> > > >>> config on the side in this KIP?
> > > >>>
> > > >>>
> > > >>> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid>
> > > wrote:
> > > >>>
> > > >>>> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will
> > add
> > > >>> it to
> > > >>>> the test plan. I will also add tests to verify the new optimizer
> > will
> > > >>>> produce a balanced assignment which has no worse cross AZ cost
> than
> > > the
> > > >>>> existing assignor.
> > > >>>>
> > > >>>> Hao
> > > >>>>
> > > >>>> On Mon, May 22, 2023 at 3:39 PM John Roesler <vvcephei@apache.org
> >
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi Hao,
> > > >>>>>
> > > >>>>> Thanks for the KIP!
> > > >>>>>
> > > >>>>> Overall, I think this is a great idea. I always wanted to circle
> > back
> > > >>>>> after the Smooth Scaling KIP to put a proper optimization
> algorithm
> > > >>> into
> > > >>>>> place. I think this has the promise to really improve the quality
> > of
> > > >>> the
> > > >>>>> balanced assignments we produce.
> > > >>>>>
> > > >>>>> Thanks for providing the details about the MaxCut/MinFlow
> > algorithm.
> > > >>> It
> > > >>>>> seems like a good choice for me, assuming we choose the right
> > scaling
> > > >>>>> factors for the weights we add to the graph. Unfortunately, I
> don't
> > > >>> think
> > > >>>>> that there's a good way to see how easy or hard this is going to
> be
> > > >>> until
> > > >>>>> we actually implement it and test it.
> > > >>>>>
> > > >>>>> That leads to the only real piece of feedback I had on the KIP,
> > which
> > > >>> is
> > > >>>>> the testing portion. You mentioned system/integration/unit tests,
> > but
> > > >>>>> there's not too much information about what those tests will do.
> > I'd
> > > >>> like
> > > >>>>> to suggest that we invest in more simulation testing
> specifically,
> > > >>>> similar
> > > >>>>> to what we did in
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> > > >>>>> .
> > > >>>>>
> > > >>>>> In fact, it seems like we _could_ write the simulation up front,
> > and
> > > >>> then
> > > >>>>> implement the algorithm in a dummy way and just see whether it
> > passes
> > > >>> the
> > > >>>>> simulations or not, before actually integrating it with Kafka
> > > Streams.
> > > >>>>>
> > > >>>>> Basically, I'd be +1 on this KIP today, but I'd feel confident
> > about
> > > >>> it
> > > >>>> if
> > > >>>>> we had a little more detail regarding how we are going to verify
> > that
> > > >>> the
> > > >>>>> new optimizer is actually going to produce more optimal plans
> than
> > > the
> > > >>>>> existing assigner we have today.
> > > >>>>>
> > > >>>>> Thanks again!
> > > >>>>> -John
> > > >>>>>
> > > >>>>> On 2023/05/22 16:49:22 Hao Li wrote:
> > > >>>>>> Hi Colt,
> > > >>>>>>
> > > >>>>>> Thanks for the comments.
> > > >>>>>>
> > > >>>>>>> and I struggle to see how the algorithm isn't at least O(N)
> where
> > > >>> N
> > > >>>> is
> > > >>>>>> the number of Tasks...?
> > > >>>>>>
> > > >>>>>> For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > > >>> Number
> > > >>>>> of
> > > >>>>>> edges E is T * N where T is the number of clients and N is the
> > > >>> number
> > > >>>> of
> > > >>>>>> Tasks. This is because a task can be assigned to any client so
> > there
> > > >>>> will
> > > >>>>>> be an edge between every task and every client. The total
> > complexity
> > > >>>>> would
> > > >>>>>> be O(T * N) if we want to be more specific.
> > > >>>>>>
> > > >>>>>>> But if the leaders for each partition are spread across
> multiple
> > > >>>> zones,
> > > >>>>>> how will you handle that?
> > > >>>>>>
> > > >>>>>> This is what the min-cost flow solution is trying to solve? i.e.
> > > >>> Find
> > > >>>> an
> > > >>>>>> assignment of tasks to clients where across AZ traffic can be
> > > >>>> minimized.
> > > >>>>>> But there are some constraints to the solution and one of them
> is
> > we
> > > >>>> need
> > > >>>>>> to balance task assignment first (
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> > > >>>>> ).
> > > >>>>>> So in your example of three tasks' partitions being in the same
> AZ
> > > >>> of a
> > > >>>>>> client, if there are other clients, we still want to balance the
> > > >>> tasks
> > > >>>> to
> > > >>>>>> other clients even if putting all tasks to a single client can
> > > >>> result
> > > >>>> in
> > > >>>>> 0
> > > >>>>>> cross AZ traffic. In
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > > >>>>>> section, the algorithm will try to find a min-cost solution
> based
> > on
> > > >>>>>> balanced assignment instead of pure min-cost.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> Hao
> > > >>>>>>
> > > >>>>>> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <
> colt@littlehorse.io>
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hello Hao,
> > > >>>>>>>
> > > >>>>>>> First of all, THANK YOU for putting this together. I had been
> > > >>> hoping
> > > >>>>>>> someone might bring something like this forward. A few
> comments:
> > > >>>>>>>
> > > >>>>>>> **1: Runtime Complexity
> > > >>>>>>>> Klein’s cycle canceling algorithm can solve the min-cost flow
> > > >>>>> problem in
> > > >>>>>>> O(E^2CU) time where C is max cost and U is max capacity. In our
> > > >>>>> particular
> > > >>>>>>> case, C is 1 and U is at most 3 (A task can have at most 3
> topics
> > > >>>>> including
> > > >>>>>>> changelog topic?). So the algorithm runs in O(E^2) time for our
> > > >>> case.
> > > >>>>>>>
> > > >>>>>>> A Task can have multiple input topics, and also multiple state
> > > >>>> stores,
> > > >>>>> and
> > > >>>>>>> multiple output topics. The most common case is three topics as
> > > >>> you
> > > >>>>>>> described, but this is not necessarily guaranteed. Also, math
> is
> > > >>> one
> > > >>>>> of my
> > > >>>>>>> weak points, but to me O(E^2) is equivalent to O(1), and I
> > > >>> struggle
> > > >>>> to
> > > >>>>> see
> > > >>>>>>> how the algorithm isn't at least O(N) where N is the number of
> > > >>>>> Tasks...?
> > > >>>>>>>
> > > >>>>>>> **2: Broker-Side Partition Assignments
> > > >>>>>>> Consider the case with just three topics in a Task (one input,
> > one
> > > >>>>> output,
> > > >>>>>>> one changelog). If all three partition leaders are in the same
> > > >>> Rack
> > > >>>> (or
> > > >>>>>>> better yet, the same broker), then we could get massive savings
> > by
> > > >>>>>>> assigning the Task to that Rack/availability zone. But if the
> > > >>> leaders
> > > >>>>> for
> > > >>>>>>> each partition are spread across multiple zones, how will you
> > > >>> handle
> > > >>>>> that?
> > > >>>>>>> Is that outside the scope of this KIP, or is it worth
> introducing
> > > >>> a
> > > >>>>>>> kafka-streams-generate-rebalance-proposal.sh tool?
> > > >>>>>>>
> > > >>>>>>> Colt McNealy
> > > >>>>>>> *Founder, LittleHorse.io*
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Tue, May 9, 2023 at 4:03 PM Hao Li <hli@confluent.io.invalid
> >
> > > >>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi all,
> > > >>>>>>>>
> > > >>>>>>>> I have submitted KIP-925 to add rack awareness logic in task
> > > >>>>> assignment
> > > >>>>>>> in
> > > >>>>>>>> Kafka Streams and would like to start a discussion:
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Hao
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Thanks,
> > > >>>>>> Hao
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> Thanks,
> > > >>>> Hao
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >> --
> > > >> Thanks,
> > > >> Hao
> > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> > Thanks,
> > Hao
> >
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Colt McNealy <co...@littlehorse.io>.
Hi all,

I've got a rather naive question here. The spiritual goal of this KIP is to
reduce cross-rack traffic (normally, this manifests itself in terms of a
higher AWS/Azure bill as cloud providers charge for cross-AZ traffic).

To generalize, most deployments have three racks, RF=3, and one replica for
each partition in each rack. Therefore, in the steady state (absent any
cluster anomalies such as broker failure, etc) we are pretty confident that
there should be a replica for every partition (input, changelog,
repartition, output topic) on the same rack as a given Streams instance.

Why not just let Sophie's High-Availability Task Assignor do its thing, and
then *set the preferred leader* for each replica to a broker in the same
rack/AZ as the Active Task? This would solve two problems:

1. The current KIP can't make any improvements in the case where a Task has
three involved partitions (eg. input, changelog, output) and the leader for
each partition is in a different rack. With this approach, we could get
pretty close to having zero cross-AZ traffic in a healthy cluster.
2. There needs to be a lot of work done to balance availability, data
movement, and cross-AZ traffic in the current proposal. My proposal doesn't
actually involve any additional data movement; simply reassignment of
partition leadership.

The biggest argument against this proposal is that there could be two
Streams apps using the same topic, which would cause some bickering.
Secondly, some have observed that changing partition leadership can trigger
ProducerFencedExceptions in EOS, which causes a state restoration.

Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jun 1, 2023 at 10:02 AM Hao Li <hl...@confluent.io.invalid> wrote:

> Hi Bruno,
>
> dropping config rack.aware.assignment.enabled
> and add value NONE to the enum for the possible values of config
> rack.aware.assignment.strategy sounds good to me.
>
> On Thu, Jun 1, 2023 at 12:39 AM Bruno Cadonna <ca...@apache.org> wrote:
>
> > Hi Hao,
> >
> > Thanks for the updates!
> >
> > What do you think about dropping config rack.aware.assignment.enabled
> > and add value NONE to the enum for the possible values of config
> > rack.aware.assignment.strategy?
> >
> > Best,
> > Bruno
> >
> > On 31.05.23 23:31, Hao Li wrote:
> > > Hi all,
> > >
> > > I've updated the KIP based on the feedback. Major changes I made:
> > > 1. Add rack aware assignment to `StickyTaskAssignor`
> > > 2. Reject `Prefer reliability and then find optimal cost` option in
> > standby
> > > task assignment.
> > >
> > >
> > > On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the feedback! I will update the KIP accordingly.
> > >>
> > >> *For Sophie's comments:*
> > >>
> > >> 1 and 2. Good catch. Fixed these.
> > >>
> > >> 3 and 4 Yes. We can make this public config and call out the
> > >> clientConsumer config users need to set.
> > >>
> > >> 5. It's ideal to take the previous assignment in HAAssignor into
> > >> consideration when we compute our target assignment, the complications
> > come
> > >> with making sure the assignment can eventually converge and we don't
> do
> > >> probing rebalance infinitely. It's not only about storing the previous
> > >> assignment or get it somehow. We can actually get the previous
> > assignment
> > >> now like we do in StickyAssignor. But the previous assignment will
> > change
> > >> in each round of probing rebalance. The proposal which added some
> > weight to
> > >> make the rack aware assignment lean towards the original HAA's target
> > >> assignment will add benefits of stability in some corner cases in case
> > of
> > >> tie in cross rack traffic cost. But it's not sticky. But the bottom
> > line is
> > >> it won't be worse than current HAA's stickiness.
> > >>
> > >> 6. I'm fine with changing the assignor config to public. Actually, I
> > think
> > >> we can min-cost algorithm with StickyAssignor as well to mitigate the
> > >> problem of 5. So we can have one public config to choose an assignor
> and
> > >> one public config to enable the rack aware assignment.
> > >>
> > >> *For Bruno's comments:*
> > >>
> > >> The proposal was to implement all the options and use configs to
> choose
> > >> them during runtime. We can make those configs public as suggested.
> > >> 1, 2, 3, 4, 5: agree and will fix those.
> > >> 6: subscription protocol is not changed.
> > >> 7: yeah. Let me fix the notations.
> > >> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3`
> > etc.
> > >> 9: I'm also ok with just optimizing reliability for standby tasks. Or
> we
> > >> could simply run the "balance reliability over cost" greedy algorithm
> to
> > >> see if any cost could be reduced.
> > >> 10: Make sense. Will fix the wording.
> > >> 11: Make sense. Will update the test part.
> > >>
> > >> *For Walker's comments:*
> > >> 1. Stability for HAA is an issue. See my comments for Sophie's
> feedback
> > 5
> > >> and 6. I think we could use the rack aware assignment for
> > StickyAssignor as
> > >> well. For HAA assignments, it's less sticky and we can only shoot for
> > >> minimizing the cross rack traffic eventually when everything is
> stable.
> > >> 2. Yeah. This is a good point and we can also turn it on for
> > >> StickyAssignor.
> > >>
> > >> Thanks,
> > >> Hao
> > >>
> > >>
> > >> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> > >> ableegoldman@gmail.com> wrote:
> > >>
> > >>> Hey Hao, thanks for the KIP!
> > >>>
> > >>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
> > >>> config,
> > >>> this
> > >>> should be internal.rack.aware.assignment.strategy.
> > >>>
> > >>> 2.
> > >>>
> > >>>>   For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > >>> Number of
> > >>>> edges E is T * N where T is the number of clients and N is the
> number
> > of
> > >>>> Tasks. This is because a task can be assigned to any client so there
> > >>> will
> > >>>> be an edge between every task and every client. The total complexity
> > >>> would
> > >>>> be O(T * N) if we want to be more specific.
> > >>>
> > >>> I feel like I'm missing something here, but if E = T * N and the
> > >>> complexity
> > >>> is ~O(E^2), doesn't
> > >>> this make the total complexity order of O(T^2 * N^2)?
> > >>>
> > >>> 3.
> > >>>
> > >>>> Since 3.C.I and 3.C.II have different tradeoffs and work better in
> > >>>> different workloads etc, we
> > >>>
> > >>> could add an internal configuration to choose one of them at runtime.
> > >>>>
> > >>> Why only an internal configuration? Same goes for
> > >>> internal.rack.aware.assignment.standby.strategry (which also has the
> > typo)
> > >>>
> > >>> 4.
> > >>>
> > >>>>   There are no changes in public interfaces.
> > >>>
> > >>> I think it would be good to explicitly call out that users can
> utilize
> > >>> this
> > >>> new feature by setting the
> > >>> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> > >>>
> > >>> 5.
> > >>>
> > >>>> The idea is that if we always try to make it overlap as much with
> > >>>> HAAssignor’s target
> > >>>
> > >>> assignment, at least there’s a higher chance that tasks won’t be
> > shuffled
> > >>> a
> > >>>> lot if the clients
> > >>>
> > >>> remain the same across rebalances.
> > >>>>
> > >>> This line definitely gave me some pause -- if there was one major
> > takeaway
> > >>> I had after KIP-441,
> > >>> one thing that most limited the feature's success, it was our
> > assumption
> > >>> that clients are relatively
> > >>> stable across rebalances. This was mostly true at limited scale or
> for
> > >>> on-prem setups, but
> > >>> unsurprisingly broke down in cloud environments or larger clusters.
> Not
> > >>> only do clients naturally
> > >>> fall in and out of the group, autoscaling is becoming more and more
> of
> > a
> > >>> thing.
> > >>>
> > >>> Lastly, and this is more easily solved but still worth calling out,
> an
> > >>> assignment is only deterministic
> > >>> as long as the client.id is persisted. Currently in Streams, we only
> > >>> write
> > >>> the process UUID to the
> > >>> state directory if there is one, ie if at least one persistent
> stateful
> > >>> task exists in the topology. This
> > >>> made sense in the context of KIP-441, which targeted heavily stateful
> > >>> deployments, but this KIP
> > >>> presumably intends to target more than just the persistent & stateful
> > >>> subset of applications. To
> > >>> make matters even worse,  "persistent" is defined in a semantically
> > >>> inconsistent way throughout
> > >>> Streams.
> > >>>
> > >>> All this is to say, it may sound more complicated to remember the
> > previous
> > >>> assignment, but (a)
> > >>> imo it only introduces a lot more complexity and shaky assumptions to
> > >>> continue down this
> > >>> path, and (b) we actually already do persist some amount of state,
> like
> > >>> the
> > >>> process UUID, and
> > >>> (c) it seems like this is the perfect opportunity to finally rid
> > ourselves
> > >>> of the determinism constraint
> > >>> which has frankly caused more trouble and time lost in sum than it
> > would
> > >>> have taken us to just
> > >>> write the HighAvailabilityTaskAssignor to consider the previous
> > assignment
> > >>> from the start in KIP-441
> > >>>
> > >>> 6.
> > >>>
> > >>>> StickyTaskAssignor  users who would like to use rack aware
> assignment
> > >>>> should upgrade their
> > >>>
> > >>> Kafka Streams version to the version in which
> > HighAvailabilityTaskAssignor
> > >>>> and rack awareness
> > >>>
> > >>> assignment are available.
> > >>>
> > >>> Building off of the above, the HAAssignor hasn't worked out perfectly
> > for
> > >>> everybody up until now,
> > >>> given that we are only adding complexity to it now, on the flipside I
> > >>> would
> > >>> hesitate to try and force
> > >>> everyone to use it if they want to upgrade. We added a "secret"
> > backdoor
> > >>> internal config to allow
> > >>> users to set the task assignor back in KIP-441 for this reason. WDYT
> > about
> > >>> bumping this to a public
> > >>> config on the side in this KIP?
> > >>>
> > >>>
> > >>> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid>
> > wrote:
> > >>>
> > >>>> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will
> add
> > >>> it to
> > >>>> the test plan. I will also add tests to verify the new optimizer
> will
> > >>>> produce a balanced assignment which has no worse cross AZ cost than
> > the
> > >>>> existing assignor.
> > >>>>
> > >>>> Hao
> > >>>>
> > >>>> On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
> > >>> wrote:
> > >>>>
> > >>>>> Hi Hao,
> > >>>>>
> > >>>>> Thanks for the KIP!
> > >>>>>
> > >>>>> Overall, I think this is a great idea. I always wanted to circle
> back
> > >>>>> after the Smooth Scaling KIP to put a proper optimization algorithm
> > >>> into
> > >>>>> place. I think this has the promise to really improve the quality
> of
> > >>> the
> > >>>>> balanced assignments we produce.
> > >>>>>
> > >>>>> Thanks for providing the details about the MaxCut/MinFlow
> algorithm.
> > >>> It
> > >>>>> seems like a good choice for me, assuming we choose the right
> scaling
> > >>>>> factors for the weights we add to the graph. Unfortunately, I don't
> > >>> think
> > >>>>> that there's a good way to see how easy or hard this is going to be
> > >>> until
> > >>>>> we actually implement it and test it.
> > >>>>>
> > >>>>> That leads to the only real piece of feedback I had on the KIP,
> which
> > >>> is
> > >>>>> the testing portion. You mentioned system/integration/unit tests,
> but
> > >>>>> there's not too much information about what those tests will do.
> I'd
> > >>> like
> > >>>>> to suggest that we invest in more simulation testing specifically,
> > >>>> similar
> > >>>>> to what we did in
> > >>>>>
> > >>>>
> > >>>
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> > >>>>> .
> > >>>>>
> > >>>>> In fact, it seems like we _could_ write the simulation up front,
> and
> > >>> then
> > >>>>> implement the algorithm in a dummy way and just see whether it
> passes
> > >>> the
> > >>>>> simulations or not, before actually integrating it with Kafka
> > Streams.
> > >>>>>
> > >>>>> Basically, I'd be +1 on this KIP today, but I'd feel confident
> about
> > >>> it
> > >>>> if
> > >>>>> we had a little more detail regarding how we are going to verify
> that
> > >>> the
> > >>>>> new optimizer is actually going to produce more optimal plans than
> > the
> > >>>>> existing assigner we have today.
> > >>>>>
> > >>>>> Thanks again!
> > >>>>> -John
> > >>>>>
> > >>>>> On 2023/05/22 16:49:22 Hao Li wrote:
> > >>>>>> Hi Colt,
> > >>>>>>
> > >>>>>> Thanks for the comments.
> > >>>>>>
> > >>>>>>> and I struggle to see how the algorithm isn't at least O(N) where
> > >>> N
> > >>>> is
> > >>>>>> the number of Tasks...?
> > >>>>>>
> > >>>>>> For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > >>> Number
> > >>>>> of
> > >>>>>> edges E is T * N where T is the number of clients and N is the
> > >>> number
> > >>>> of
> > >>>>>> Tasks. This is because a task can be assigned to any client so
> there
> > >>>> will
> > >>>>>> be an edge between every task and every client. The total
> complexity
> > >>>>> would
> > >>>>>> be O(T * N) if we want to be more specific.
> > >>>>>>
> > >>>>>>> But if the leaders for each partition are spread across multiple
> > >>>> zones,
> > >>>>>> how will you handle that?
> > >>>>>>
> > >>>>>> This is what the min-cost flow solution is trying to solve? i.e.
> > >>> Find
> > >>>> an
> > >>>>>> assignment of tasks to clients where across AZ traffic can be
> > >>>> minimized.
> > >>>>>> But there are some constraints to the solution and one of them is
> we
> > >>>> need
> > >>>>>> to balance task assignment first (
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> > >>>>> ).
> > >>>>>> So in your example of three tasks' partitions being in the same AZ
> > >>> of a
> > >>>>>> client, if there are other clients, we still want to balance the
> > >>> tasks
> > >>>> to
> > >>>>>> other clients even if putting all tasks to a single client can
> > >>> result
> > >>>> in
> > >>>>> 0
> > >>>>>> cross AZ traffic. In
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > >>>>>> section, the algorithm will try to find a min-cost solution based
> on
> > >>>>>> balanced assignment instead of pure min-cost.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Hao
> > >>>>>>
> > >>>>>> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Hello Hao,
> > >>>>>>>
> > >>>>>>> First of all, THANK YOU for putting this together. I had been
> > >>> hoping
> > >>>>>>> someone might bring something like this forward. A few comments:
> > >>>>>>>
> > >>>>>>> **1: Runtime Complexity
> > >>>>>>>> Klein’s cycle canceling algorithm can solve the min-cost flow
> > >>>>> problem in
> > >>>>>>> O(E^2CU) time where C is max cost and U is max capacity. In our
> > >>>>> particular
> > >>>>>>> case, C is 1 and U is at most 3 (A task can have at most 3 topics
> > >>>>> including
> > >>>>>>> changelog topic?). So the algorithm runs in O(E^2) time for our
> > >>> case.
> > >>>>>>>
> > >>>>>>> A Task can have multiple input topics, and also multiple state
> > >>>> stores,
> > >>>>> and
> > >>>>>>> multiple output topics. The most common case is three topics as
> > >>> you
> > >>>>>>> described, but this is not necessarily guaranteed. Also, math is
> > >>> one
> > >>>>> of my
> > >>>>>>> weak points, but to me O(E^2) is equivalent to O(1), and I
> > >>> struggle
> > >>>> to
> > >>>>> see
> > >>>>>>> how the algorithm isn't at least O(N) where N is the number of
> > >>>>> Tasks...?
> > >>>>>>>
> > >>>>>>> **2: Broker-Side Partition Assignments
> > >>>>>>> Consider the case with just three topics in a Task (one input,
> one
> > >>>>> output,
> > >>>>>>> one changelog). If all three partition leaders are in the same
> > >>> Rack
> > >>>> (or
> > >>>>>>> better yet, the same broker), then we could get massive savings
> by
> > >>>>>>> assigning the Task to that Rack/availability zone. But if the
> > >>> leaders
> > >>>>> for
> > >>>>>>> each partition are spread across multiple zones, how will you
> > >>> handle
> > >>>>> that?
> > >>>>>>> Is that outside the scope of this KIP, or is it worth introducing
> > >>> a
> > >>>>>>> kafka-streams-generate-rebalance-proposal.sh tool?
> > >>>>>>>
> > >>>>>>> Colt McNealy
> > >>>>>>> *Founder, LittleHorse.io*
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> > >>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> I have submitted KIP-925 to add rack awareness logic in task
> > >>>>> assignment
> > >>>>>>> in
> > >>>>>>>> Kafka Streams and would like to start a discussion:
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> Thanks,
> > >>>>>>>> Hao
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> Thanks,
> > >>>>>> Hao
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> Thanks,
> > >>>> Hao
> > >>>>
> > >>>
> > >>
> > >>
> > >> --
> > >> Thanks,
> > >> Hao
> > >>
> > >
> > >
> >
>
>
> --
> Thanks,
> Hao
>

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Hi Bruno,

dropping config rack.aware.assignment.enabled
and add value NONE to the enum for the possible values of config
rack.aware.assignment.strategy sounds good to me.

On Thu, Jun 1, 2023 at 12:39 AM Bruno Cadonna <ca...@apache.org> wrote:

> Hi Hao,
>
> Thanks for the updates!
>
> What do you think about dropping config rack.aware.assignment.enabled
> and add value NONE to the enum for the possible values of config
> rack.aware.assignment.strategy?
>
> Best,
> Bruno
>
> On 31.05.23 23:31, Hao Li wrote:
> > Hi all,
> >
> > I've updated the KIP based on the feedback. Major changes I made:
> > 1. Add rack aware assignment to `StickyTaskAssignor`
> > 2. Reject `Prefer reliability and then find optimal cost` option in
> standby
> > task assignment.
> >
> >
> > On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io> wrote:
> >
> >> Hi all,
> >>
> >> Thanks for the feedback! I will update the KIP accordingly.
> >>
> >> *For Sophie's comments:*
> >>
> >> 1 and 2. Good catch. Fixed these.
> >>
> >> 3 and 4 Yes. We can make this public config and call out the
> >> clientConsumer config users need to set.
> >>
> >> 5. It's ideal to take the previous assignment in HAAssignor into
> >> consideration when we compute our target assignment, the complications
> come
> >> with making sure the assignment can eventually converge and we don't do
> >> probing rebalance infinitely. It's not only about storing the previous
> >> assignment or get it somehow. We can actually get the previous
> assignment
> >> now like we do in StickyAssignor. But the previous assignment will
> change
> >> in each round of probing rebalance. The proposal which added some
> weight to
> >> make the rack aware assignment lean towards the original HAA's target
> >> assignment will add benefits of stability in some corner cases in case
> of
> >> tie in cross rack traffic cost. But it's not sticky. But the bottom
> line is
> >> it won't be worse than current HAA's stickiness.
> >>
> >> 6. I'm fine with changing the assignor config to public. Actually, I
> think
> >> we can min-cost algorithm with StickyAssignor as well to mitigate the
> >> problem of 5. So we can have one public config to choose an assignor and
> >> one public config to enable the rack aware assignment.
> >>
> >> *For Bruno's comments:*
> >>
> >> The proposal was to implement all the options and use configs to choose
> >> them during runtime. We can make those configs public as suggested.
> >> 1, 2, 3, 4, 5: agree and will fix those.
> >> 6: subscription protocol is not changed.
> >> 7: yeah. Let me fix the notations.
> >> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3`
> etc.
> >> 9: I'm also ok with just optimizing reliability for standby tasks. Or we
> >> could simply run the "balance reliability over cost" greedy algorithm to
> >> see if any cost could be reduced.
> >> 10: Make sense. Will fix the wording.
> >> 11: Make sense. Will update the test part.
> >>
> >> *For Walker's comments:*
> >> 1. Stability for HAA is an issue. See my comments for Sophie's feedback
> 5
> >> and 6. I think we could use the rack aware assignment for
> StickyAssignor as
> >> well. For HAA assignments, it's less sticky and we can only shoot for
> >> minimizing the cross rack traffic eventually when everything is stable.
> >> 2. Yeah. This is a good point and we can also turn it on for
> >> StickyAssignor.
> >>
> >> Thanks,
> >> Hao
> >>
> >>
> >> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> >> ableegoldman@gmail.com> wrote:
> >>
> >>> Hey Hao, thanks for the KIP!
> >>>
> >>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
> >>> config,
> >>> this
> >>> should be internal.rack.aware.assignment.strategy.
> >>>
> >>> 2.
> >>>
> >>>>   For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> >>> Number of
> >>>> edges E is T * N where T is the number of clients and N is the number
> of
> >>>> Tasks. This is because a task can be assigned to any client so there
> >>> will
> >>>> be an edge between every task and every client. The total complexity
> >>> would
> >>>> be O(T * N) if we want to be more specific.
> >>>
> >>> I feel like I'm missing something here, but if E = T * N and the
> >>> complexity
> >>> is ~O(E^2), doesn't
> >>> this make the total complexity order of O(T^2 * N^2)?
> >>>
> >>> 3.
> >>>
> >>>> Since 3.C.I and 3.C.II have different tradeoffs and work better in
> >>>> different workloads etc, we
> >>>
> >>> could add an internal configuration to choose one of them at runtime.
> >>>>
> >>> Why only an internal configuration? Same goes for
> >>> internal.rack.aware.assignment.standby.strategry (which also has the
> typo)
> >>>
> >>> 4.
> >>>
> >>>>   There are no changes in public interfaces.
> >>>
> >>> I think it would be good to explicitly call out that users can utilize
> >>> this
> >>> new feature by setting the
> >>> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> >>>
> >>> 5.
> >>>
> >>>> The idea is that if we always try to make it overlap as much with
> >>>> HAAssignor’s target
> >>>
> >>> assignment, at least there’s a higher chance that tasks won’t be
> shuffled
> >>> a
> >>>> lot if the clients
> >>>
> >>> remain the same across rebalances.
> >>>>
> >>> This line definitely gave me some pause -- if there was one major
> takeaway
> >>> I had after KIP-441,
> >>> one thing that most limited the feature's success, it was our
> assumption
> >>> that clients are relatively
> >>> stable across rebalances. This was mostly true at limited scale or for
> >>> on-prem setups, but
> >>> unsurprisingly broke down in cloud environments or larger clusters. Not
> >>> only do clients naturally
> >>> fall in and out of the group, autoscaling is becoming more and more of
> a
> >>> thing.
> >>>
> >>> Lastly, and this is more easily solved but still worth calling out, an
> >>> assignment is only deterministic
> >>> as long as the client.id is persisted. Currently in Streams, we only
> >>> write
> >>> the process UUID to the
> >>> state directory if there is one, ie if at least one persistent stateful
> >>> task exists in the topology. This
> >>> made sense in the context of KIP-441, which targeted heavily stateful
> >>> deployments, but this KIP
> >>> presumably intends to target more than just the persistent & stateful
> >>> subset of applications. To
> >>> make matters even worse,  "persistent" is defined in a semantically
> >>> inconsistent way throughout
> >>> Streams.
> >>>
> >>> All this is to say, it may sound more complicated to remember the
> previous
> >>> assignment, but (a)
> >>> imo it only introduces a lot more complexity and shaky assumptions to
> >>> continue down this
> >>> path, and (b) we actually already do persist some amount of state, like
> >>> the
> >>> process UUID, and
> >>> (c) it seems like this is the perfect opportunity to finally rid
> ourselves
> >>> of the determinism constraint
> >>> which has frankly caused more trouble and time lost in sum than it
> would
> >>> have taken us to just
> >>> write the HighAvailabilityTaskAssignor to consider the previous
> assignment
> >>> from the start in KIP-441
> >>>
> >>> 6.
> >>>
> >>>> StickyTaskAssignor  users who would like to use rack aware assignment
> >>>> should upgrade their
> >>>
> >>> Kafka Streams version to the version in which
> HighAvailabilityTaskAssignor
> >>>> and rack awareness
> >>>
> >>> assignment are available.
> >>>
> >>> Building off of the above, the HAAssignor hasn't worked out perfectly
> for
> >>> everybody up until now,
> >>> given that we are only adding complexity to it now, on the flipside I
> >>> would
> >>> hesitate to try and force
> >>> everyone to use it if they want to upgrade. We added a "secret"
> backdoor
> >>> internal config to allow
> >>> users to set the task assignor back in KIP-441 for this reason. WDYT
> about
> >>> bumping this to a public
> >>> config on the side in this KIP?
> >>>
> >>>
> >>> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid>
> wrote:
> >>>
> >>>> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
> >>> it to
> >>>> the test plan. I will also add tests to verify the new optimizer will
> >>>> produce a balanced assignment which has no worse cross AZ cost than
> the
> >>>> existing assignor.
> >>>>
> >>>> Hao
> >>>>
> >>>> On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Hi Hao,
> >>>>>
> >>>>> Thanks for the KIP!
> >>>>>
> >>>>> Overall, I think this is a great idea. I always wanted to circle back
> >>>>> after the Smooth Scaling KIP to put a proper optimization algorithm
> >>> into
> >>>>> place. I think this has the promise to really improve the quality of
> >>> the
> >>>>> balanced assignments we produce.
> >>>>>
> >>>>> Thanks for providing the details about the MaxCut/MinFlow algorithm.
> >>> It
> >>>>> seems like a good choice for me, assuming we choose the right scaling
> >>>>> factors for the weights we add to the graph. Unfortunately, I don't
> >>> think
> >>>>> that there's a good way to see how easy or hard this is going to be
> >>> until
> >>>>> we actually implement it and test it.
> >>>>>
> >>>>> That leads to the only real piece of feedback I had on the KIP, which
> >>> is
> >>>>> the testing portion. You mentioned system/integration/unit tests, but
> >>>>> there's not too much information about what those tests will do. I'd
> >>> like
> >>>>> to suggest that we invest in more simulation testing specifically,
> >>>> similar
> >>>>> to what we did in
> >>>>>
> >>>>
> >>>
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> >>>>> .
> >>>>>
> >>>>> In fact, it seems like we _could_ write the simulation up front, and
> >>> then
> >>>>> implement the algorithm in a dummy way and just see whether it passes
> >>> the
> >>>>> simulations or not, before actually integrating it with Kafka
> Streams.
> >>>>>
> >>>>> Basically, I'd be +1 on this KIP today, but I'd feel confident about
> >>> it
> >>>> if
> >>>>> we had a little more detail regarding how we are going to verify that
> >>> the
> >>>>> new optimizer is actually going to produce more optimal plans than
> the
> >>>>> existing assigner we have today.
> >>>>>
> >>>>> Thanks again!
> >>>>> -John
> >>>>>
> >>>>> On 2023/05/22 16:49:22 Hao Li wrote:
> >>>>>> Hi Colt,
> >>>>>>
> >>>>>> Thanks for the comments.
> >>>>>>
> >>>>>>> and I struggle to see how the algorithm isn't at least O(N) where
> >>> N
> >>>> is
> >>>>>> the number of Tasks...?
> >>>>>>
> >>>>>> For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> >>> Number
> >>>>> of
> >>>>>> edges E is T * N where T is the number of clients and N is the
> >>> number
> >>>> of
> >>>>>> Tasks. This is because a task can be assigned to any client so there
> >>>> will
> >>>>>> be an edge between every task and every client. The total complexity
> >>>>> would
> >>>>>> be O(T * N) if we want to be more specific.
> >>>>>>
> >>>>>>> But if the leaders for each partition are spread across multiple
> >>>> zones,
> >>>>>> how will you handle that?
> >>>>>>
> >>>>>> This is what the min-cost flow solution is trying to solve? i.e.
> >>> Find
> >>>> an
> >>>>>> assignment of tasks to clients where across AZ traffic can be
> >>>> minimized.
> >>>>>> But there are some constraints to the solution and one of them is we
> >>>> need
> >>>>>> to balance task assignment first (
> >>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> >>>>> ).
> >>>>>> So in your example of three tasks' partitions being in the same AZ
> >>> of a
> >>>>>> client, if there are other clients, we still want to balance the
> >>> tasks
> >>>> to
> >>>>>> other clients even if putting all tasks to a single client can
> >>> result
> >>>> in
> >>>>> 0
> >>>>>> cross AZ traffic. In
> >>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> >>>>>> section, the algorithm will try to find a min-cost solution based on
> >>>>>> balanced assignment instead of pure min-cost.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Hao
> >>>>>>
> >>>>>> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
> >>>> wrote:
> >>>>>>
> >>>>>>> Hello Hao,
> >>>>>>>
> >>>>>>> First of all, THANK YOU for putting this together. I had been
> >>> hoping
> >>>>>>> someone might bring something like this forward. A few comments:
> >>>>>>>
> >>>>>>> **1: Runtime Complexity
> >>>>>>>> Klein’s cycle canceling algorithm can solve the min-cost flow
> >>>>> problem in
> >>>>>>> O(E^2CU) time where C is max cost and U is max capacity. In our
> >>>>> particular
> >>>>>>> case, C is 1 and U is at most 3 (A task can have at most 3 topics
> >>>>> including
> >>>>>>> changelog topic?). So the algorithm runs in O(E^2) time for our
> >>> case.
> >>>>>>>
> >>>>>>> A Task can have multiple input topics, and also multiple state
> >>>> stores,
> >>>>> and
> >>>>>>> multiple output topics. The most common case is three topics as
> >>> you
> >>>>>>> described, but this is not necessarily guaranteed. Also, math is
> >>> one
> >>>>> of my
> >>>>>>> weak points, but to me O(E^2) is equivalent to O(1), and I
> >>> struggle
> >>>> to
> >>>>> see
> >>>>>>> how the algorithm isn't at least O(N) where N is the number of
> >>>>> Tasks...?
> >>>>>>>
> >>>>>>> **2: Broker-Side Partition Assignments
> >>>>>>> Consider the case with just three topics in a Task (one input, one
> >>>>> output,
> >>>>>>> one changelog). If all three partition leaders are in the same
> >>> Rack
> >>>> (or
> >>>>>>> better yet, the same broker), then we could get massive savings by
> >>>>>>> assigning the Task to that Rack/availability zone. But if the
> >>> leaders
> >>>>> for
> >>>>>>> each partition are spread across multiple zones, how will you
> >>> handle
> >>>>> that?
> >>>>>>> Is that outside the scope of this KIP, or is it worth introducing
> >>> a
> >>>>>>> kafka-streams-generate-rebalance-proposal.sh tool?
> >>>>>>>
> >>>>>>> Colt McNealy
> >>>>>>> *Founder, LittleHorse.io*
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I have submitted KIP-925 to add rack awareness logic in task
> >>>>> assignment
> >>>>>>> in
> >>>>>>>> Kafka Streams and would like to start a discussion:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Thanks,
> >>>>>>>> Hao
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Thanks,
> >>>>>> Hao
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Thanks,
> >>>> Hao
> >>>>
> >>>
> >>
> >>
> >> --
> >> Thanks,
> >> Hao
> >>
> >
> >
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Hao,

Thanks for the updates!

What do you think about dropping config rack.aware.assignment.enabled 
and add value NONE to the enum for the possible values of config 
rack.aware.assignment.strategy?

Best,
Bruno

On 31.05.23 23:31, Hao Li wrote:
> Hi all,
> 
> I've updated the KIP based on the feedback. Major changes I made:
> 1. Add rack aware assignment to `StickyTaskAssignor`
> 2. Reject `Prefer reliability and then find optimal cost` option in standby
> task assignment.
> 
> 
> On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io> wrote:
> 
>> Hi all,
>>
>> Thanks for the feedback! I will update the KIP accordingly.
>>
>> *For Sophie's comments:*
>>
>> 1 and 2. Good catch. Fixed these.
>>
>> 3 and 4 Yes. We can make this public config and call out the
>> clientConsumer config users need to set.
>>
>> 5. It's ideal to take the previous assignment in HAAssignor into
>> consideration when we compute our target assignment, the complications come
>> with making sure the assignment can eventually converge and we don't do
>> probing rebalance infinitely. It's not only about storing the previous
>> assignment or get it somehow. We can actually get the previous assignment
>> now like we do in StickyAssignor. But the previous assignment will change
>> in each round of probing rebalance. The proposal which added some weight to
>> make the rack aware assignment lean towards the original HAA's target
>> assignment will add benefits of stability in some corner cases in case of
>> tie in cross rack traffic cost. But it's not sticky. But the bottom line is
>> it won't be worse than current HAA's stickiness.
>>
>> 6. I'm fine with changing the assignor config to public. Actually, I think
>> we can min-cost algorithm with StickyAssignor as well to mitigate the
>> problem of 5. So we can have one public config to choose an assignor and
>> one public config to enable the rack aware assignment.
>>
>> *For Bruno's comments:*
>>
>> The proposal was to implement all the options and use configs to choose
>> them during runtime. We can make those configs public as suggested.
>> 1, 2, 3, 4, 5: agree and will fix those.
>> 6: subscription protocol is not changed.
>> 7: yeah. Let me fix the notations.
>> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
>> 9: I'm also ok with just optimizing reliability for standby tasks. Or we
>> could simply run the "balance reliability over cost" greedy algorithm to
>> see if any cost could be reduced.
>> 10: Make sense. Will fix the wording.
>> 11: Make sense. Will update the test part.
>>
>> *For Walker's comments:*
>> 1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
>> and 6. I think we could use the rack aware assignment for StickyAssignor as
>> well. For HAA assignments, it's less sticky and we can only shoot for
>> minimizing the cross rack traffic eventually when everything is stable.
>> 2. Yeah. This is a good point and we can also turn it on for
>> StickyAssignor.
>>
>> Thanks,
>> Hao
>>
>>
>> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
>> ableegoldman@gmail.com> wrote:
>>
>>> Hey Hao, thanks for the KIP!
>>>
>>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
>>> config,
>>> this
>>> should be internal.rack.aware.assignment.strategy.
>>>
>>> 2.
>>>
>>>>   For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>>> Number of
>>>> edges E is T * N where T is the number of clients and N is the number of
>>>> Tasks. This is because a task can be assigned to any client so there
>>> will
>>>> be an edge between every task and every client. The total complexity
>>> would
>>>> be O(T * N) if we want to be more specific.
>>>
>>> I feel like I'm missing something here, but if E = T * N and the
>>> complexity
>>> is ~O(E^2), doesn't
>>> this make the total complexity order of O(T^2 * N^2)?
>>>
>>> 3.
>>>
>>>> Since 3.C.I and 3.C.II have different tradeoffs and work better in
>>>> different workloads etc, we
>>>
>>> could add an internal configuration to choose one of them at runtime.
>>>>
>>> Why only an internal configuration? Same goes for
>>> internal.rack.aware.assignment.standby.strategry (which also has the typo)
>>>
>>> 4.
>>>
>>>>   There are no changes in public interfaces.
>>>
>>> I think it would be good to explicitly call out that users can utilize
>>> this
>>> new feature by setting the
>>> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
>>>
>>> 5.
>>>
>>>> The idea is that if we always try to make it overlap as much with
>>>> HAAssignor’s target
>>>
>>> assignment, at least there’s a higher chance that tasks won’t be shuffled
>>> a
>>>> lot if the clients
>>>
>>> remain the same across rebalances.
>>>>
>>> This line definitely gave me some pause -- if there was one major takeaway
>>> I had after KIP-441,
>>> one thing that most limited the feature's success, it was our assumption
>>> that clients are relatively
>>> stable across rebalances. This was mostly true at limited scale or for
>>> on-prem setups, but
>>> unsurprisingly broke down in cloud environments or larger clusters. Not
>>> only do clients naturally
>>> fall in and out of the group, autoscaling is becoming more and more of a
>>> thing.
>>>
>>> Lastly, and this is more easily solved but still worth calling out, an
>>> assignment is only deterministic
>>> as long as the client.id is persisted. Currently in Streams, we only
>>> write
>>> the process UUID to the
>>> state directory if there is one, ie if at least one persistent stateful
>>> task exists in the topology. This
>>> made sense in the context of KIP-441, which targeted heavily stateful
>>> deployments, but this KIP
>>> presumably intends to target more than just the persistent & stateful
>>> subset of applications. To
>>> make matters even worse,  "persistent" is defined in a semantically
>>> inconsistent way throughout
>>> Streams.
>>>
>>> All this is to say, it may sound more complicated to remember the previous
>>> assignment, but (a)
>>> imo it only introduces a lot more complexity and shaky assumptions to
>>> continue down this
>>> path, and (b) we actually already do persist some amount of state, like
>>> the
>>> process UUID, and
>>> (c) it seems like this is the perfect opportunity to finally rid ourselves
>>> of the determinism constraint
>>> which has frankly caused more trouble and time lost in sum than it would
>>> have taken us to just
>>> write the HighAvailabilityTaskAssignor to consider the previous assignment
>>> from the start in KIP-441
>>>
>>> 6.
>>>
>>>> StickyTaskAssignor  users who would like to use rack aware assignment
>>>> should upgrade their
>>>
>>> Kafka Streams version to the version in which HighAvailabilityTaskAssignor
>>>> and rack awareness
>>>
>>> assignment are available.
>>>
>>> Building off of the above, the HAAssignor hasn't worked out perfectly for
>>> everybody up until now,
>>> given that we are only adding complexity to it now, on the flipside I
>>> would
>>> hesitate to try and force
>>> everyone to use it if they want to upgrade. We added a "secret" backdoor
>>> internal config to allow
>>> users to set the task assignor back in KIP-441 for this reason. WDYT about
>>> bumping this to a public
>>> config on the side in this KIP?
>>>
>>>
>>> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid> wrote:
>>>
>>>> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
>>> it to
>>>> the test plan. I will also add tests to verify the new optimizer will
>>>> produce a balanced assignment which has no worse cross AZ cost than the
>>>> existing assignor.
>>>>
>>>> Hao
>>>>
>>>> On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
>>> wrote:
>>>>
>>>>> Hi Hao,
>>>>>
>>>>> Thanks for the KIP!
>>>>>
>>>>> Overall, I think this is a great idea. I always wanted to circle back
>>>>> after the Smooth Scaling KIP to put a proper optimization algorithm
>>> into
>>>>> place. I think this has the promise to really improve the quality of
>>> the
>>>>> balanced assignments we produce.
>>>>>
>>>>> Thanks for providing the details about the MaxCut/MinFlow algorithm.
>>> It
>>>>> seems like a good choice for me, assuming we choose the right scaling
>>>>> factors for the weights we add to the graph. Unfortunately, I don't
>>> think
>>>>> that there's a good way to see how easy or hard this is going to be
>>> until
>>>>> we actually implement it and test it.
>>>>>
>>>>> That leads to the only real piece of feedback I had on the KIP, which
>>> is
>>>>> the testing portion. You mentioned system/integration/unit tests, but
>>>>> there's not too much information about what those tests will do. I'd
>>> like
>>>>> to suggest that we invest in more simulation testing specifically,
>>>> similar
>>>>> to what we did in
>>>>>
>>>>
>>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
>>>>> .
>>>>>
>>>>> In fact, it seems like we _could_ write the simulation up front, and
>>> then
>>>>> implement the algorithm in a dummy way and just see whether it passes
>>> the
>>>>> simulations or not, before actually integrating it with Kafka Streams.
>>>>>
>>>>> Basically, I'd be +1 on this KIP today, but I'd feel confident about
>>> it
>>>> if
>>>>> we had a little more detail regarding how we are going to verify that
>>> the
>>>>> new optimizer is actually going to produce more optimal plans than the
>>>>> existing assigner we have today.
>>>>>
>>>>> Thanks again!
>>>>> -John
>>>>>
>>>>> On 2023/05/22 16:49:22 Hao Li wrote:
>>>>>> Hi Colt,
>>>>>>
>>>>>> Thanks for the comments.
>>>>>>
>>>>>>> and I struggle to see how the algorithm isn't at least O(N) where
>>> N
>>>> is
>>>>>> the number of Tasks...?
>>>>>>
>>>>>> For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>>> Number
>>>>> of
>>>>>> edges E is T * N where T is the number of clients and N is the
>>> number
>>>> of
>>>>>> Tasks. This is because a task can be assigned to any client so there
>>>> will
>>>>>> be an edge between every task and every client. The total complexity
>>>>> would
>>>>>> be O(T * N) if we want to be more specific.
>>>>>>
>>>>>>> But if the leaders for each partition are spread across multiple
>>>> zones,
>>>>>> how will you handle that?
>>>>>>
>>>>>> This is what the min-cost flow solution is trying to solve? i.e.
>>> Find
>>>> an
>>>>>> assignment of tasks to clients where across AZ traffic can be
>>>> minimized.
>>>>>> But there are some constraints to the solution and one of them is we
>>>> need
>>>>>> to balance task assignment first (
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
>>>>> ).
>>>>>> So in your example of three tasks' partitions being in the same AZ
>>> of a
>>>>>> client, if there are other clients, we still want to balance the
>>> tasks
>>>> to
>>>>>> other clients even if putting all tasks to a single client can
>>> result
>>>> in
>>>>> 0
>>>>>> cross AZ traffic. In
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
>>>>>> section, the algorithm will try to find a min-cost solution based on
>>>>>> balanced assignment instead of pure min-cost.
>>>>>>
>>>>>> Thanks,
>>>>>> Hao
>>>>>>
>>>>>> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
>>>> wrote:
>>>>>>
>>>>>>> Hello Hao,
>>>>>>>
>>>>>>> First of all, THANK YOU for putting this together. I had been
>>> hoping
>>>>>>> someone might bring something like this forward. A few comments:
>>>>>>>
>>>>>>> **1: Runtime Complexity
>>>>>>>> Klein’s cycle canceling algorithm can solve the min-cost flow
>>>>> problem in
>>>>>>> O(E^2CU) time where C is max cost and U is max capacity. In our
>>>>> particular
>>>>>>> case, C is 1 and U is at most 3 (A task can have at most 3 topics
>>>>> including
>>>>>>> changelog topic?). So the algorithm runs in O(E^2) time for our
>>> case.
>>>>>>>
>>>>>>> A Task can have multiple input topics, and also multiple state
>>>> stores,
>>>>> and
>>>>>>> multiple output topics. The most common case is three topics as
>>> you
>>>>>>> described, but this is not necessarily guaranteed. Also, math is
>>> one
>>>>> of my
>>>>>>> weak points, but to me O(E^2) is equivalent to O(1), and I
>>> struggle
>>>> to
>>>>> see
>>>>>>> how the algorithm isn't at least O(N) where N is the number of
>>>>> Tasks...?
>>>>>>>
>>>>>>> **2: Broker-Side Partition Assignments
>>>>>>> Consider the case with just three topics in a Task (one input, one
>>>>> output,
>>>>>>> one changelog). If all three partition leaders are in the same
>>> Rack
>>>> (or
>>>>>>> better yet, the same broker), then we could get massive savings by
>>>>>>> assigning the Task to that Rack/availability zone. But if the
>>> leaders
>>>>> for
>>>>>>> each partition are spread across multiple zones, how will you
>>> handle
>>>>> that?
>>>>>>> Is that outside the scope of this KIP, or is it worth introducing
>>> a
>>>>>>> kafka-streams-generate-rebalance-proposal.sh tool?
>>>>>>>
>>>>>>> Colt McNealy
>>>>>>> *Founder, LittleHorse.io*
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have submitted KIP-925 to add rack awareness logic in task
>>>>> assignment
>>>>>>> in
>>>>>>>> Kafka Streams and would like to start a discussion:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Hao
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Hao
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Hao
>>>>
>>>
>>
>>
>> --
>> Thanks,
>> Hao
>>
> 
> 

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Hi all,

I've updated the KIP based on the feedback. Major changes I made:
1. Add rack aware assignment to `StickyTaskAssignor`
2. Reject `Prefer reliability and then find optimal cost` option in standby
task assignment.


On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io> wrote:

> Hi all,
>
> Thanks for the feedback! I will update the KIP accordingly.
>
> *For Sophie's comments:*
>
> 1 and 2. Good catch. Fixed these.
>
> 3 and 4 Yes. We can make this public config and call out the
> clientConsumer config users need to set.
>
> 5. It's ideal to take the previous assignment in HAAssignor into
> consideration when we compute our target assignment, the complications come
> with making sure the assignment can eventually converge and we don't do
> probing rebalance infinitely. It's not only about storing the previous
> assignment or get it somehow. We can actually get the previous assignment
> now like we do in StickyAssignor. But the previous assignment will change
> in each round of probing rebalance. The proposal which added some weight to
> make the rack aware assignment lean towards the original HAA's target
> assignment will add benefits of stability in some corner cases in case of
> tie in cross rack traffic cost. But it's not sticky. But the bottom line is
> it won't be worse than current HAA's stickiness.
>
> 6. I'm fine with changing the assignor config to public. Actually, I think
> we can min-cost algorithm with StickyAssignor as well to mitigate the
> problem of 5. So we can have one public config to choose an assignor and
> one public config to enable the rack aware assignment.
>
> *For Bruno's comments:*
>
> The proposal was to implement all the options and use configs to choose
> them during runtime. We can make those configs public as suggested.
> 1, 2, 3, 4, 5: agree and will fix those.
> 6: subscription protocol is not changed.
> 7: yeah. Let me fix the notations.
> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
> 9: I'm also ok with just optimizing reliability for standby tasks. Or we
> could simply run the "balance reliability over cost" greedy algorithm to
> see if any cost could be reduced.
> 10: Make sense. Will fix the wording.
> 11: Make sense. Will update the test part.
>
> *For Walker's comments:*
> 1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
> and 6. I think we could use the rack aware assignment for StickyAssignor as
> well. For HAA assignments, it's less sticky and we can only shoot for
> minimizing the cross rack traffic eventually when everything is stable.
> 2. Yeah. This is a good point and we can also turn it on for
> StickyAssignor.
>
> Thanks,
> Hao
>
>
> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> ableegoldman@gmail.com> wrote:
>
>> Hey Hao, thanks for the KIP!
>>
>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
>> config,
>> this
>> should be internal.rack.aware.assignment.strategy.
>>
>> 2.
>>
>> >  For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>> Number of
>> > edges E is T * N where T is the number of clients and N is the number of
>> > Tasks. This is because a task can be assigned to any client so there
>> will
>> > be an edge between every task and every client. The total complexity
>> would
>> > be O(T * N) if we want to be more specific.
>>
>> I feel like I'm missing something here, but if E = T * N and the
>> complexity
>> is ~O(E^2), doesn't
>> this make the total complexity order of O(T^2 * N^2)?
>>
>> 3.
>>
>> > Since 3.C.I and 3.C.II have different tradeoffs and work better in
>> > different workloads etc, we
>>
>> could add an internal configuration to choose one of them at runtime.
>> >
>> Why only an internal configuration? Same goes for
>> internal.rack.aware.assignment.standby.strategry (which also has the typo)
>>
>> 4.
>>
>> >  There are no changes in public interfaces.
>>
>> I think it would be good to explicitly call out that users can utilize
>> this
>> new feature by setting the
>> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
>>
>> 5.
>>
>> > The idea is that if we always try to make it overlap as much with
>> > HAAssignor’s target
>>
>> assignment, at least there’s a higher chance that tasks won’t be shuffled
>> a
>> > lot if the clients
>>
>> remain the same across rebalances.
>> >
>> This line definitely gave me some pause -- if there was one major takeaway
>> I had after KIP-441,
>> one thing that most limited the feature's success, it was our assumption
>> that clients are relatively
>> stable across rebalances. This was mostly true at limited scale or for
>> on-prem setups, but
>> unsurprisingly broke down in cloud environments or larger clusters. Not
>> only do clients naturally
>> fall in and out of the group, autoscaling is becoming more and more of a
>> thing.
>>
>> Lastly, and this is more easily solved but still worth calling out, an
>> assignment is only deterministic
>> as long as the client.id is persisted. Currently in Streams, we only
>> write
>> the process UUID to the
>> state directory if there is one, ie if at least one persistent stateful
>> task exists in the topology. This
>> made sense in the context of KIP-441, which targeted heavily stateful
>> deployments, but this KIP
>> presumably intends to target more than just the persistent & stateful
>> subset of applications. To
>> make matters even worse,  "persistent" is defined in a semantically
>> inconsistent way throughout
>> Streams.
>>
>> All this is to say, it may sound more complicated to remember the previous
>> assignment, but (a)
>> imo it only introduces a lot more complexity and shaky assumptions to
>> continue down this
>> path, and (b) we actually already do persist some amount of state, like
>> the
>> process UUID, and
>> (c) it seems like this is the perfect opportunity to finally rid ourselves
>> of the determinism constraint
>> which has frankly caused more trouble and time lost in sum than it would
>> have taken us to just
>> write the HighAvailabilityTaskAssignor to consider the previous assignment
>> from the start in KIP-441
>>
>> 6.
>>
>> > StickyTaskAssignor  users who would like to use rack aware assignment
>> > should upgrade their
>>
>> Kafka Streams version to the version in which HighAvailabilityTaskAssignor
>> > and rack awareness
>>
>> assignment are available.
>>
>> Building off of the above, the HAAssignor hasn't worked out perfectly for
>> everybody up until now,
>> given that we are only adding complexity to it now, on the flipside I
>> would
>> hesitate to try and force
>> everyone to use it if they want to upgrade. We added a "secret" backdoor
>> internal config to allow
>> users to set the task assignor back in KIP-441 for this reason. WDYT about
>> bumping this to a public
>> config on the side in this KIP?
>>
>>
>> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid> wrote:
>>
>> > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
>> it to
>> > the test plan. I will also add tests to verify the new optimizer will
>> > produce a balanced assignment which has no worse cross AZ cost than the
>> > existing assignor.
>> >
>> > Hao
>> >
>> > On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
>> wrote:
>> >
>> > > Hi Hao,
>> > >
>> > > Thanks for the KIP!
>> > >
>> > > Overall, I think this is a great idea. I always wanted to circle back
>> > > after the Smooth Scaling KIP to put a proper optimization algorithm
>> into
>> > > place. I think this has the promise to really improve the quality of
>> the
>> > > balanced assignments we produce.
>> > >
>> > > Thanks for providing the details about the MaxCut/MinFlow algorithm.
>> It
>> > > seems like a good choice for me, assuming we choose the right scaling
>> > > factors for the weights we add to the graph. Unfortunately, I don't
>> think
>> > > that there's a good way to see how easy or hard this is going to be
>> until
>> > > we actually implement it and test it.
>> > >
>> > > That leads to the only real piece of feedback I had on the KIP, which
>> is
>> > > the testing portion. You mentioned system/integration/unit tests, but
>> > > there's not too much information about what those tests will do. I'd
>> like
>> > > to suggest that we invest in more simulation testing specifically,
>> > similar
>> > > to what we did in
>> > >
>> >
>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
>> > > .
>> > >
>> > > In fact, it seems like we _could_ write the simulation up front, and
>> then
>> > > implement the algorithm in a dummy way and just see whether it passes
>> the
>> > > simulations or not, before actually integrating it with Kafka Streams.
>> > >
>> > > Basically, I'd be +1 on this KIP today, but I'd feel confident about
>> it
>> > if
>> > > we had a little more detail regarding how we are going to verify that
>> the
>> > > new optimizer is actually going to produce more optimal plans than the
>> > > existing assigner we have today.
>> > >
>> > > Thanks again!
>> > > -John
>> > >
>> > > On 2023/05/22 16:49:22 Hao Li wrote:
>> > > > Hi Colt,
>> > > >
>> > > > Thanks for the comments.
>> > > >
>> > > > > and I struggle to see how the algorithm isn't at least O(N) where
>> N
>> > is
>> > > > the number of Tasks...?
>> > > >
>> > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>> Number
>> > > of
>> > > > edges E is T * N where T is the number of clients and N is the
>> number
>> > of
>> > > > Tasks. This is because a task can be assigned to any client so there
>> > will
>> > > > be an edge between every task and every client. The total complexity
>> > > would
>> > > > be O(T * N) if we want to be more specific.
>> > > >
>> > > > > But if the leaders for each partition are spread across multiple
>> > zones,
>> > > > how will you handle that?
>> > > >
>> > > > This is what the min-cost flow solution is trying to solve? i.e.
>> Find
>> > an
>> > > > assignment of tasks to clients where across AZ traffic can be
>> > minimized.
>> > > > But there are some constraints to the solution and one of them is we
>> > need
>> > > > to balance task assignment first (
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
>> > > ).
>> > > > So in your example of three tasks' partitions being in the same AZ
>> of a
>> > > > client, if there are other clients, we still want to balance the
>> tasks
>> > to
>> > > > other clients even if putting all tasks to a single client can
>> result
>> > in
>> > > 0
>> > > > cross AZ traffic. In
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
>> > > > section, the algorithm will try to find a min-cost solution based on
>> > > > balanced assignment instead of pure min-cost.
>> > > >
>> > > > Thanks,
>> > > > Hao
>> > > >
>> > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
>> > wrote:
>> > > >
>> > > > > Hello Hao,
>> > > > >
>> > > > > First of all, THANK YOU for putting this together. I had been
>> hoping
>> > > > > someone might bring something like this forward. A few comments:
>> > > > >
>> > > > > **1: Runtime Complexity
>> > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
>> > > problem in
>> > > > > O(E^2CU) time where C is max cost and U is max capacity. In our
>> > > particular
>> > > > > case, C is 1 and U is at most 3 (A task can have at most 3 topics
>> > > including
>> > > > > changelog topic?). So the algorithm runs in O(E^2) time for our
>> case.
>> > > > >
>> > > > > A Task can have multiple input topics, and also multiple state
>> > stores,
>> > > and
>> > > > > multiple output topics. The most common case is three topics as
>> you
>> > > > > described, but this is not necessarily guaranteed. Also, math is
>> one
>> > > of my
>> > > > > weak points, but to me O(E^2) is equivalent to O(1), and I
>> struggle
>> > to
>> > > see
>> > > > > how the algorithm isn't at least O(N) where N is the number of
>> > > Tasks...?
>> > > > >
>> > > > > **2: Broker-Side Partition Assignments
>> > > > > Consider the case with just three topics in a Task (one input, one
>> > > output,
>> > > > > one changelog). If all three partition leaders are in the same
>> Rack
>> > (or
>> > > > > better yet, the same broker), then we could get massive savings by
>> > > > > assigning the Task to that Rack/availability zone. But if the
>> leaders
>> > > for
>> > > > > each partition are spread across multiple zones, how will you
>> handle
>> > > that?
>> > > > > Is that outside the scope of this KIP, or is it worth introducing
>> a
>> > > > > kafka-streams-generate-rebalance-proposal.sh tool?
>> > > > >
>> > > > > Colt McNealy
>> > > > > *Founder, LittleHorse.io*
>> > > > >
>> > > > >
>> > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
>> > > wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > I have submitted KIP-925 to add rack awareness logic in task
>> > > assignment
>> > > > > in
>> > > > > > Kafka Streams and would like to start a discussion:
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>> > > > > >
>> > > > > > --
>> > > > > > Thanks,
>> > > > > > Hao
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > > Thanks,
>> > > > Hao
>> > > >
>> > >
>> >
>> >
>> > --
>> > Thanks,
>> > Hao
>> >
>>
>
>
> --
> Thanks,
> Hao
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Thanks Sophie! I've just made the updates :)

On Wed, May 31, 2023 at 2:29 PM Sophie Blee-Goldman <ab...@gmail.com>
wrote:

> Thanks Hao -- I really like Walker's idea to separate the assignment
> strategy from the rack awareness option.
> Having two different configs for these makes a lot of sense to me and seems
> like it will benefit the most users.
>
> Given your KIP already has a lot going on with it, I would be happy to
> write up a quick KIP just for the public
> assignor config to take that aspect off your hands. We can discuss that
> separately and keep this thread focused
> on the rack awareness algorithm. If that sounds good I'll kick off a KIP
> tonight, then you can just update the
> "Current Task Assignment Logic" section to mention that either assignor may
> be plugged in, and the rack
> awareness can be configured independently.
>
> If users are able to plug in the StickyAssignor if they experience
> problems, this is good enough for me w.r.t my
> other concern regarding task shuffling
>
> On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io.invalid> wrote:
>
> > Hi all,
> >
> > Thanks for the feedback! I will update the KIP accordingly.
> >
> > *For Sophie's comments:*
> >
> > 1 and 2. Good catch. Fixed these.
> >
> > 3 and 4 Yes. We can make this public config and call out the
> clientConsumer
> > config users need to set.
> >
> > 5. It's ideal to take the previous assignment in HAAssignor into
> > consideration when we compute our target assignment, the complications
> come
> > with making sure the assignment can eventually converge and we don't do
> > probing rebalance infinitely. It's not only about storing the previous
> > assignment or get it somehow. We can actually get the previous assignment
> > now like we do in StickyAssignor. But the previous assignment will change
> > in each round of probing rebalance. The proposal which added some weight
> to
> > make the rack aware assignment lean towards the original HAA's target
> > assignment will add benefits of stability in some corner cases in case of
> > tie in cross rack traffic cost. But it's not sticky. But the bottom line
> is
> > it won't be worse than current HAA's stickiness.
> >
> > 6. I'm fine with changing the assignor config to public. Actually, I
> think
> > we can min-cost algorithm with StickyAssignor as well to mitigate the
> > problem of 5. So we can have one public config to choose an assignor and
> > one public config to enable the rack aware assignment.
> >
> > *For Bruno's comments:*
> >
> > The proposal was to implement all the options and use configs to choose
> > them during runtime. We can make those configs public as suggested.
> > 1, 2, 3, 4, 5: agree and will fix those.
> > 6: subscription protocol is not changed.
> > 7: yeah. Let me fix the notations.
> > 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3`
> etc.
> > 9: I'm also ok with just optimizing reliability for standby tasks. Or we
> > could simply run the "balance reliability over cost" greedy algorithm to
> > see if any cost could be reduced.
> > 10: Make sense. Will fix the wording.
> > 11: Make sense. Will update the test part.
> >
> > *For Walker's comments:*
> > 1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
> > and 6. I think we could use the rack aware assignment for StickyAssignor
> as
> > well. For HAA assignments, it's less sticky and we can only shoot for
> > minimizing the cross rack traffic eventually when everything is stable.
> > 2. Yeah. This is a good point and we can also turn it on for
> > StickyAssignor.
> >
> > Thanks,
> > Hao
> >
> >
> > On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> > ableegoldman@gmail.com>
> > wrote:
> >
> > > Hey Hao, thanks for the KIP!
> > >
> > > 1. There's a typo in the "internal.rack.aware.assignment.strategry"
> > config,
> > > this
> > > should be internal.rack.aware.assignment.strategy.
> > >
> > > 2.
> > >
> > > >  For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > Number
> > > of
> > > > edges E is T * N where T is the number of clients and N is the number
> > of
> > > > Tasks. This is because a task can be assigned to any client so there
> > will
> > > > be an edge between every task and every client. The total complexity
> > > would
> > > > be O(T * N) if we want to be more specific.
> > >
> > > I feel like I'm missing something here, but if E = T * N and the
> > complexity
> > > is ~O(E^2), doesn't
> > > this make the total complexity order of O(T^2 * N^2)?
> > >
> > > 3.
> > >
> > > > Since 3.C.I and 3.C.II have different tradeoffs and work better in
> > > > different workloads etc, we
> > >
> > > could add an internal configuration to choose one of them at runtime.
> > > >
> > > Why only an internal configuration? Same goes for
> > > internal.rack.aware.assignment.standby.strategry (which also has the
> > typo)
> > >
> > > 4.
> > >
> > > >  There are no changes in public interfaces.
> > >
> > > I think it would be good to explicitly call out that users can utilize
> > this
> > > new feature by setting the
> > > ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> > >
> > > 5.
> > >
> > > > The idea is that if we always try to make it overlap as much with
> > > > HAAssignor’s target
> > >
> > > assignment, at least there’s a higher chance that tasks won’t be
> > shuffled a
> > > > lot if the clients
> > >
> > > remain the same across rebalances.
> > > >
> > > This line definitely gave me some pause -- if there was one major
> > takeaway
> > > I had after KIP-441,
> > > one thing that most limited the feature's success, it was our
> assumption
> > > that clients are relatively
> > > stable across rebalances. This was mostly true at limited scale or for
> > > on-prem setups, but
> > > unsurprisingly broke down in cloud environments or larger clusters. Not
> > > only do clients naturally
> > > fall in and out of the group, autoscaling is becoming more and more of
> a
> > > thing.
> > >
> > > Lastly, and this is more easily solved but still worth calling out, an
> > > assignment is only deterministic
> > > as long as the client.id is persisted. Currently in Streams, we only
> > write
> > > the process UUID to the
> > > state directory if there is one, ie if at least one persistent stateful
> > > task exists in the topology. This
> > > made sense in the context of KIP-441, which targeted heavily stateful
> > > deployments, but this KIP
> > > presumably intends to target more than just the persistent & stateful
> > > subset of applications. To
> > > make matters even worse,  "persistent" is defined in a semantically
> > > inconsistent way throughout
> > > Streams.
> > >
> > > All this is to say, it may sound more complicated to remember the
> > previous
> > > assignment, but (a)
> > > imo it only introduces a lot more complexity and shaky assumptions to
> > > continue down this
> > > path, and (b) we actually already do persist some amount of state, like
> > the
> > > process UUID, and
> > > (c) it seems like this is the perfect opportunity to finally rid
> > ourselves
> > > of the determinism constraint
> > > which has frankly caused more trouble and time lost in sum than it
> would
> > > have taken us to just
> > > write the HighAvailabilityTaskAssignor to consider the previous
> > assignment
> > > from the start in KIP-441
> > >
> > > 6.
> > >
> > > > StickyTaskAssignor  users who would like to use rack aware assignment
> > > > should upgrade their
> > >
> > > Kafka Streams version to the version in which
> > HighAvailabilityTaskAssignor
> > > > and rack awareness
> > >
> > > assignment are available.
> > >
> > > Building off of the above, the HAAssignor hasn't worked out perfectly
> for
> > > everybody up until now,
> > > given that we are only adding complexity to it now, on the flipside I
> > would
> > > hesitate to try and force
> > > everyone to use it if they want to upgrade. We added a "secret"
> backdoor
> > > internal config to allow
> > > users to set the task assignor back in KIP-441 for this reason. WDYT
> > about
> > > bumping this to a public
> > > config on the side in this KIP?
> > >
> > >
> > > On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid>
> > wrote:
> > >
> > > > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
> > it
> > > to
> > > > the test plan. I will also add tests to verify the new optimizer will
> > > > produce a balanced assignment which has no worse cross AZ cost than
> the
> > > > existing assignor.
> > > >
> > > > Hao
> > > >
> > > > On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > >
> > > > > Hi Hao,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > Overall, I think this is a great idea. I always wanted to circle
> back
> > > > > after the Smooth Scaling KIP to put a proper optimization algorithm
> > > into
> > > > > place. I think this has the promise to really improve the quality
> of
> > > the
> > > > > balanced assignments we produce.
> > > > >
> > > > > Thanks for providing the details about the MaxCut/MinFlow
> algorithm.
> > It
> > > > > seems like a good choice for me, assuming we choose the right
> scaling
> > > > > factors for the weights we add to the graph. Unfortunately, I don't
> > > think
> > > > > that there's a good way to see how easy or hard this is going to be
> > > until
> > > > > we actually implement it and test it.
> > > > >
> > > > > That leads to the only real piece of feedback I had on the KIP,
> which
> > > is
> > > > > the testing portion. You mentioned system/integration/unit tests,
> but
> > > > > there's not too much information about what those tests will do.
> I'd
> > > like
> > > > > to suggest that we invest in more simulation testing specifically,
> > > > similar
> > > > > to what we did in
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> > > > > .
> > > > >
> > > > > In fact, it seems like we _could_ write the simulation up front,
> and
> > > then
> > > > > implement the algorithm in a dummy way and just see whether it
> passes
> > > the
> > > > > simulations or not, before actually integrating it with Kafka
> > Streams.
> > > > >
> > > > > Basically, I'd be +1 on this KIP today, but I'd feel confident
> about
> > it
> > > > if
> > > > > we had a little more detail regarding how we are going to verify
> that
> > > the
> > > > > new optimizer is actually going to produce more optimal plans than
> > the
> > > > > existing assigner we have today.
> > > > >
> > > > > Thanks again!
> > > > > -John
> > > > >
> > > > > On 2023/05/22 16:49:22 Hao Li wrote:
> > > > > > Hi Colt,
> > > > > >
> > > > > > Thanks for the comments.
> > > > > >
> > > > > > > and I struggle to see how the algorithm isn't at least O(N)
> > where N
> > > > is
> > > > > > the number of Tasks...?
> > > > > >
> > > > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > > Number
> > > > > of
> > > > > > edges E is T * N where T is the number of clients and N is the
> > number
> > > > of
> > > > > > Tasks. This is because a task can be assigned to any client so
> > there
> > > > will
> > > > > > be an edge between every task and every client. The total
> > complexity
> > > > > would
> > > > > > be O(T * N) if we want to be more specific.
> > > > > >
> > > > > > > But if the leaders for each partition are spread across
> multiple
> > > > zones,
> > > > > > how will you handle that?
> > > > > >
> > > > > > This is what the min-cost flow solution is trying to solve? i.e.
> > Find
> > > > an
> > > > > > assignment of tasks to clients where across AZ traffic can be
> > > > minimized.
> > > > > > But there are some constraints to the solution and one of them is
> > we
> > > > need
> > > > > > to balance task assignment first (
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> > > > > ).
> > > > > > So in your example of three tasks' partitions being in the same
> AZ
> > > of a
> > > > > > client, if there are other clients, we still want to balance the
> > > tasks
> > > > to
> > > > > > other clients even if putting all tasks to a single client can
> > result
> > > > in
> > > > > 0
> > > > > > cross AZ traffic. In
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > > > > > section, the algorithm will try to find a min-cost solution based
> > on
> > > > > > balanced assignment instead of pure min-cost.
> > > > > >
> > > > > > Thanks,
> > > > > > Hao
> > > > > >
> > > > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <colt@littlehorse.io
> >
> > > > wrote:
> > > > > >
> > > > > > > Hello Hao,
> > > > > > >
> > > > > > > First of all, THANK YOU for putting this together. I had been
> > > hoping
> > > > > > > someone might bring something like this forward. A few
> comments:
> > > > > > >
> > > > > > > **1: Runtime Complexity
> > > > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
> > > > > problem in
> > > > > > > O(E^2CU) time where C is max cost and U is max capacity. In our
> > > > > particular
> > > > > > > case, C is 1 and U is at most 3 (A task can have at most 3
> topics
> > > > > including
> > > > > > > changelog topic?). So the algorithm runs in O(E^2) time for our
> > > case.
> > > > > > >
> > > > > > > A Task can have multiple input topics, and also multiple state
> > > > stores,
> > > > > and
> > > > > > > multiple output topics. The most common case is three topics as
> > you
> > > > > > > described, but this is not necessarily guaranteed. Also, math
> is
> > > one
> > > > > of my
> > > > > > > weak points, but to me O(E^2) is equivalent to O(1), and I
> > struggle
> > > > to
> > > > > see
> > > > > > > how the algorithm isn't at least O(N) where N is the number of
> > > > > Tasks...?
> > > > > > >
> > > > > > > **2: Broker-Side Partition Assignments
> > > > > > > Consider the case with just three topics in a Task (one input,
> > one
> > > > > output,
> > > > > > > one changelog). If all three partition leaders are in the same
> > Rack
> > > > (or
> > > > > > > better yet, the same broker), then we could get massive savings
> > by
> > > > > > > assigning the Task to that Rack/availability zone. But if the
> > > leaders
> > > > > for
> > > > > > > each partition are spread across multiple zones, how will you
> > > handle
> > > > > that?
> > > > > > > Is that outside the scope of this KIP, or is it worth
> > introducing a
> > > > > > > kafka-streams-generate-rebalance-proposal.sh tool?
> > > > > > >
> > > > > > > Colt McNealy
> > > > > > > *Founder, LittleHorse.io*
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <hli@confluent.io.invalid
> >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I have submitted KIP-925 to add rack awareness logic in task
> > > > > assignment
> > > > > > > in
> > > > > > > > Kafka Streams and would like to start a discussion:
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > > > > > >
> > > > > > > > --
> > > > > > > > Thanks,
> > > > > > > > Hao
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Hao
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Hao
> > > >
> > >
> >
> >
> > --
> > Thanks,
> > Hao
> >
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Sophie Blee-Goldman <ab...@gmail.com>.
Thanks Hao -- I really like Walker's idea to separate the assignment
strategy from the rack awareness option.
Having two different configs for these makes a lot of sense to me and seems
like it will benefit the most users.

Given your KIP already has a lot going on with it, I would be happy to
write up a quick KIP just for the public
assignor config to take that aspect off your hands. We can discuss that
separately and keep this thread focused
on the rack awareness algorithm. If that sounds good I'll kick off a KIP
tonight, then you can just update the
"Current Task Assignment Logic" section to mention that either assignor may
be plugged in, and the rack
awareness can be configured independently.

If users are able to plug in the StickyAssignor if they experience
problems, this is good enough for me w.r.t my
other concern regarding task shuffling

On Wed, May 31, 2023 at 12:09 PM Hao Li <hl...@confluent.io.invalid> wrote:

> Hi all,
>
> Thanks for the feedback! I will update the KIP accordingly.
>
> *For Sophie's comments:*
>
> 1 and 2. Good catch. Fixed these.
>
> 3 and 4 Yes. We can make this public config and call out the clientConsumer
> config users need to set.
>
> 5. It's ideal to take the previous assignment in HAAssignor into
> consideration when we compute our target assignment, the complications come
> with making sure the assignment can eventually converge and we don't do
> probing rebalance infinitely. It's not only about storing the previous
> assignment or get it somehow. We can actually get the previous assignment
> now like we do in StickyAssignor. But the previous assignment will change
> in each round of probing rebalance. The proposal which added some weight to
> make the rack aware assignment lean towards the original HAA's target
> assignment will add benefits of stability in some corner cases in case of
> tie in cross rack traffic cost. But it's not sticky. But the bottom line is
> it won't be worse than current HAA's stickiness.
>
> 6. I'm fine with changing the assignor config to public. Actually, I think
> we can min-cost algorithm with StickyAssignor as well to mitigate the
> problem of 5. So we can have one public config to choose an assignor and
> one public config to enable the rack aware assignment.
>
> *For Bruno's comments:*
>
> The proposal was to implement all the options and use configs to choose
> them during runtime. We can make those configs public as suggested.
> 1, 2, 3, 4, 5: agree and will fix those.
> 6: subscription protocol is not changed.
> 7: yeah. Let me fix the notations.
> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
> 9: I'm also ok with just optimizing reliability for standby tasks. Or we
> could simply run the "balance reliability over cost" greedy algorithm to
> see if any cost could be reduced.
> 10: Make sense. Will fix the wording.
> 11: Make sense. Will update the test part.
>
> *For Walker's comments:*
> 1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
> and 6. I think we could use the rack aware assignment for StickyAssignor as
> well. For HAA assignments, it's less sticky and we can only shoot for
> minimizing the cross rack traffic eventually when everything is stable.
> 2. Yeah. This is a good point and we can also turn it on for
> StickyAssignor.
>
> Thanks,
> Hao
>
>
> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> ableegoldman@gmail.com>
> wrote:
>
> > Hey Hao, thanks for the KIP!
> >
> > 1. There's a typo in the "internal.rack.aware.assignment.strategry"
> config,
> > this
> > should be internal.rack.aware.assignment.strategy.
> >
> > 2.
> >
> > >  For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> Number
> > of
> > > edges E is T * N where T is the number of clients and N is the number
> of
> > > Tasks. This is because a task can be assigned to any client so there
> will
> > > be an edge between every task and every client. The total complexity
> > would
> > > be O(T * N) if we want to be more specific.
> >
> > I feel like I'm missing something here, but if E = T * N and the
> complexity
> > is ~O(E^2), doesn't
> > this make the total complexity order of O(T^2 * N^2)?
> >
> > 3.
> >
> > > Since 3.C.I and 3.C.II have different tradeoffs and work better in
> > > different workloads etc, we
> >
> > could add an internal configuration to choose one of them at runtime.
> > >
> > Why only an internal configuration? Same goes for
> > internal.rack.aware.assignment.standby.strategry (which also has the
> typo)
> >
> > 4.
> >
> > >  There are no changes in public interfaces.
> >
> > I think it would be good to explicitly call out that users can utilize
> this
> > new feature by setting the
> > ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> >
> > 5.
> >
> > > The idea is that if we always try to make it overlap as much with
> > > HAAssignor’s target
> >
> > assignment, at least there’s a higher chance that tasks won’t be
> shuffled a
> > > lot if the clients
> >
> > remain the same across rebalances.
> > >
> > This line definitely gave me some pause -- if there was one major
> takeaway
> > I had after KIP-441,
> > one thing that most limited the feature's success, it was our assumption
> > that clients are relatively
> > stable across rebalances. This was mostly true at limited scale or for
> > on-prem setups, but
> > unsurprisingly broke down in cloud environments or larger clusters. Not
> > only do clients naturally
> > fall in and out of the group, autoscaling is becoming more and more of a
> > thing.
> >
> > Lastly, and this is more easily solved but still worth calling out, an
> > assignment is only deterministic
> > as long as the client.id is persisted. Currently in Streams, we only
> write
> > the process UUID to the
> > state directory if there is one, ie if at least one persistent stateful
> > task exists in the topology. This
> > made sense in the context of KIP-441, which targeted heavily stateful
> > deployments, but this KIP
> > presumably intends to target more than just the persistent & stateful
> > subset of applications. To
> > make matters even worse,  "persistent" is defined in a semantically
> > inconsistent way throughout
> > Streams.
> >
> > All this is to say, it may sound more complicated to remember the
> previous
> > assignment, but (a)
> > imo it only introduces a lot more complexity and shaky assumptions to
> > continue down this
> > path, and (b) we actually already do persist some amount of state, like
> the
> > process UUID, and
> > (c) it seems like this is the perfect opportunity to finally rid
> ourselves
> > of the determinism constraint
> > which has frankly caused more trouble and time lost in sum than it would
> > have taken us to just
> > write the HighAvailabilityTaskAssignor to consider the previous
> assignment
> > from the start in KIP-441
> >
> > 6.
> >
> > > StickyTaskAssignor  users who would like to use rack aware assignment
> > > should upgrade their
> >
> > Kafka Streams version to the version in which
> HighAvailabilityTaskAssignor
> > > and rack awareness
> >
> > assignment are available.
> >
> > Building off of the above, the HAAssignor hasn't worked out perfectly for
> > everybody up until now,
> > given that we are only adding complexity to it now, on the flipside I
> would
> > hesitate to try and force
> > everyone to use it if they want to upgrade. We added a "secret" backdoor
> > internal config to allow
> > users to set the task assignor back in KIP-441 for this reason. WDYT
> about
> > bumping this to a public
> > config on the side in this KIP?
> >
> >
> > On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid>
> wrote:
> >
> > > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
> it
> > to
> > > the test plan. I will also add tests to verify the new optimizer will
> > > produce a balanced assignment which has no worse cross AZ cost than the
> > > existing assignor.
> > >
> > > Hao
> > >
> > > On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
> > wrote:
> > >
> > > > Hi Hao,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > Overall, I think this is a great idea. I always wanted to circle back
> > > > after the Smooth Scaling KIP to put a proper optimization algorithm
> > into
> > > > place. I think this has the promise to really improve the quality of
> > the
> > > > balanced assignments we produce.
> > > >
> > > > Thanks for providing the details about the MaxCut/MinFlow algorithm.
> It
> > > > seems like a good choice for me, assuming we choose the right scaling
> > > > factors for the weights we add to the graph. Unfortunately, I don't
> > think
> > > > that there's a good way to see how easy or hard this is going to be
> > until
> > > > we actually implement it and test it.
> > > >
> > > > That leads to the only real piece of feedback I had on the KIP, which
> > is
> > > > the testing portion. You mentioned system/integration/unit tests, but
> > > > there's not too much information about what those tests will do. I'd
> > like
> > > > to suggest that we invest in more simulation testing specifically,
> > > similar
> > > > to what we did in
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> > > > .
> > > >
> > > > In fact, it seems like we _could_ write the simulation up front, and
> > then
> > > > implement the algorithm in a dummy way and just see whether it passes
> > the
> > > > simulations or not, before actually integrating it with Kafka
> Streams.
> > > >
> > > > Basically, I'd be +1 on this KIP today, but I'd feel confident about
> it
> > > if
> > > > we had a little more detail regarding how we are going to verify that
> > the
> > > > new optimizer is actually going to produce more optimal plans than
> the
> > > > existing assigner we have today.
> > > >
> > > > Thanks again!
> > > > -John
> > > >
> > > > On 2023/05/22 16:49:22 Hao Li wrote:
> > > > > Hi Colt,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > > and I struggle to see how the algorithm isn't at least O(N)
> where N
> > > is
> > > > > the number of Tasks...?
> > > > >
> > > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> > Number
> > > > of
> > > > > edges E is T * N where T is the number of clients and N is the
> number
> > > of
> > > > > Tasks. This is because a task can be assigned to any client so
> there
> > > will
> > > > > be an edge between every task and every client. The total
> complexity
> > > > would
> > > > > be O(T * N) if we want to be more specific.
> > > > >
> > > > > > But if the leaders for each partition are spread across multiple
> > > zones,
> > > > > how will you handle that?
> > > > >
> > > > > This is what the min-cost flow solution is trying to solve? i.e.
> Find
> > > an
> > > > > assignment of tasks to clients where across AZ traffic can be
> > > minimized.
> > > > > But there are some constraints to the solution and one of them is
> we
> > > need
> > > > > to balance task assignment first (
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> > > > ).
> > > > > So in your example of three tasks' partitions being in the same AZ
> > of a
> > > > > client, if there are other clients, we still want to balance the
> > tasks
> > > to
> > > > > other clients even if putting all tasks to a single client can
> result
> > > in
> > > > 0
> > > > > cross AZ traffic. In
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > > > > section, the algorithm will try to find a min-cost solution based
> on
> > > > > balanced assignment instead of pure min-cost.
> > > > >
> > > > > Thanks,
> > > > > Hao
> > > > >
> > > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
> > > wrote:
> > > > >
> > > > > > Hello Hao,
> > > > > >
> > > > > > First of all, THANK YOU for putting this together. I had been
> > hoping
> > > > > > someone might bring something like this forward. A few comments:
> > > > > >
> > > > > > **1: Runtime Complexity
> > > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
> > > > problem in
> > > > > > O(E^2CU) time where C is max cost and U is max capacity. In our
> > > > particular
> > > > > > case, C is 1 and U is at most 3 (A task can have at most 3 topics
> > > > including
> > > > > > changelog topic?). So the algorithm runs in O(E^2) time for our
> > case.
> > > > > >
> > > > > > A Task can have multiple input topics, and also multiple state
> > > stores,
> > > > and
> > > > > > multiple output topics. The most common case is three topics as
> you
> > > > > > described, but this is not necessarily guaranteed. Also, math is
> > one
> > > > of my
> > > > > > weak points, but to me O(E^2) is equivalent to O(1), and I
> struggle
> > > to
> > > > see
> > > > > > how the algorithm isn't at least O(N) where N is the number of
> > > > Tasks...?
> > > > > >
> > > > > > **2: Broker-Side Partition Assignments
> > > > > > Consider the case with just three topics in a Task (one input,
> one
> > > > output,
> > > > > > one changelog). If all three partition leaders are in the same
> Rack
> > > (or
> > > > > > better yet, the same broker), then we could get massive savings
> by
> > > > > > assigning the Task to that Rack/availability zone. But if the
> > leaders
> > > > for
> > > > > > each partition are spread across multiple zones, how will you
> > handle
> > > > that?
> > > > > > Is that outside the scope of this KIP, or is it worth
> introducing a
> > > > > > kafka-streams-generate-rebalance-proposal.sh tool?
> > > > > >
> > > > > > Colt McNealy
> > > > > > *Founder, LittleHorse.io*
> > > > > >
> > > > > >
> > > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I have submitted KIP-925 to add rack awareness logic in task
> > > > assignment
> > > > > > in
> > > > > > > Kafka Streams and would like to start a discussion:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Hao
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Hao
> > > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Hao
> > >
> >
>
>
> --
> Thanks,
> Hao
>

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Hi all,

Thanks for the feedback! I will update the KIP accordingly.

*For Sophie's comments:*

1 and 2. Good catch. Fixed these.

3 and 4 Yes. We can make this public config and call out the clientConsumer
config users need to set.

5. It's ideal to take the previous assignment in HAAssignor into
consideration when we compute our target assignment, the complications come
with making sure the assignment can eventually converge and we don't do
probing rebalance infinitely. It's not only about storing the previous
assignment or get it somehow. We can actually get the previous assignment
now like we do in StickyAssignor. But the previous assignment will change
in each round of probing rebalance. The proposal which added some weight to
make the rack aware assignment lean towards the original HAA's target
assignment will add benefits of stability in some corner cases in case of
tie in cross rack traffic cost. But it's not sticky. But the bottom line is
it won't be worse than current HAA's stickiness.

6. I'm fine with changing the assignor config to public. Actually, I think
we can min-cost algorithm with StickyAssignor as well to mitigate the
problem of 5. So we can have one public config to choose an assignor and
one public config to enable the rack aware assignment.

*For Bruno's comments:*

The proposal was to implement all the options and use configs to choose
them during runtime. We can make those configs public as suggested.
1, 2, 3, 4, 5: agree and will fix those.
6: subscription protocol is not changed.
7: yeah. Let me fix the notations.
8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
9: I'm also ok with just optimizing reliability for standby tasks. Or we
could simply run the "balance reliability over cost" greedy algorithm to
see if any cost could be reduced.
10: Make sense. Will fix the wording.
11: Make sense. Will update the test part.

*For Walker's comments:*
1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
and 6. I think we could use the rack aware assignment for StickyAssignor as
well. For HAA assignments, it's less sticky and we can only shoot for
minimizing the cross rack traffic eventually when everything is stable.
2. Yeah. This is a good point and we can also turn it on for StickyAssignor.

Thanks,
Hao


On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <ab...@gmail.com>
wrote:

> Hey Hao, thanks for the KIP!
>
> 1. There's a typo in the "internal.rack.aware.assignment.strategry" config,
> this
> should be internal.rack.aware.assignment.strategy.
>
> 2.
>
> >  For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number
> of
> > edges E is T * N where T is the number of clients and N is the number of
> > Tasks. This is because a task can be assigned to any client so there will
> > be an edge between every task and every client. The total complexity
> would
> > be O(T * N) if we want to be more specific.
>
> I feel like I'm missing something here, but if E = T * N and the complexity
> is ~O(E^2), doesn't
> this make the total complexity order of O(T^2 * N^2)?
>
> 3.
>
> > Since 3.C.I and 3.C.II have different tradeoffs and work better in
> > different workloads etc, we
>
> could add an internal configuration to choose one of them at runtime.
> >
> Why only an internal configuration? Same goes for
> internal.rack.aware.assignment.standby.strategry (which also has the typo)
>
> 4.
>
> >  There are no changes in public interfaces.
>
> I think it would be good to explicitly call out that users can utilize this
> new feature by setting the
> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
>
> 5.
>
> > The idea is that if we always try to make it overlap as much with
> > HAAssignor’s target
>
> assignment, at least there’s a higher chance that tasks won’t be shuffled a
> > lot if the clients
>
> remain the same across rebalances.
> >
> This line definitely gave me some pause -- if there was one major takeaway
> I had after KIP-441,
> one thing that most limited the feature's success, it was our assumption
> that clients are relatively
> stable across rebalances. This was mostly true at limited scale or for
> on-prem setups, but
> unsurprisingly broke down in cloud environments or larger clusters. Not
> only do clients naturally
> fall in and out of the group, autoscaling is becoming more and more of a
> thing.
>
> Lastly, and this is more easily solved but still worth calling out, an
> assignment is only deterministic
> as long as the client.id is persisted. Currently in Streams, we only write
> the process UUID to the
> state directory if there is one, ie if at least one persistent stateful
> task exists in the topology. This
> made sense in the context of KIP-441, which targeted heavily stateful
> deployments, but this KIP
> presumably intends to target more than just the persistent & stateful
> subset of applications. To
> make matters even worse,  "persistent" is defined in a semantically
> inconsistent way throughout
> Streams.
>
> All this is to say, it may sound more complicated to remember the previous
> assignment, but (a)
> imo it only introduces a lot more complexity and shaky assumptions to
> continue down this
> path, and (b) we actually already do persist some amount of state, like the
> process UUID, and
> (c) it seems like this is the perfect opportunity to finally rid ourselves
> of the determinism constraint
> which has frankly caused more trouble and time lost in sum than it would
> have taken us to just
> write the HighAvailabilityTaskAssignor to consider the previous assignment
> from the start in KIP-441
>
> 6.
>
> > StickyTaskAssignor  users who would like to use rack aware assignment
> > should upgrade their
>
> Kafka Streams version to the version in which HighAvailabilityTaskAssignor
> > and rack awareness
>
> assignment are available.
>
> Building off of the above, the HAAssignor hasn't worked out perfectly for
> everybody up until now,
> given that we are only adding complexity to it now, on the flipside I would
> hesitate to try and force
> everyone to use it if they want to upgrade. We added a "secret" backdoor
> internal config to allow
> users to set the task assignor back in KIP-441 for this reason. WDYT about
> bumping this to a public
> config on the side in this KIP?
>
>
> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid> wrote:
>
> > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add it
> to
> > the test plan. I will also add tests to verify the new optimizer will
> > produce a balanced assignment which has no worse cross AZ cost than the
> > existing assignor.
> >
> > Hao
> >
> > On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Hi Hao,
> > >
> > > Thanks for the KIP!
> > >
> > > Overall, I think this is a great idea. I always wanted to circle back
> > > after the Smooth Scaling KIP to put a proper optimization algorithm
> into
> > > place. I think this has the promise to really improve the quality of
> the
> > > balanced assignments we produce.
> > >
> > > Thanks for providing the details about the MaxCut/MinFlow algorithm. It
> > > seems like a good choice for me, assuming we choose the right scaling
> > > factors for the weights we add to the graph. Unfortunately, I don't
> think
> > > that there's a good way to see how easy or hard this is going to be
> until
> > > we actually implement it and test it.
> > >
> > > That leads to the only real piece of feedback I had on the KIP, which
> is
> > > the testing portion. You mentioned system/integration/unit tests, but
> > > there's not too much information about what those tests will do. I'd
> like
> > > to suggest that we invest in more simulation testing specifically,
> > similar
> > > to what we did in
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> > > .
> > >
> > > In fact, it seems like we _could_ write the simulation up front, and
> then
> > > implement the algorithm in a dummy way and just see whether it passes
> the
> > > simulations or not, before actually integrating it with Kafka Streams.
> > >
> > > Basically, I'd be +1 on this KIP today, but I'd feel confident about it
> > if
> > > we had a little more detail regarding how we are going to verify that
> the
> > > new optimizer is actually going to produce more optimal plans than the
> > > existing assigner we have today.
> > >
> > > Thanks again!
> > > -John
> > >
> > > On 2023/05/22 16:49:22 Hao Li wrote:
> > > > Hi Colt,
> > > >
> > > > Thanks for the comments.
> > > >
> > > > > and I struggle to see how the algorithm isn't at least O(N) where N
> > is
> > > > the number of Tasks...?
> > > >
> > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> Number
> > > of
> > > > edges E is T * N where T is the number of clients and N is the number
> > of
> > > > Tasks. This is because a task can be assigned to any client so there
> > will
> > > > be an edge between every task and every client. The total complexity
> > > would
> > > > be O(T * N) if we want to be more specific.
> > > >
> > > > > But if the leaders for each partition are spread across multiple
> > zones,
> > > > how will you handle that?
> > > >
> > > > This is what the min-cost flow solution is trying to solve? i.e. Find
> > an
> > > > assignment of tasks to clients where across AZ traffic can be
> > minimized.
> > > > But there are some constraints to the solution and one of them is we
> > need
> > > > to balance task assignment first (
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> > > ).
> > > > So in your example of three tasks' partitions being in the same AZ
> of a
> > > > client, if there are other clients, we still want to balance the
> tasks
> > to
> > > > other clients even if putting all tasks to a single client can result
> > in
> > > 0
> > > > cross AZ traffic. In
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > > > section, the algorithm will try to find a min-cost solution based on
> > > > balanced assignment instead of pure min-cost.
> > > >
> > > > Thanks,
> > > > Hao
> > > >
> > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
> > wrote:
> > > >
> > > > > Hello Hao,
> > > > >
> > > > > First of all, THANK YOU for putting this together. I had been
> hoping
> > > > > someone might bring something like this forward. A few comments:
> > > > >
> > > > > **1: Runtime Complexity
> > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
> > > problem in
> > > > > O(E^2CU) time where C is max cost and U is max capacity. In our
> > > particular
> > > > > case, C is 1 and U is at most 3 (A task can have at most 3 topics
> > > including
> > > > > changelog topic?). So the algorithm runs in O(E^2) time for our
> case.
> > > > >
> > > > > A Task can have multiple input topics, and also multiple state
> > stores,
> > > and
> > > > > multiple output topics. The most common case is three topics as you
> > > > > described, but this is not necessarily guaranteed. Also, math is
> one
> > > of my
> > > > > weak points, but to me O(E^2) is equivalent to O(1), and I struggle
> > to
> > > see
> > > > > how the algorithm isn't at least O(N) where N is the number of
> > > Tasks...?
> > > > >
> > > > > **2: Broker-Side Partition Assignments
> > > > > Consider the case with just three topics in a Task (one input, one
> > > output,
> > > > > one changelog). If all three partition leaders are in the same Rack
> > (or
> > > > > better yet, the same broker), then we could get massive savings by
> > > > > assigning the Task to that Rack/availability zone. But if the
> leaders
> > > for
> > > > > each partition are spread across multiple zones, how will you
> handle
> > > that?
> > > > > Is that outside the scope of this KIP, or is it worth introducing a
> > > > > kafka-streams-generate-rebalance-proposal.sh tool?
> > > > >
> > > > > Colt McNealy
> > > > > *Founder, LittleHorse.io*
> > > > >
> > > > >
> > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have submitted KIP-925 to add rack awareness logic in task
> > > assignment
> > > > > in
> > > > > > Kafka Streams and would like to start a discussion:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Hao
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Hao
> > > >
> > >
> >
> >
> > --
> > Thanks,
> > Hao
> >
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Walker Carlson <wc...@confluent.io.INVALID>.
Hi Hao,

Most of the comments I had on this kip are already mentioned, but I did
want to share my two major concerns.

1. Stability. I worry about stability. If we only have the HA assignor work
with rack awareness we will have a lot of state movement in many cases.
Sophie and Bruno have this concern as well.

2. It seems the rack awareness assignment operation can be run after any
assignment algorithm. I would think that maybe we can leave it agnostic if
it is using the sicky assignor or the HA assignor and let the users choose
the strategy. Maybe just have the rack awareness be off or on,
independent of the assignment strategy.

Walker

On Wed, May 31, 2023 at 7:46 AM Bruno Cadonna <ca...@apache.org> wrote:

> Hi Hao,
>
>
> Thank you for the KIP! Really interesting!
>
> In general, I think the KIP is a bit too vague. You explain the main
> algorithm and different options. It is not clear to me on what option we
> will start voting. One way out of this situation would be to cut the KIP
> down to the simplest options and evaluate those. Then, we would have a
> starting point from which we can move on.
>
>
> 1.
> Mention that the KIP optimizes only the read path and does nothing about
> the write path.
>
>
> 2.
> "U is 1 and C is at most 3 (A task can have at most 3 topics including
> changelog topic?)"
> where C is max cost and U is max capacity
>
> C is not at most 3 to answer the question in the KIP. A task can have
> any number of topics it reads from. Some examples:
> - a processor API operator with multiple state stores reads from
> multiple changelog topics
> - a cascade of merge operators would result in a task with multiple
> input topics
> - a cascade of joins would result in a task with multiple input topics
> and multiple changelog topics.
> I am pretty sure there are also other examples.
>
> Assuming cost C is 1 for each topic partition is a simplification.
> Traffic for tasks can vary significantly. I saw joins that had 100s of
> bytes/s on one side and 10s of MB/s on the other side. I guess the
> cross-rack traffic depends on the data rate. Please correct me if I am
> wrong. I am fine with simplifying but then we also need to explicitly
> state the simplification and its limitation in the KIP to manage
> expectations.
>
> C is just a factor in the complexity but we should be clear about the
> simplification we made. How much C influences the actual performance we
> do not know and we should evaluate this as part of the implementation of
> the KIP. Maybe add this aspect to the performance experiments in the
> test plan section.
>
>
> 3.
> I second Sophie's question about the complexity being O(T^2 * N^2)
> instead of O(T*N).
>
>
> 4.
> The improved algorithm in "Min cost with balanced sub-topology" contains
> a bunch more edges and the complexity of the algorithm depends on the
> square of the number of edges. Can you say something about the trade-off
> or even quantify it? How does does the complexity change from O(T^2 * N^2)?
>
>
> 5.
> If you propose to implement multiple algorithms, the KIP should add
> public configs as Sophie proposed.
>
>
> 6.
> Does any of the algorithm change the subscription protocol? Usually we
> describe those changes in a KIP.
>
>
> 7.
> I have a couple of minor comment about notation:
>
> 7.1 For the complexity, I think it would be better to either use |T| and
> |C| or define new variables like for example N_task = |T| and N_client =
> |C| for the formula to be consistent with the mapping function you
> define in the previous section.
>
> 7.2 Using C for the set of clients and the cost is confusing. Maybe use
> Cost or $ for cost.
>
>
> 8.
> In the "Graph construction" section in the "Min cost with balanced
> sub-topology" section, you write
> "Create new set of nodes which has same number as clients"
> Shouldn't this be number of tasks.
>
>
> 9.
> Regarding standby assignment, have you considered to simplify the setup
> by defining if rack-aware configs are set, the standby assignment is
> optimized for reliability and if they are not set costs are optimized. I
> think that would be a good starting point on which we can iterate in
> future.
>
>
> 10.
> Just a clarification and something that you should corrected in the KIP.
> In "Assignment of stateless tasks" you contrast stateless tasks with
> active task. However, active tasks can be stateless or stateful. So
> "Rack awareness assignment algorithm for active tasks" should actually
> be "Rack awareness assignment algorithm for active stateful tasks".
> Please use the terms accordingly otherwise, it gets confusing.
>
>
> 11.
> In the testing plan, I think it would be useful to also have performance
> experiments along other dimension like number of topic partitions a task
> reads from, i.e., basically the varying costs per task.
>
>
> Best,
> Bruno
>
> On 30.05.23 23:28, Sophie Blee-Goldman wrote:
> > Hey Hao, thanks for the KIP!
> >
> > 1. There's a typo in the "internal.rack.aware.assignment.strategry"
> config,
> > this
> > should be internal.rack.aware.assignment.strategy.
> >
> > 2.
> >
> >>   For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> Number of
> >> edges E is T * N where T is the number of clients and N is the number of
> >> Tasks. This is because a task can be assigned to any client so there
> will
> >> be an edge between every task and every client. The total complexity
> would
> >> be O(T * N) if we want to be more specific.
> >
> > I feel like I'm missing something here, but if E = T * N and the
> complexity
> > is ~O(E^2), doesn't
> > this make the total complexity order of O(T^2 * N^2)?
> >
> > 3.
> >
> >> Since 3.C.I and 3.C.II have different tradeoffs and work better in
> >> different workloads etc, we
> >
> > could add an internal configuration to choose one of them at runtime.
> >>
> > Why only an internal configuration? Same goes for
> > internal.rack.aware.assignment.standby.strategry (which also has the
> typo)
> >
> > 4.
> >
> >>   There are no changes in public interfaces.
> >
> > I think it would be good to explicitly call out that users can utilize
> this
> > new feature by setting the
> > ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> >
> > 5.
> >
> >> The idea is that if we always try to make it overlap as much with
> >> HAAssignor’s target
> >
> > assignment, at least there’s a higher chance that tasks won’t be
> shuffled a
> >> lot if the clients
> >
> > remain the same across rebalances.
> >>
> > This line definitely gave me some pause -- if there was one major
> takeaway
> > I had after KIP-441,
> > one thing that most limited the feature's success, it was our assumption
> > that clients are relatively
> > stable across rebalances. This was mostly true at limited scale or for
> > on-prem setups, but
> > unsurprisingly broke down in cloud environments or larger clusters. Not
> > only do clients naturally
> > fall in and out of the group, autoscaling is becoming more and more of a
> > thing.
> >
> > Lastly, and this is more easily solved but still worth calling out, an
> > assignment is only deterministic
> > as long as the client.id is persisted. Currently in Streams, we only
> write
> > the process UUID to the
> > state directory if there is one, ie if at least one persistent stateful
> > task exists in the topology. This
> > made sense in the context of KIP-441, which targeted heavily stateful
> > deployments, but this KIP
> > presumably intends to target more than just the persistent & stateful
> > subset of applications. To
> > make matters even worse,  "persistent" is defined in a semantically
> > inconsistent way throughout
> > Streams.
> >
> > All this is to say, it may sound more complicated to remember the
> previous
> > assignment, but (a)
> > imo it only introduces a lot more complexity and shaky assumptions to
> > continue down this
> > path, and (b) we actually already do persist some amount of state, like
> the
> > process UUID, and
> > (c) it seems like this is the perfect opportunity to finally rid
> ourselves
> > of the determinism constraint
> > which has frankly caused more trouble and time lost in sum than it would
> > have taken us to just
> > write the HighAvailabilityTaskAssignor to consider the previous
> assignment
> > from the start in KIP-441
> >
> > 6.
> >
> >> StickyTaskAssignor  users who would like to use rack aware assignment
> >> should upgrade their
> >
> > Kafka Streams version to the version in which
> HighAvailabilityTaskAssignor
> >> and rack awareness
> >
> > assignment are available.
> >
> > Building off of the above, the HAAssignor hasn't worked out perfectly for
> > everybody up until now,
> > given that we are only adding complexity to it now, on the flipside I
> would
> > hesitate to try and force
> > everyone to use it if they want to upgrade. We added a "secret" backdoor
> > internal config to allow
> > users to set the task assignor back in KIP-441 for this reason. WDYT
> about
> > bumping this to a public
> > config on the side in this KIP?
> >
> >
> > On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid>
> wrote:
> >
> >> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
> it to
> >> the test plan. I will also add tests to verify the new optimizer will
> >> produce a balanced assignment which has no worse cross AZ cost than the
> >> existing assignor.
> >>
> >> Hao
> >>
> >> On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org>
> wrote:
> >>
> >>> Hi Hao,
> >>>
> >>> Thanks for the KIP!
> >>>
> >>> Overall, I think this is a great idea. I always wanted to circle back
> >>> after the Smooth Scaling KIP to put a proper optimization algorithm
> into
> >>> place. I think this has the promise to really improve the quality of
> the
> >>> balanced assignments we produce.
> >>>
> >>> Thanks for providing the details about the MaxCut/MinFlow algorithm. It
> >>> seems like a good choice for me, assuming we choose the right scaling
> >>> factors for the weights we add to the graph. Unfortunately, I don't
> think
> >>> that there's a good way to see how easy or hard this is going to be
> until
> >>> we actually implement it and test it.
> >>>
> >>> That leads to the only real piece of feedback I had on the KIP, which
> is
> >>> the testing portion. You mentioned system/integration/unit tests, but
> >>> there's not too much information about what those tests will do. I'd
> like
> >>> to suggest that we invest in more simulation testing specifically,
> >> similar
> >>> to what we did in
> >>>
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> >>> .
> >>>
> >>> In fact, it seems like we _could_ write the simulation up front, and
> then
> >>> implement the algorithm in a dummy way and just see whether it passes
> the
> >>> simulations or not, before actually integrating it with Kafka Streams.
> >>>
> >>> Basically, I'd be +1 on this KIP today, but I'd feel confident about it
> >> if
> >>> we had a little more detail regarding how we are going to verify that
> the
> >>> new optimizer is actually going to produce more optimal plans than the
> >>> existing assigner we have today.
> >>>
> >>> Thanks again!
> >>> -John
> >>>
> >>> On 2023/05/22 16:49:22 Hao Li wrote:
> >>>> Hi Colt,
> >>>>
> >>>> Thanks for the comments.
> >>>>
> >>>>> and I struggle to see how the algorithm isn't at least O(N) where N
> >> is
> >>>> the number of Tasks...?
> >>>>
> >>>> For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
> Number
> >>> of
> >>>> edges E is T * N where T is the number of clients and N is the number
> >> of
> >>>> Tasks. This is because a task can be assigned to any client so there
> >> will
> >>>> be an edge between every task and every client. The total complexity
> >>> would
> >>>> be O(T * N) if we want to be more specific.
> >>>>
> >>>>> But if the leaders for each partition are spread across multiple
> >> zones,
> >>>> how will you handle that?
> >>>>
> >>>> This is what the min-cost flow solution is trying to solve? i.e. Find
> >> an
> >>>> assignment of tasks to clients where across AZ traffic can be
> >> minimized.
> >>>> But there are some constraints to the solution and one of them is we
> >> need
> >>>> to balance task assignment first (
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> >>> ).
> >>>> So in your example of three tasks' partitions being in the same AZ of
> a
> >>>> client, if there are other clients, we still want to balance the tasks
> >> to
> >>>> other clients even if putting all tasks to a single client can result
> >> in
> >>> 0
> >>>> cross AZ traffic. In
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> >>>> section, the algorithm will try to find a min-cost solution based on
> >>>> balanced assignment instead of pure min-cost.
> >>>>
> >>>> Thanks,
> >>>> Hao
> >>>>
> >>>> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
> >> wrote:
> >>>>
> >>>>> Hello Hao,
> >>>>>
> >>>>> First of all, THANK YOU for putting this together. I had been hoping
> >>>>> someone might bring something like this forward. A few comments:
> >>>>>
> >>>>> **1: Runtime Complexity
> >>>>>> Klein’s cycle canceling algorithm can solve the min-cost flow
> >>> problem in
> >>>>> O(E^2CU) time where C is max cost and U is max capacity. In our
> >>> particular
> >>>>> case, C is 1 and U is at most 3 (A task can have at most 3 topics
> >>> including
> >>>>> changelog topic?). So the algorithm runs in O(E^2) time for our case.
> >>>>>
> >>>>> A Task can have multiple input topics, and also multiple state
> >> stores,
> >>> and
> >>>>> multiple output topics. The most common case is three topics as you
> >>>>> described, but this is not necessarily guaranteed. Also, math is one
> >>> of my
> >>>>> weak points, but to me O(E^2) is equivalent to O(1), and I struggle
> >> to
> >>> see
> >>>>> how the algorithm isn't at least O(N) where N is the number of
> >>> Tasks...?
> >>>>>
> >>>>> **2: Broker-Side Partition Assignments
> >>>>> Consider the case with just three topics in a Task (one input, one
> >>> output,
> >>>>> one changelog). If all three partition leaders are in the same Rack
> >> (or
> >>>>> better yet, the same broker), then we could get massive savings by
> >>>>> assigning the Task to that Rack/availability zone. But if the leaders
> >>> for
> >>>>> each partition are spread across multiple zones, how will you handle
> >>> that?
> >>>>> Is that outside the scope of this KIP, or is it worth introducing a
> >>>>> kafka-streams-generate-rebalance-proposal.sh tool?
> >>>>>
> >>>>> Colt McNealy
> >>>>> *Founder, LittleHorse.io*
> >>>>>
> >>>>>
> >>>>> On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> >>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I have submitted KIP-925 to add rack awareness logic in task
> >>> assignment
> >>>>> in
> >>>>>> Kafka Streams and would like to start a discussion:
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> >>>>>>
> >>>>>> --
> >>>>>> Thanks,
> >>>>>> Hao
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Thanks,
> >>>> Hao
> >>>>
> >>>
> >>
> >>
> >> --
> >> Thanks,
> >> Hao
> >>
> >
>

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Hao,


Thank you for the KIP! Really interesting!

In general, I think the KIP is a bit too vague. You explain the main 
algorithm and different options. It is not clear to me on what option we 
will start voting. One way out of this situation would be to cut the KIP 
down to the simplest options and evaluate those. Then, we would have a 
starting point from which we can move on.


1.
Mention that the KIP optimizes only the read path and does nothing about 
the write path.


2.
"U is 1 and C is at most 3 (A task can have at most 3 topics including 
changelog topic?)"
where C is max cost and U is max capacity

C is not at most 3 to answer the question in the KIP. A task can have 
any number of topics it reads from. Some examples:
- a processor API operator with multiple state stores reads from 
multiple changelog topics
- a cascade of merge operators would result in a task with multiple 
input topics
- a cascade of joins would result in a task with multiple input topics 
and multiple changelog topics.
I am pretty sure there are also other examples.

Assuming cost C is 1 for each topic partition is a simplification. 
Traffic for tasks can vary significantly. I saw joins that had 100s of 
bytes/s on one side and 10s of MB/s on the other side. I guess the 
cross-rack traffic depends on the data rate. Please correct me if I am 
wrong. I am fine with simplifying but then we also need to explicitly 
state the simplification and its limitation in the KIP to manage 
expectations.

C is just a factor in the complexity but we should be clear about the 
simplification we made. How much C influences the actual performance we 
do not know and we should evaluate this as part of the implementation of 
the KIP. Maybe add this aspect to the performance experiments in the 
test plan section.


3.
I second Sophie's question about the complexity being O(T^2 * N^2) 
instead of O(T*N).


4.
The improved algorithm in "Min cost with balanced sub-topology" contains 
a bunch more edges and the complexity of the algorithm depends on the 
square of the number of edges. Can you say something about the trade-off 
or even quantify it? How does does the complexity change from O(T^2 * N^2)?


5.
If you propose to implement multiple algorithms, the KIP should add 
public configs as Sophie proposed.


6.
Does any of the algorithm change the subscription protocol? Usually we 
describe those changes in a KIP.


7.
I have a couple of minor comment about notation:

7.1 For the complexity, I think it would be better to either use |T| and 
|C| or define new variables like for example N_task = |T| and N_client = 
|C| for the formula to be consistent with the mapping function you 
define in the previous section.

7.2 Using C for the set of clients and the cost is confusing. Maybe use 
Cost or $ for cost.


8.
In the "Graph construction" section in the "Min cost with balanced 
sub-topology" section, you write
"Create new set of nodes which has same number as clients"
Shouldn't this be number of tasks.


9.
Regarding standby assignment, have you considered to simplify the setup 
by defining if rack-aware configs are set, the standby assignment is 
optimized for reliability and if they are not set costs are optimized. I 
think that would be a good starting point on which we can iterate in future.


10.
Just a clarification and something that you should corrected in the KIP. 
In "Assignment of stateless tasks" you contrast stateless tasks with 
active task. However, active tasks can be stateless or stateful. So 
"Rack awareness assignment algorithm for active tasks" should actually 
be "Rack awareness assignment algorithm for active stateful tasks". 
Please use the terms accordingly otherwise, it gets confusing.


11.
In the testing plan, I think it would be useful to also have performance 
experiments along other dimension like number of topic partitions a task 
reads from, i.e., basically the varying costs per task.


Best,
Bruno

On 30.05.23 23:28, Sophie Blee-Goldman wrote:
> Hey Hao, thanks for the KIP!
> 
> 1. There's a typo in the "internal.rack.aware.assignment.strategry" config,
> this
> should be internal.rack.aware.assignment.strategy.
> 
> 2.
> 
>>   For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number of
>> edges E is T * N where T is the number of clients and N is the number of
>> Tasks. This is because a task can be assigned to any client so there will
>> be an edge between every task and every client. The total complexity would
>> be O(T * N) if we want to be more specific.
> 
> I feel like I'm missing something here, but if E = T * N and the complexity
> is ~O(E^2), doesn't
> this make the total complexity order of O(T^2 * N^2)?
> 
> 3.
> 
>> Since 3.C.I and 3.C.II have different tradeoffs and work better in
>> different workloads etc, we
> 
> could add an internal configuration to choose one of them at runtime.
>>
> Why only an internal configuration? Same goes for
> internal.rack.aware.assignment.standby.strategry (which also has the typo)
> 
> 4.
> 
>>   There are no changes in public interfaces.
> 
> I think it would be good to explicitly call out that users can utilize this
> new feature by setting the
> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
> 
> 5.
> 
>> The idea is that if we always try to make it overlap as much with
>> HAAssignor’s target
> 
> assignment, at least there’s a higher chance that tasks won’t be shuffled a
>> lot if the clients
> 
> remain the same across rebalances.
>>
> This line definitely gave me some pause -- if there was one major takeaway
> I had after KIP-441,
> one thing that most limited the feature's success, it was our assumption
> that clients are relatively
> stable across rebalances. This was mostly true at limited scale or for
> on-prem setups, but
> unsurprisingly broke down in cloud environments or larger clusters. Not
> only do clients naturally
> fall in and out of the group, autoscaling is becoming more and more of a
> thing.
> 
> Lastly, and this is more easily solved but still worth calling out, an
> assignment is only deterministic
> as long as the client.id is persisted. Currently in Streams, we only write
> the process UUID to the
> state directory if there is one, ie if at least one persistent stateful
> task exists in the topology. This
> made sense in the context of KIP-441, which targeted heavily stateful
> deployments, but this KIP
> presumably intends to target more than just the persistent & stateful
> subset of applications. To
> make matters even worse,  "persistent" is defined in a semantically
> inconsistent way throughout
> Streams.
> 
> All this is to say, it may sound more complicated to remember the previous
> assignment, but (a)
> imo it only introduces a lot more complexity and shaky assumptions to
> continue down this
> path, and (b) we actually already do persist some amount of state, like the
> process UUID, and
> (c) it seems like this is the perfect opportunity to finally rid ourselves
> of the determinism constraint
> which has frankly caused more trouble and time lost in sum than it would
> have taken us to just
> write the HighAvailabilityTaskAssignor to consider the previous assignment
> from the start in KIP-441
> 
> 6.
> 
>> StickyTaskAssignor  users who would like to use rack aware assignment
>> should upgrade their
> 
> Kafka Streams version to the version in which HighAvailabilityTaskAssignor
>> and rack awareness
> 
> assignment are available.
> 
> Building off of the above, the HAAssignor hasn't worked out perfectly for
> everybody up until now,
> given that we are only adding complexity to it now, on the flipside I would
> hesitate to try and force
> everyone to use it if they want to upgrade. We added a "secret" backdoor
> internal config to allow
> users to set the task assignor back in KIP-441 for this reason. WDYT about
> bumping this to a public
> config on the side in this KIP?
> 
> 
> On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid> wrote:
> 
>> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add it to
>> the test plan. I will also add tests to verify the new optimizer will
>> produce a balanced assignment which has no worse cross AZ cost than the
>> existing assignor.
>>
>> Hao
>>
>> On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org> wrote:
>>
>>> Hi Hao,
>>>
>>> Thanks for the KIP!
>>>
>>> Overall, I think this is a great idea. I always wanted to circle back
>>> after the Smooth Scaling KIP to put a proper optimization algorithm into
>>> place. I think this has the promise to really improve the quality of the
>>> balanced assignments we produce.
>>>
>>> Thanks for providing the details about the MaxCut/MinFlow algorithm. It
>>> seems like a good choice for me, assuming we choose the right scaling
>>> factors for the weights we add to the graph. Unfortunately, I don't think
>>> that there's a good way to see how easy or hard this is going to be until
>>> we actually implement it and test it.
>>>
>>> That leads to the only real piece of feedback I had on the KIP, which is
>>> the testing portion. You mentioned system/integration/unit tests, but
>>> there's not too much information about what those tests will do. I'd like
>>> to suggest that we invest in more simulation testing specifically,
>> similar
>>> to what we did in
>>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
>>> .
>>>
>>> In fact, it seems like we _could_ write the simulation up front, and then
>>> implement the algorithm in a dummy way and just see whether it passes the
>>> simulations or not, before actually integrating it with Kafka Streams.
>>>
>>> Basically, I'd be +1 on this KIP today, but I'd feel confident about it
>> if
>>> we had a little more detail regarding how we are going to verify that the
>>> new optimizer is actually going to produce more optimal plans than the
>>> existing assigner we have today.
>>>
>>> Thanks again!
>>> -John
>>>
>>> On 2023/05/22 16:49:22 Hao Li wrote:
>>>> Hi Colt,
>>>>
>>>> Thanks for the comments.
>>>>
>>>>> and I struggle to see how the algorithm isn't at least O(N) where N
>> is
>>>> the number of Tasks...?
>>>>
>>>> For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number
>>> of
>>>> edges E is T * N where T is the number of clients and N is the number
>> of
>>>> Tasks. This is because a task can be assigned to any client so there
>> will
>>>> be an edge between every task and every client. The total complexity
>>> would
>>>> be O(T * N) if we want to be more specific.
>>>>
>>>>> But if the leaders for each partition are spread across multiple
>> zones,
>>>> how will you handle that?
>>>>
>>>> This is what the min-cost flow solution is trying to solve? i.e. Find
>> an
>>>> assignment of tasks to clients where across AZ traffic can be
>> minimized.
>>>> But there are some constraints to the solution and one of them is we
>> need
>>>> to balance task assignment first (
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
>>> ).
>>>> So in your example of three tasks' partitions being in the same AZ of a
>>>> client, if there are other clients, we still want to balance the tasks
>> to
>>>> other clients even if putting all tasks to a single client can result
>> in
>>> 0
>>>> cross AZ traffic. In
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
>>>> section, the algorithm will try to find a min-cost solution based on
>>>> balanced assignment instead of pure min-cost.
>>>>
>>>> Thanks,
>>>> Hao
>>>>
>>>> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
>> wrote:
>>>>
>>>>> Hello Hao,
>>>>>
>>>>> First of all, THANK YOU for putting this together. I had been hoping
>>>>> someone might bring something like this forward. A few comments:
>>>>>
>>>>> **1: Runtime Complexity
>>>>>> Klein’s cycle canceling algorithm can solve the min-cost flow
>>> problem in
>>>>> O(E^2CU) time where C is max cost and U is max capacity. In our
>>> particular
>>>>> case, C is 1 and U is at most 3 (A task can have at most 3 topics
>>> including
>>>>> changelog topic?). So the algorithm runs in O(E^2) time for our case.
>>>>>
>>>>> A Task can have multiple input topics, and also multiple state
>> stores,
>>> and
>>>>> multiple output topics. The most common case is three topics as you
>>>>> described, but this is not necessarily guaranteed. Also, math is one
>>> of my
>>>>> weak points, but to me O(E^2) is equivalent to O(1), and I struggle
>> to
>>> see
>>>>> how the algorithm isn't at least O(N) where N is the number of
>>> Tasks...?
>>>>>
>>>>> **2: Broker-Side Partition Assignments
>>>>> Consider the case with just three topics in a Task (one input, one
>>> output,
>>>>> one changelog). If all three partition leaders are in the same Rack
>> (or
>>>>> better yet, the same broker), then we could get massive savings by
>>>>> assigning the Task to that Rack/availability zone. But if the leaders
>>> for
>>>>> each partition are spread across multiple zones, how will you handle
>>> that?
>>>>> Is that outside the scope of this KIP, or is it worth introducing a
>>>>> kafka-streams-generate-rebalance-proposal.sh tool?
>>>>>
>>>>> Colt McNealy
>>>>> *Founder, LittleHorse.io*
>>>>>
>>>>>
>>>>> On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have submitted KIP-925 to add rack awareness logic in task
>>> assignment
>>>>> in
>>>>>> Kafka Streams and would like to start a discussion:
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Hao
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Hao
>>>>
>>>
>>
>>
>> --
>> Thanks,
>> Hao
>>
> 

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Sophie Blee-Goldman <ab...@gmail.com>.
Hey Hao, thanks for the KIP!

1. There's a typo in the "internal.rack.aware.assignment.strategry" config,
this
should be internal.rack.aware.assignment.strategy.

2.

>  For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number of
> edges E is T * N where T is the number of clients and N is the number of
> Tasks. This is because a task can be assigned to any client so there will
> be an edge between every task and every client. The total complexity would
> be O(T * N) if we want to be more specific.

I feel like I'm missing something here, but if E = T * N and the complexity
is ~O(E^2), doesn't
this make the total complexity order of O(T^2 * N^2)?

3.

> Since 3.C.I and 3.C.II have different tradeoffs and work better in
> different workloads etc, we

could add an internal configuration to choose one of them at runtime.
>
Why only an internal configuration? Same goes for
internal.rack.aware.assignment.standby.strategry (which also has the typo)

4.

>  There are no changes in public interfaces.

I think it would be good to explicitly call out that users can utilize this
new feature by setting the
ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example

5.

> The idea is that if we always try to make it overlap as much with
> HAAssignor’s target

assignment, at least there’s a higher chance that tasks won’t be shuffled a
> lot if the clients

remain the same across rebalances.
>
This line definitely gave me some pause -- if there was one major takeaway
I had after KIP-441,
one thing that most limited the feature's success, it was our assumption
that clients are relatively
stable across rebalances. This was mostly true at limited scale or for
on-prem setups, but
unsurprisingly broke down in cloud environments or larger clusters. Not
only do clients naturally
fall in and out of the group, autoscaling is becoming more and more of a
thing.

Lastly, and this is more easily solved but still worth calling out, an
assignment is only deterministic
as long as the client.id is persisted. Currently in Streams, we only write
the process UUID to the
state directory if there is one, ie if at least one persistent stateful
task exists in the topology. This
made sense in the context of KIP-441, which targeted heavily stateful
deployments, but this KIP
presumably intends to target more than just the persistent & stateful
subset of applications. To
make matters even worse,  "persistent" is defined in a semantically
inconsistent way throughout
Streams.

All this is to say, it may sound more complicated to remember the previous
assignment, but (a)
imo it only introduces a lot more complexity and shaky assumptions to
continue down this
path, and (b) we actually already do persist some amount of state, like the
process UUID, and
(c) it seems like this is the perfect opportunity to finally rid ourselves
of the determinism constraint
which has frankly caused more trouble and time lost in sum than it would
have taken us to just
write the HighAvailabilityTaskAssignor to consider the previous assignment
from the start in KIP-441

6.

> StickyTaskAssignor  users who would like to use rack aware assignment
> should upgrade their

Kafka Streams version to the version in which HighAvailabilityTaskAssignor
> and rack awareness

assignment are available.

Building off of the above, the HAAssignor hasn't worked out perfectly for
everybody up until now,
given that we are only adding complexity to it now, on the flipside I would
hesitate to try and force
everyone to use it if they want to upgrade. We added a "secret" backdoor
internal config to allow
users to set the task assignor back in KIP-441 for this reason. WDYT about
bumping this to a public
config on the side in this KIP?


On Tue, May 23, 2023 at 11:46 AM Hao Li <hl...@confluent.io.invalid> wrote:

> Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add it to
> the test plan. I will also add tests to verify the new optimizer will
> produce a balanced assignment which has no worse cross AZ cost than the
> existing assignor.
>
> Hao
>
> On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org> wrote:
>
> > Hi Hao,
> >
> > Thanks for the KIP!
> >
> > Overall, I think this is a great idea. I always wanted to circle back
> > after the Smooth Scaling KIP to put a proper optimization algorithm into
> > place. I think this has the promise to really improve the quality of the
> > balanced assignments we produce.
> >
> > Thanks for providing the details about the MaxCut/MinFlow algorithm. It
> > seems like a good choice for me, assuming we choose the right scaling
> > factors for the weights we add to the graph. Unfortunately, I don't think
> > that there's a good way to see how easy or hard this is going to be until
> > we actually implement it and test it.
> >
> > That leads to the only real piece of feedback I had on the KIP, which is
> > the testing portion. You mentioned system/integration/unit tests, but
> > there's not too much information about what those tests will do. I'd like
> > to suggest that we invest in more simulation testing specifically,
> similar
> > to what we did in
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> > .
> >
> > In fact, it seems like we _could_ write the simulation up front, and then
> > implement the algorithm in a dummy way and just see whether it passes the
> > simulations or not, before actually integrating it with Kafka Streams.
> >
> > Basically, I'd be +1 on this KIP today, but I'd feel confident about it
> if
> > we had a little more detail regarding how we are going to verify that the
> > new optimizer is actually going to produce more optimal plans than the
> > existing assigner we have today.
> >
> > Thanks again!
> > -John
> >
> > On 2023/05/22 16:49:22 Hao Li wrote:
> > > Hi Colt,
> > >
> > > Thanks for the comments.
> > >
> > > > and I struggle to see how the algorithm isn't at least O(N) where N
> is
> > > the number of Tasks...?
> > >
> > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number
> > of
> > > edges E is T * N where T is the number of clients and N is the number
> of
> > > Tasks. This is because a task can be assigned to any client so there
> will
> > > be an edge between every task and every client. The total complexity
> > would
> > > be O(T * N) if we want to be more specific.
> > >
> > > > But if the leaders for each partition are spread across multiple
> zones,
> > > how will you handle that?
> > >
> > > This is what the min-cost flow solution is trying to solve? i.e. Find
> an
> > > assignment of tasks to clients where across AZ traffic can be
> minimized.
> > > But there are some constraints to the solution and one of them is we
> need
> > > to balance task assignment first (
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> > ).
> > > So in your example of three tasks' partitions being in the same AZ of a
> > > client, if there are other clients, we still want to balance the tasks
> to
> > > other clients even if putting all tasks to a single client can result
> in
> > 0
> > > cross AZ traffic. In
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > > section, the algorithm will try to find a min-cost solution based on
> > > balanced assignment instead of pure min-cost.
> > >
> > > Thanks,
> > > Hao
> > >
> > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io>
> wrote:
> > >
> > > > Hello Hao,
> > > >
> > > > First of all, THANK YOU for putting this together. I had been hoping
> > > > someone might bring something like this forward. A few comments:
> > > >
> > > > **1: Runtime Complexity
> > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
> > problem in
> > > > O(E^2CU) time where C is max cost and U is max capacity. In our
> > particular
> > > > case, C is 1 and U is at most 3 (A task can have at most 3 topics
> > including
> > > > changelog topic?). So the algorithm runs in O(E^2) time for our case.
> > > >
> > > > A Task can have multiple input topics, and also multiple state
> stores,
> > and
> > > > multiple output topics. The most common case is three topics as you
> > > > described, but this is not necessarily guaranteed. Also, math is one
> > of my
> > > > weak points, but to me O(E^2) is equivalent to O(1), and I struggle
> to
> > see
> > > > how the algorithm isn't at least O(N) where N is the number of
> > Tasks...?
> > > >
> > > > **2: Broker-Side Partition Assignments
> > > > Consider the case with just three topics in a Task (one input, one
> > output,
> > > > one changelog). If all three partition leaders are in the same Rack
> (or
> > > > better yet, the same broker), then we could get massive savings by
> > > > assigning the Task to that Rack/availability zone. But if the leaders
> > for
> > > > each partition are spread across multiple zones, how will you handle
> > that?
> > > > Is that outside the scope of this KIP, or is it worth introducing a
> > > > kafka-streams-generate-rebalance-proposal.sh tool?
> > > >
> > > > Colt McNealy
> > > > *Founder, LittleHorse.io*
> > > >
> > > >
> > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have submitted KIP-925 to add rack awareness logic in task
> > assignment
> > > > in
> > > > > Kafka Streams and would like to start a discussion:
> > > > >
> > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Hao
> > > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Hao
> > >
> >
>
>
> --
> Thanks,
> Hao
>

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add it to
the test plan. I will also add tests to verify the new optimizer will
produce a balanced assignment which has no worse cross AZ cost than the
existing assignor.

Hao

On Mon, May 22, 2023 at 3:39 PM John Roesler <vv...@apache.org> wrote:

> Hi Hao,
>
> Thanks for the KIP!
>
> Overall, I think this is a great idea. I always wanted to circle back
> after the Smooth Scaling KIP to put a proper optimization algorithm into
> place. I think this has the promise to really improve the quality of the
> balanced assignments we produce.
>
> Thanks for providing the details about the MaxCut/MinFlow algorithm. It
> seems like a good choice for me, assuming we choose the right scaling
> factors for the weights we add to the graph. Unfortunately, I don't think
> that there's a good way to see how easy or hard this is going to be until
> we actually implement it and test it.
>
> That leads to the only real piece of feedback I had on the KIP, which is
> the testing portion. You mentioned system/integration/unit tests, but
> there's not too much information about what those tests will do. I'd like
> to suggest that we invest in more simulation testing specifically, similar
> to what we did in
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
> .
>
> In fact, it seems like we _could_ write the simulation up front, and then
> implement the algorithm in a dummy way and just see whether it passes the
> simulations or not, before actually integrating it with Kafka Streams.
>
> Basically, I'd be +1 on this KIP today, but I'd feel confident about it if
> we had a little more detail regarding how we are going to verify that the
> new optimizer is actually going to produce more optimal plans than the
> existing assigner we have today.
>
> Thanks again!
> -John
>
> On 2023/05/22 16:49:22 Hao Li wrote:
> > Hi Colt,
> >
> > Thanks for the comments.
> >
> > > and I struggle to see how the algorithm isn't at least O(N) where N is
> > the number of Tasks...?
> >
> > For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number
> of
> > edges E is T * N where T is the number of clients and N is the number of
> > Tasks. This is because a task can be assigned to any client so there will
> > be an edge between every task and every client. The total complexity
> would
> > be O(T * N) if we want to be more specific.
> >
> > > But if the leaders for each partition are spread across multiple zones,
> > how will you handle that?
> >
> > This is what the min-cost flow solution is trying to solve? i.e. Find an
> > assignment of tasks to clients where across AZ traffic can be minimized.
> > But there are some constraints to the solution and one of them is we need
> > to balance task assignment first (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
> ).
> > So in your example of three tasks' partitions being in the same AZ of a
> > client, if there are other clients, we still want to balance the tasks to
> > other clients even if putting all tasks to a single client can result in
> 0
> > cross AZ traffic. In
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> > section, the algorithm will try to find a min-cost solution based on
> > balanced assignment instead of pure min-cost.
> >
> > Thanks,
> > Hao
> >
> > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io> wrote:
> >
> > > Hello Hao,
> > >
> > > First of all, THANK YOU for putting this together. I had been hoping
> > > someone might bring something like this forward. A few comments:
> > >
> > > **1: Runtime Complexity
> > > > Klein’s cycle canceling algorithm can solve the min-cost flow
> problem in
> > > O(E^2CU) time where C is max cost and U is max capacity. In our
> particular
> > > case, C is 1 and U is at most 3 (A task can have at most 3 topics
> including
> > > changelog topic?). So the algorithm runs in O(E^2) time for our case.
> > >
> > > A Task can have multiple input topics, and also multiple state stores,
> and
> > > multiple output topics. The most common case is three topics as you
> > > described, but this is not necessarily guaranteed. Also, math is one
> of my
> > > weak points, but to me O(E^2) is equivalent to O(1), and I struggle to
> see
> > > how the algorithm isn't at least O(N) where N is the number of
> Tasks...?
> > >
> > > **2: Broker-Side Partition Assignments
> > > Consider the case with just three topics in a Task (one input, one
> output,
> > > one changelog). If all three partition leaders are in the same Rack (or
> > > better yet, the same broker), then we could get massive savings by
> > > assigning the Task to that Rack/availability zone. But if the leaders
> for
> > > each partition are spread across multiple zones, how will you handle
> that?
> > > Is that outside the scope of this KIP, or is it worth introducing a
> > > kafka-streams-generate-rebalance-proposal.sh tool?
> > >
> > > Colt McNealy
> > > *Founder, LittleHorse.io*
> > >
> > >
> > > On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have submitted KIP-925 to add rack awareness logic in task
> assignment
> > > in
> > > > Kafka Streams and would like to start a discussion:
> > > >
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > >
> > > > --
> > > > Thanks,
> > > > Hao
> > > >
> > >
> >
> >
> > --
> > Thanks,
> > Hao
> >
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by John Roesler <vv...@apache.org>.
Hi Hao,

Thanks for the KIP!

Overall, I think this is a great idea. I always wanted to circle back after the Smooth Scaling KIP to put a proper optimization algorithm into place. I think this has the promise to really improve the quality of the balanced assignments we produce.

Thanks for providing the details about the MaxCut/MinFlow algorithm. It seems like a good choice for me, assuming we choose the right scaling factors for the weights we add to the graph. Unfortunately, I don't think that there's a good way to see how easy or hard this is going to be until we actually implement it and test it.

That leads to the only real piece of feedback I had on the KIP, which is the testing portion. You mentioned system/integration/unit tests, but there's not too much information about what those tests will do. I'd like to suggest that we invest in more simulation testing specifically, similar to what we did in https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java .

In fact, it seems like we _could_ write the simulation up front, and then implement the algorithm in a dummy way and just see whether it passes the simulations or not, before actually integrating it with Kafka Streams.

Basically, I'd be +1 on this KIP today, but I'd feel confident about it if we had a little more detail regarding how we are going to verify that the new optimizer is actually going to produce more optimal plans than the existing assigner we have today.

Thanks again!
-John

On 2023/05/22 16:49:22 Hao Li wrote:
> Hi Colt,
> 
> Thanks for the comments.
> 
> > and I struggle to see how the algorithm isn't at least O(N) where N is
> the number of Tasks...?
> 
> For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number of
> edges E is T * N where T is the number of clients and N is the number of
> Tasks. This is because a task can be assigned to any client so there will
> be an edge between every task and every client. The total complexity would
> be O(T * N) if we want to be more specific.
> 
> > But if the leaders for each partition are spread across multiple zones,
> how will you handle that?
> 
> This is what the min-cost flow solution is trying to solve? i.e. Find an
> assignment of tasks to clients where across AZ traffic can be minimized.
> But there are some constraints to the solution and one of them is we need
> to balance task assignment first (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment).
> So in your example of three tasks' partitions being in the same AZ of a
> client, if there are other clients, we still want to balance the tasks to
> other clients even if putting all tasks to a single client can result in 0
> cross AZ traffic. In
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
> section, the algorithm will try to find a min-cost solution based on
> balanced assignment instead of pure min-cost.
> 
> Thanks,
> Hao
> 
> On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io> wrote:
> 
> > Hello Hao,
> >
> > First of all, THANK YOU for putting this together. I had been hoping
> > someone might bring something like this forward. A few comments:
> >
> > **1: Runtime Complexity
> > > Klein’s cycle canceling algorithm can solve the min-cost flow problem in
> > O(E^2CU) time where C is max cost and U is max capacity. In our particular
> > case, C is 1 and U is at most 3 (A task can have at most 3 topics including
> > changelog topic?). So the algorithm runs in O(E^2) time for our case.
> >
> > A Task can have multiple input topics, and also multiple state stores, and
> > multiple output topics. The most common case is three topics as you
> > described, but this is not necessarily guaranteed. Also, math is one of my
> > weak points, but to me O(E^2) is equivalent to O(1), and I struggle to see
> > how the algorithm isn't at least O(N) where N is the number of Tasks...?
> >
> > **2: Broker-Side Partition Assignments
> > Consider the case with just three topics in a Task (one input, one output,
> > one changelog). If all three partition leaders are in the same Rack (or
> > better yet, the same broker), then we could get massive savings by
> > assigning the Task to that Rack/availability zone. But if the leaders for
> > each partition are spread across multiple zones, how will you handle that?
> > Is that outside the scope of this KIP, or is it worth introducing a
> > kafka-streams-generate-rebalance-proposal.sh tool?
> >
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid> wrote:
> >
> > > Hi all,
> > >
> > > I have submitted KIP-925 to add rack awareness logic in task assignment
> > in
> > > Kafka Streams and would like to start a discussion:
> > >
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > >
> > > --
> > > Thanks,
> > > Hao
> > >
> >
> 
> 
> -- 
> Thanks,
> Hao
> 

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Hao Li <hl...@confluent.io.INVALID>.
Hi Colt,

Thanks for the comments.

> and I struggle to see how the algorithm isn't at least O(N) where N is
the number of Tasks...?

For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number of
edges E is T * N where T is the number of clients and N is the number of
Tasks. This is because a task can be assigned to any client so there will
be an edge between every task and every client. The total complexity would
be O(T * N) if we want to be more specific.

> But if the leaders for each partition are spread across multiple zones,
how will you handle that?

This is what the min-cost flow solution is trying to solve? i.e. Find an
assignment of tasks to clients where across AZ traffic can be minimized.
But there are some constraints to the solution and one of them is we need
to balance task assignment first (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment).
So in your example of three tasks' partitions being in the same AZ of a
client, if there are other clients, we still want to balance the tasks to
other clients even if putting all tasks to a single client can result in 0
cross AZ traffic. In
https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
section, the algorithm will try to find a min-cost solution based on
balanced assignment instead of pure min-cost.

Thanks,
Hao

On Tue, May 9, 2023 at 5:55 PM Colt McNealy <co...@littlehorse.io> wrote:

> Hello Hao,
>
> First of all, THANK YOU for putting this together. I had been hoping
> someone might bring something like this forward. A few comments:
>
> **1: Runtime Complexity
> > Klein’s cycle canceling algorithm can solve the min-cost flow problem in
> O(E^2CU) time where C is max cost and U is max capacity. In our particular
> case, C is 1 and U is at most 3 (A task can have at most 3 topics including
> changelog topic?). So the algorithm runs in O(E^2) time for our case.
>
> A Task can have multiple input topics, and also multiple state stores, and
> multiple output topics. The most common case is three topics as you
> described, but this is not necessarily guaranteed. Also, math is one of my
> weak points, but to me O(E^2) is equivalent to O(1), and I struggle to see
> how the algorithm isn't at least O(N) where N is the number of Tasks...?
>
> **2: Broker-Side Partition Assignments
> Consider the case with just three topics in a Task (one input, one output,
> one changelog). If all three partition leaders are in the same Rack (or
> better yet, the same broker), then we could get massive savings by
> assigning the Task to that Rack/availability zone. But if the leaders for
> each partition are spread across multiple zones, how will you handle that?
> Is that outside the scope of this KIP, or is it worth introducing a
> kafka-streams-generate-rebalance-proposal.sh tool?
>
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid> wrote:
>
> > Hi all,
> >
> > I have submitted KIP-925 to add rack awareness logic in task assignment
> in
> > Kafka Streams and would like to start a discussion:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> >
> > --
> > Thanks,
> > Hao
> >
>


-- 
Thanks,
Hao

Re: [DISCUSS] KIP-925: rack aware task assignment in Kafka Streams

Posted by Colt McNealy <co...@littlehorse.io>.
Hello Hao,

First of all, THANK YOU for putting this together. I had been hoping
someone might bring something like this forward. A few comments:

**1: Runtime Complexity
> Klein’s cycle canceling algorithm can solve the min-cost flow problem in
O(E^2CU) time where C is max cost and U is max capacity. In our particular
case, C is 1 and U is at most 3 (A task can have at most 3 topics including
changelog topic?). So the algorithm runs in O(E^2) time for our case.

A Task can have multiple input topics, and also multiple state stores, and
multiple output topics. The most common case is three topics as you
described, but this is not necessarily guaranteed. Also, math is one of my
weak points, but to me O(E^2) is equivalent to O(1), and I struggle to see
how the algorithm isn't at least O(N) where N is the number of Tasks...?

**2: Broker-Side Partition Assignments
Consider the case with just three topics in a Task (one input, one output,
one changelog). If all three partition leaders are in the same Rack (or
better yet, the same broker), then we could get massive savings by
assigning the Task to that Rack/availability zone. But if the leaders for
each partition are spread across multiple zones, how will you handle that?
Is that outside the scope of this KIP, or is it worth introducing a
kafka-streams-generate-rebalance-proposal.sh tool?

Colt McNealy
*Founder, LittleHorse.io*


On Tue, May 9, 2023 at 4:03 PM Hao Li <hl...@confluent.io.invalid> wrote:

> Hi all,
>
> I have submitted KIP-925 to add rack awareness logic in task assignment in
> Kafka Streams and would like to start a discussion:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>
> --
> Thanks,
> Hao
>