Posted to dev@kafka.apache.org by Viktor Somogyi-Vass <vi...@gmail.com> on 2019/08/09 15:25:37 UTC

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Hey Stanislav,

I revisited the current algorithm and indeed it would change the order
of replicas in ZK but wouldn't do a leader election, so one would need to
run the preferred replica election tool. However, in light of this
I'm not sure I'd keep the existing behavior, as users wouldn't gain anything
from it. Changing leadership automatically would result in a simpler,
easier and more responsive reassignment algorithm, which is especially
important if the reassignment is done to relieve a broker under heavy load.
Automated tools (Cruise Control) would also have a simpler life.
Let me know what you think.
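
For reference, a minimal sketch (not part of the KIP) of the manual step this
would save, using the electPreferredLeaders Admin API that comes up later in
this thread; the topic name and bootstrap address are placeholders:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.TopicPartition

object PreferredLeaderElectionSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder address
    val admin = AdminClient.create(props)
    try {
      // partitions whose replica order was just changed by the reassignment
      val partitions = Set(new TopicPartition("my-topic", 0),
                           new TopicPartition("my-topic", 1))
      // ask the controller to elect the (new) preferred leaders;
      // result handling elided for brevity
      admin.electPreferredLeaders(partitions.asJava)
    } finally {
      admin.close()
    }
  }
}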

Viktor

On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <vi...@gmail.com>
wrote:

> Hi Stan,
>
> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3] on
> a lot of partitions and the user runs a massive rebalance to change them
> all to [3, 2, 1]. In the old behavior, I think that this would not do
> anything but simply change the replica set in ZK.
> Then, the user could run kafka-preferred-replica-election.sh on a given set
> of partitions to make sure the new leader 3 gets elected.
>
> I thought the old algorithm would elect 3 as the leader in this case right
> away at the end but I have to double check this. In any case I think it
> would make sense in the new algorithm if we elected the new preferred
> leader right away, regardless of whether the new leader is chosen from the
> existing replicas or not. If the whole reassignment is in fact just changing the
> replica order then either way it is a simple (trivial) operation and doing
> it batched wouldn't slow it down much as there is no data movement
> involved. If the reassignment is mixed, meaning it contains reordering and
> real movement as well then in fact it would help to even out the load
> faster as data movements can get long. For instance in case of a
> reassignment batch of two partitions concurrently: P1: (1,2,3) -> (3,2,1)
> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new leader but
> P1 wouldn't and it wouldn't help the goal of normalizing traffic on broker
> 1 that much.
> Again, I'll have to check how the current algorithm works and whether
> implementing what I sketched above has any unknown drawbacks.
>
> As for generic preferred leader election when called from the admin api or
> the auto leader balance feature I think you're right that we should leave
> it as it is. It doesn't involve any data movement so it's fairly fast and
> it normalizes the cluster state quickly.
>
> Viktor
>
> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <st...@confluent.io>
> wrote:
>
>> Hey Viktor,
>>
>>  I think it is intuitive if they are on a global level... If we applied
>> > them on every batch then we
>> > couldn't really guarantee any limits as the user would be able to get
>> > around it by submitting lots of reassignments.
>>
>>
>> Agreed. Could we mention this explicitly in the KIP?
>>
>> Also if I understand correctly, AlterPartitionAssignmentsRequest would be
>> a
>> > partition level batching, isn't it? So if you submit 10 partitions at
>> once
>> > then they'll all be started by the controller immediately as per my
>> > understanding.
>>
>>
>> Yep, absolutely
>>
>> I've raised the ordering problem on the discussion of KIP-455 in a bit
>> > different form and as I remember the verdict there was that we shouldn't
>> > expose ordering as an API. It might not be easy as you say and there
>> might
>> > be much better strategies to follow (like disk or network utilization
>> > goals). Therefore I'll remove this section from the KIP.
>>
>>
>> Sounds good to me.
>>
>> I'm not sure I get this scenario. So are you saying that after they
>> > submitted a reassignment they also submit a preferred leader change?
>> > In my mind what I would do is:
>> > i) make auto.leader.rebalance.enable obey the leader movement limit as
>> > this way it will be easier to calculate the reassignment batches.
>> >
>>
>> Sorry, this is my fault -- I should have been more clear.
>> First, I don't think I thought this through well enough at the time.
>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
>> obvious that a leader shift will happen. Your KIP proposes we shift
>> replicas 1 and 4 first.
>>
>> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
>> on
>> a lot of partitions and the user runs a massive rebalance to change them
>> all to [3, 2, 1]. In the old behavior, I think that this would not do
>> anything but simply change the replica set in ZK.
>> Then, the user could run kafka-preferred-replica-election.sh on a given
>> set
>> of partitions to make sure the new leader 3 gets elected.
>>
>> ii) the user could submit preferred leader election but it's basically a
>> > form of reassignment so it would fall under the batching criteria. If the
>> > batch they submit is smaller than the internal limit, then it'd be executed
>> > immediately but otherwise it would be split into more batches. It might be
>> > a different behavior as it may not be executed in one batch but I think
>> > it isn't a problem because we'll default to Int.MAX for the batches and
>> > otherwise, since it's a form of reassignment, I think it makes sense
>> > to apply the same logic.
>>
>>
>> The AdminAPI for that is "electPreferredLeaders(Collection<TopicPartition>
>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
>> "partition": 0}]}" so it is a bit less explicit than our other
>> reassignment
>> API, but the functionality is the same.
>> You're 100% right that it is a form of reassignment, but I hadn't thought
>> of it like that, and I suspect some other people wouldn't have either.
>> If I understand correctly, you're suggesting that we count the
>> "reassignment.parallel.leader.movements" config against such preferred
>> elections. I think that makes sense. If we are to do that we should
>> explicitly mention that this KIP touches the preferred election logic as
>> well. Would that config also bound the number of leader movements the auto
>> rebalance inside the broker would do as well?
>>
>> Then again, the "reassignment.parallel.leader.movements" config has a bit
>> of a different meaning when used in the context of a real reassignment
>> (where data moves from the leader to N more replicas) versus in the
>> preferred election switch (where all we need is two lightweight
>> LeaderAndIsr requests), so I am not entirely convinced we may want to use
>> the same config for that.
>> It might be easier to limit the scope of this KIP and not place a bound on
>> preferred leader election.
>>
>> Thanks,
>> Stanislav
>>
>>
>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
>> viktorsomogyi@gmail.com>
>> wrote:
>>
>> > Hey Stanislav,
>> >
>> > Thanks for the thorough look at the KIP! :)
>> >
>> > > Let me first explicitly confirm my understanding of the configs and
>> the
>> > > algorithm:
>> > > * reassignment.parallel.replica.count - the maximum total number of
>> > > replicas that we can move at once, *per partition*
>> > > * reassignment.parallel.partition.count - the maximum number of
>> > partitions
>> > > we can move at once
>> > > * reassignment.parallel.leader.movements - the maximum number of
>> leader
>> > > movements we can have at once
>> >
>> > Yes.
>> >
>> > > As far as I currently understand it, your proposed algorithm will
>> > naturally
>> > > prioritize leader movement first. e.g if
>> > > reassignment.parallel.replica.count=1 and
>> > >
>> >
>> >
>> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>> > > we will always move one, the first possible, replica in the replica
>> set
>> > > (which will be the leader if part of the excess replica set (ER)).
>> > > Am I correct in saying that?
>> >
>> > Yes.
>> >
>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>> >
>> > If you mean that it's not always an exact number (because the last batch
>> > could be smaller) then I think it's a good idea. I don't mind this naming
>> > or having max in it either.
>> >
>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>> > > rebalances - do the configs apply on a
>> > > single AlterPartitionAssignmentsRequest or are they global?
>> >
>> > I think it is intuitive if they are on a global level and the user would
>> > have the option to set KIP-435's settings to Int.MAX and thus eliminate any
>> > batching and submit their own batches through
>> > AlterPartitionAssignmentsRequest. If we applied them on every batch then we
>> > couldn't really guarantee any limits as the user would be able to get
>> > around it by submitting lots of reassignments. By default though we set
>> > the batching limits to Int.MAX so effectively they're disabled.
>> > Also if I understand correctly, AlterPartitionAssignmentsRequest would
>> be a
>> > partition level batching, isn't it? So if you submit 10 partitions at
>> once
>> > then they'll all be started by the controller immediately as per my
>> > understanding.
>> >
>> > > 3. Unless I've missed it, the algorithm does not take into account
>> > > `reassignment.parallel.leader.movements`
>> >
>> > Yea the reassignment step calculation doesn't contain that because it
>> > describes only one partition's reassignment step calculation. We simply
>> > fill our reassignment batch with as many leader movements as we can and
>> > then fill the rest with reassignments which don't require leader
>> movement
>> > if we have space. I'll create a pseudo code block on the KIP for this.
>> >
>> > > 4. The KIP says that the order of the input has some control over how
>> the
>> > > batches are created - i.e it's deterministic. What would the batches
>> of
>> > the
>> > > following reassignment look like:
>> > > reassignment.parallel.replica.count=1
>> > > reassignment.parallel.partition.count=MAX_INT
>> > > reassignment.parallel.leader.movements=1
>> > > partitionA - (0,1,2) -> (3, 4, 5)
>> > > partitionB - (0,1,2) -> (3,4,5)
>> > > partitionC - (0,1,2) -> (3, 4, 5)
>> > > From my understanding, we would start with A(0->3), B(1->4) and
>> C(1->4).
>> > Is
>> > > that correct? Would the second step then continue with B(0->3)?
>> > >
>> > > If the configurations are global, I can imagine we will have a bit
>> more
>> > > trouble in preserving the expected ordering, especially across
>> controller
>> > > failovers -- but I'll avoid speculating until you confirm the scope of
>> > the
>> > > config.
>> >
>> > I've raised the ordering problem on the discussion of KIP-455 in a bit
>> > different form and as I remember the verdict there was that we shouldn't
>> > expose ordering as an API. It might not be easy as you say and there
>> might
>> > be much better strategies to follow (like disk or network utilization
>> > goals). Therefore I'll remove this section from the KIP. (Otherwise yes, I
>> > would have thought of the same behavior as what you described.)
>> > Since #3 also was about the pseudo code, it would be something like this:
>> > Config:
>> > reassignment.parallel.replica.count=R
>> > reassignment.parallel.partition.count=P
>> > reassignment.parallel.leader.movements=L
>> > Code:
>> > val batchSize = max(L, P)
>> > // split the individual partition reassignments by whether they involve
>> > // leader movement or not
>> > val partitionMovements = calculateReassignmentStepsFor(partitionsToReassign)
>> >   .partition(partitionReassignment.involvesLeaderReassignment)
>> > // fill the batch with as many leader movements as possible and take the
>> > // rest from other reassignments
>> > val currentBatch =
>> >   if (partitionMovements.leaderMovements.size < batchSize)
>> >     partitionMovements.leaderMovements ++
>> >       partitionMovements.otherPartitionMovements.take(batchSize -
>> >         partitionMovements.leaderMovements.size)
>> >   else
>> >     partitionMovements.leaderMovements.take(batchSize)
>> > executeReassignmentOnBatch(currentBatch)
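>> >
>> > To make the above concrete, here is a self-contained toy version of the
>> > same selection logic; the case class and method names are made up for
>> > illustration and are not the controller's real types:
>> >
>> > object BatchSelectionSketch {
>> >   case class PartitionMove(topic: String, partition: Int,
>> >                            involvesLeaderMovement: Boolean)
>> >
>> >   // mirrors the pseudo code: batchSize = max(L, P), leader movements first,
>> >   // then fill the remaining slots with non-leader movements
>> >   def selectBatch(moves: Seq[PartitionMove],
>> >                   parallelPartitions: Int,
>> >                   parallelLeaderMovements: Int): Seq[PartitionMove] = {
>> >     val batchSize = math.max(parallelLeaderMovements, parallelPartitions)
>> >     val (leaderMoves, otherMoves) = moves.partition(_.involvesLeaderMovement)
>> >     if (leaderMoves.size < batchSize)
>> >       leaderMoves ++ otherMoves.take(batchSize - leaderMoves.size)
>> >     else
>> >       leaderMoves.take(batchSize)
>> >   }
>> >
>> >   def main(args: Array[String]): Unit = {
>> >     val pending = Seq(
>> >       PartitionMove("A", 0, involvesLeaderMovement = true),
>> >       PartitionMove("B", 0, involvesLeaderMovement = false),
>> >       PartitionMove("C", 0, involvesLeaderMovement = true))
>> >     // with L=1 and P=2 the batch is the two leader movements (A and C),
>> >     // because batchSize = max(1, 2) = 2; note that as written L is not a
>> >     // hard cap on leader movements, just like in the pseudo code above
>> >     println(selectBatch(pending, parallelPartitions = 2,
>> >                         parallelLeaderMovements = 1))
>> >   }
>> > }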
>> >
>> > > 5. Regarding the new behavior of electing the new preferred leader in
>> the
>> > > "first step" of the reassignment - does this obey the
>> > > `auto.leader.rebalance.enable` config?
>> > > If not, I have concerns regarding how backwards compatible this might
>> be
>> > -
>> > > e.g imagine a user does a huge reassignment (as they have always done)
>> > and
>> > > suddenly a huge leader shift happens, whereas the user expected to
>> > manually
>> > > shift preferred leaders at a slower rate via
>> > > the kafka-preferred-replica-election.sh tool.
>> >
>> > I'm not sure I get this scenario. So are you saying that after they
>> > submitted a reassignment they also submit a preferred leader change?
>> > In my mind what I would do is:
>> > i) make auto.leader.rebalance.enable obey the leader movement limit as
>> > this way it will be easier to calculate the reassignment batches.
>> > ii) the user could submit preferred leader election but it's basically a
>> > form of reassignment so it would fall under the batching criteria. If the
>> > batch they submit is smaller than the internal limit, then it'd be executed
>> > immediately but otherwise it would be split into more batches. It might be
>> > a different behavior as it may not be executed in one batch but I think
>> > it isn't a problem because we'll default to Int.MAX for the batches and
>> > otherwise, since it's a form of reassignment, I think it makes sense
>> > to apply the same logic.
>> >
>> > > 6. What is the expected behavior if we dynamically change one of the
>> > > configs to a lower value while a reassignment is happening. Would we
>> > cancel
>> > > some of the currently reassigned partitions or would we account for
>> the
>> > new
>> > > values on the next reassignment? I assume the latter but it's good to
>> be
>> > > explicit
>> >
>> > Yep, it would be the latter. I'll make this explicit in the KIP too.
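>> >
>> > As an illustration (and only assuming these end up as dynamically
>> > updatable broker configs, which the KIP doesn't specify yet), lowering one
>> > of the limits at runtime could be done through the Admin API like this;
>> > the bootstrap address and value are placeholders:
>> >
>> > import java.util.Properties
>> > import scala.collection.JavaConverters._
>> > import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
>> > import org.apache.kafka.common.config.ConfigResource
>> >
>> > object LowerBatchSizeSketch {
>> >   def main(args: Array[String]): Unit = {
>> >     val props = new Properties()
>> >     props.put("bootstrap.servers", "localhost:9092") // placeholder address
>> >     val admin = AdminClient.create(props)
>> >     try {
>> >       // cluster-wide dynamic broker config ("" means the cluster default)
>> >       val cluster = new ConfigResource(ConfigResource.Type.BROKER, "")
>> >       val op = new AlterConfigOp(
>> >         new ConfigEntry("reassignment.parallel.partition.count", "10"),
>> >         AlterConfigOp.OpType.SET)
>> >       // per the discussion above, this would only take effect from the
>> >       // next batch on; currently running batches are not cancelled
>> >       admin.incrementalAlterConfigs(
>> >         Map(cluster -> List(op).asJavaCollection).asJava).all().get()
>> >     } finally {
>> >       admin.close()
>> >     }
>> >   }
>> > }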
>> >
>> > About the nits:
>> > Noted, will update the KIP to reflect your suggestions :)
>> >
>> > Viktor
>> >
>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
>> > stanislav@confluent.io>
>> > wrote:
>> > >
>> > > Hey there Viktor,
>> > >
>> > > Thanks for working on this KIP! I agree with the notion that
>> reliability,
>> > > stability and predictability of a reassignment should be a core
>> feature
>> > of
>> > > Kafka.
>> > >
>> > > Let me first explicitly confirm my understanding of the configs and
>> the
>> > > algorithm:
>> > > * reassignment.parallel.replica.count - the maximum total number of
>> > > replicas that we can move at once, *per partition*
>> > > * reassignment.parallel.partition.count - the maximum number of
>> > partitions
>> > > we can move at once
>> > > * reassignment.parallel.leader.movements - the maximum number of
>> leader
>> > > movements we can have at once
>> > >
>> > > As far as I currently understand it, your proposed algorithm will
>> > naturally
>> > > prioritize leader movement first. e.g if
>> > > reassignment.parallel.replica.count=1 and
>> > >
>> >
>> >
>> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>> > > we will always move one, the first possible, replica in the replica
>> set
>> > > (which will be the leader if part of the excess replica set (ER)).
>> > > Am I correct in saying that?
>> > >
>> > > Regarding the KIP, I've got a couple of comments/questions:
>> > >
>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>> > >
>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>> > > rebalances - do the configs apply on a
>> > > single AlterPartitionAssignmentsRequest or are they global?
>> > >
>> > > 3. Unless I've missed it, the algorithm does not take into account
>> > > `reassignment.parallel.leader.movements`
>> > >
>> > > 4. The KIP says that the order of the input has some control over how
>> the
>> > > batches are created - i.e it's deterministic. What would the batches
>> of
>> > the
>> > > following reassignment look like:
>> > > reassignment.parallel.replica.count=1
>> > > reassignment.parallel.partition.count=MAX_INT
>> > > reassignment.parallel.leader.movements=1
>> > > partitionA - (0,1,2) -> (3, 4, 5)
>> > > partitionB - (0,1,2) -> (3,4,5)
>> > > partitionC - (0,1,2) -> (3, 4, 5)
>> > > From my understanding, we would start with A(0->3), B(1->4) and
>> C(1->4).
>> > Is
>> > > that correct? Would the second step then continue with B(0->3)?
>> > >
>> > > If the configurations are global, I can imagine we will have a bit
>> more
>> > > trouble in preserving the expected ordering, especially across
>> controller
>> > > failovers -- but I'll avoid speculating until you confirm the scope of
>> > the
>> > > config.
>> > >
>> > > 5. Regarding the new behavior of electing the new preferred leader in
>> the
>> > > "first step" of the reassignment - does this obey the
>> > > `auto.leader.rebalance.enable` config?
>> > > If not, I have concerns regarding how backwards compatible this might
>> be
>> > -
>> > > e.g imagine a user does a huge reassignment (as they have always done)
>> > and
>> > > suddenly a huge leader shift happens, whereas the user expected to
>> > manually
>> > > shift preferred leaders at a slower rate via
>> > > the kafka-preferred-replica-election.sh tool.
>> > >
>> > > 6. What is the expected behavior if we dynamically change one of the
>> > > configs to a lower value while a reassignment is happening. Would we
>> > cancel
>> > > some of the currently reassigned partitions or would we account for
>> the
>> > new
>> > > values on the next reassignment? I assume the latter but it's good to
>> be
>> > > explicit
>> > >
>> > > As some small nits:
>> > > - could we have a section in the KIP where we explicitly define what
>> each
>> > > config does? This can be inferred from the KIP as is but requires
>> careful
>> > > reading, whereas some developers might want to skim through the
>> change to
>> > > get a quick sense. It also improves readability but that's my personal
>> > > opinion.
>> > > - could you better clarify how a reassignment step is different from
>> the
>> > > currently existing algorithm? maybe laying both algorithms out in the
>> KIP
>> > > would be most clear
>> > > - the names for the OngoingPartitionReassignment and
>> > > CurrentPartitionReassignment fields in the
>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
>> else
>> > in
>> > > the community has.
>> > >
>> > > Thanks,
>> > > Stanislav
>> > >
>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
>> > viktorsomogyi@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > I've renamed my KIP as its name was a bit confusing so we'll
>> continue
>> > it in
>> > > > this thread.
>> > > > The previous thread for record:
>> > > >
>> > > >
>> >
>> >
>> https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
>> > > >
>> > > > A short summary of the KIP:
>> > > > In case of a vast partition reassignment (thousands of partitions at
>> > once)
>> > > > Kafka can collapse under the increased replication traffic. This KIP
>> > will
>> > > > mitigate it by introducing internal batching done by the controller.
>> > > > Besides putting a bandwidth limit on the replication it is useful to batch
>> > > > partition movements, as fewer partitions will use the available
>> > > > bandwidth for reassignment at a time and thus finish faster.
>> > > > The main control handles are:
>> > > > - the number of parallel leader movements,
>> > > > - the number of parallel partition movements
>> > > > - and the number of parallel replica movements.
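>> > > >
>> > > > For concreteness, a broker configuration using these knobs might look
>> > > > like the following (the names are from this KIP, the values are only
>> > > > illustrative; the proposed defaults are Int.MAX, i.e. batching disabled):
>> > > >
>> > > > reassignment.parallel.leader.movements=10
>> > > > reassignment.parallel.partition.count=50
>> > > > reassignment.parallel.replica.count=2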
>> > > >
>> > > > Thank you for the feedback and ideas so far in the previous thread
>> and
>> > I'm
>> > > > happy to receive more.
>> > > >
>> > > > Regards,
>> > > > Viktor
>> > > >
>> > >
>> > >
>> > > --
>> > > Best,
>> > > Stanislav
>> >
>>
>>
>> --
>> Best,
>> Stanislav
>>
>

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Posted by Viktor Somogyi-Vass <vi...@gmail.com>.
Hey Colin,

Sure, that's understandable, we should keep the project stable first and
foremost. I'll work out the details of the client-side reassignment changes
so perhaps we'll have a better, clearer picture about this, and we'll decide
when we feel the reassignment API can take more changes :).

Viktor

On Mon, Dec 9, 2019 at 8:16 PM Colin McCabe <cm...@apache.org> wrote:

> Hi Viktor,
>
> Thanks for the thoughtful reply.
>
> I'm actually not a PMC on the Apache Kafka project, although I hope to
> become one in the future :)  In any case, the tasks that the PMC tackles
> tend to be more related to things like project policies, committers, legal
> stuff, brand stuff, and so on.  I don't think this discussion about
> reassignment needs to be a PMC discussion.
>
> I think it helps to stay focused on the pain points that users are
> experiencing.  Prior to KIP-455, we had a lot of them.  For example, there
> was no officially supported way to cancel reassignments or change them
> after they were created.  There was no official API so behavior could
> change between versions without much warning.  The tooling needed to depend
> on ZooKeeper, and many tools pulled in internal classes from Kafka.
>
> We've fixed these pain points now with the new API, but it will take time
> for these changes to propagate to all the external tools.  I think it would
> be good to see how the new API works out before we think about another
> major revision.  There will probably be pain points with the new API, but
> it's not completely clear what they will be yet.
>
> best,
> Colin
>
>
> On Fri, Dec 6, 2019, at 08:55, Viktor Somogyi-Vass wrote:
> > Hi Colin,
> >
> > Thanks for the honest answer. As a bottom line I think better
> > reassignment logic should be included with Kafka somehow instead of relying
> > on external tools. It makes the system more mature on its own and
> > standardizes what others have implemented many times.
> > Actually I also agree that decoupling this complexity would come with
> > benefits but I haven't looked into this direction yet. And frankly with
> > KIP-500 there will be a lot of changes in the controller itself.
> >
> > My overall vision with this is that there could be an API of some sort at
> > some point where you'd be able to specify how you want to do your
> > rebalance, what plugins you'd use, what the default would be, etc. I
> > really don't mind if this functionality is on the client or on the broker
> > as long as it gives a consistent experience for the users. For a better
> > overall user experience though I think this is a basic building block and
> > it made sense to me to put both pieces of functionality onto the server
> > side, as there is not much to configure for batching except partition and
> > replica batch sizes and maybe leader movements. Also, users who don't care
> > about the order of reassignments or have other special requirements, just
> > that the reassignment is sustainable in the cluster and finishes without
> > causing trouble, could very well limit the blast radius by applying
> > throttling too. With small batches and throttling, individual partition
> > reassignments finish earlier because you divide the given bandwidth among
> > fewer participants at a time, so it's more effective. By just tweaking
> > these knobs (batch size and throttling) we could achieve a significantly
> > better and more effective rebalance.
> >
> > Policies I think can be somewhat parallel to this as they may require some
> > information from metrics, and you're right that the controller could gather
> > these more easily, but it's harder to make plugins, although I don't think
> > it's impossible to give a good framework for this. Also reassignment can be
> > extracted from the controller (Stan has already looked into separating
> the
> > reassignment logic from the KafkaController class in #7339) so I think we
> > could come up with a construct that is decoupled from the controller but
> > still resides on the broker side. Furthermore policies could also be
> > dynamically configured if needed.
> >
> > Any change that we make should be backward compatible and I'd default them
> > to the current behavior, which means external tools wouldn't have to take
> > extra steps but those who don't use any external tools would benefit from
> > these changes.
> >
> > I think it would be useful to discuss the overall vision on reassignments
> > as it would help us decide whether this thing goes on the client or the
> > broker side, so allow me to pose some questions to you and other PMC
> > members, as I'd like to get some clarity on the overall stance so I can
> > align my work better.
> >
> > What would be the overall goal that we want to achieve? Do we want to
> give
> > a solution for automatic partition load balancing or just a framework
> that
> > standardizes what many others implemented? Or do we want to adopt a
> > solution or recommend a specific one?
> > - For the first it might be better to extend the existing tools but for
> the
> > other it might be better to provide an API.
> > - Giving a tool for this makes ramping up easier for newbies and overall
> > gives a better user experience.
> > - Adopting one could be easy as we get a bunch of functionality which is
> > relatively battle-proven, but I haven't seen developers from any of the
> > tools volunteer yet. Or perhaps the Kafka PMC group would need to decide
> > on a tool (which is waaaay out of my league :) ).
> >
> > An alternative solution is we don't do anything but just suggest that users
> > use one of the external tools. The question then is which tool should they
> > use? Do we leave the decision to them?
> > - The variety of tools and deciding between them may not be easy and there
> > might be a number of factors based on what functionality you need, etc.
> > Also contributing to any of the projects might be more cumbersome than
> > contributing to Kafka itself. Therefore there might be benefits for the
> > users in consolidating the tools in Kafka.
> >
> > Would we want to achieve the state where Kafka automatically heals itself
> > through rebalances?
> > - As far as I know, currently for most of these an administrator has to
> > initiate the tasks and approve proposals, but it might be desirable if
> > Kafka worked on its own where possible. Less maintenance burden.
> >
> > To sum up: I could also lean towards having this on the client side, so I'll
> > rework the KIP and look into how we can best solve the reassignment batching
> > problem on the client side and will come back to you. However I
> > think there are important questions above that we need to clear up to set
> > the direction. Also if you think answering the above questions would
> > exceed the boundaries of email communication I can write up a KIP about
> > automatic partition balancing (of course, if you can provide the direction
> > I should go in) and we can have a separate discussion.
> >
> > Thanks,
> > Viktor
> >
> >
> > On Thu, Dec 5, 2019 at 1:23 AM Colin McCabe <co...@cmccabe.xyz> wrote:
> >
> > > Hi Stanislav & Viktor,
> > >
> > > Kafka has historically delegated partition reassignment to external
> > > tools.  For example, the choice of which partitions to reassign is
> > > determined by an external tool.  The choice of what throttle to use is
> also
> > > set externally.
> > >  So I think that putting this into the controller is not very
> consistent
> > > with how Kafka currently works.  It seems better to continue to let
> > > batching be handled by an external tool, rather than handled by the
> > > controller.
> > >
> > > The big advantage of having reassignment done by an external tool is
> that
> > > users are free to set their own reassignment policies without hacking
> Kafka
> > > itself.  So, for example, you can build a reassignment tool that moves
> > > partitions only when they're smaller than 1 GB without writing a KIP
> for a
> > > new reassign.only.if.smaller.than.1g configuration.
> > >
> > > The same is true of batching, I think.  Do you want to set up your
> batch
> > > so that you're always moving a given number of megabytes per second?
> Or
> > > maybe you want to move 3 partitions at all times?  Do you want to move
> the
> > > biggest partitions first?  Etc. etc.  These things are all policy, and
> it's
> > > a lot easier to iterate on the policy when it's not hard-coded in the
> > > controller.  Even changing these configurations at runtime will be very
> > > difficult if they are controller configs.
> > >
> > > Of course, there are disadvantages to having reassignment done by an
> > > external tool.  The external tool is another thing you have to deploy
> and
> > > run.  The tool also needs to collect statistics about the cluster
> which the
> > > controller already mostly has, so there is some duplication there.
> > >
> > > Having some reassignment policies set by the controller and some set
> by an
> > > external tool gets us the worst of both worlds.  Users would need to
> deploy
> > > and run external reassignment tools, but they would also need to worry
> > > about correctly configuring the controller so that it could execute
> their
> > > preferred policies.
> > >
> > > We just got done making a major change to the reassignment API, and
> that
> > > change is still percolating through the environment.  Most external
> tools
> > > haven't been updated yet to use the new API.  I think we should see
> how the
> > > new API works out before making more major changes to reassignment.
> > >
> > > In practice, external reassignment tools already do handle batching.
> For
> > > example, Cruise Control decides what to move, but also divides up the
> work
> > > into batches.  The main functionality that we lack is a way to do this
> with
> > > just the code shipped as part of Apache Kafka itself.
> > >
> > > So, if we want to give users a better out-of-the-box experience, why
> not
> > > add a mode to kafka-reassign-partitions.sh that lets it keep running
> for a
> > > while and breaks down the requested partition moves into batches?  I
> think
> > > this is much more consistent with how the system currently works, and
> much
> > > less likely to create technical debt for the future, than moving
> > > functionality into the controller.
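> > >
> > > As a rough sketch (not a concrete proposal in this thread) of what such a
> > > batching mode could do on top of the new KIP-455 Admin API; the Admin
> > > method names are from that API, everything else here is illustrative:
> > >
> > > import java.util.Optional
> > > import scala.collection.JavaConverters._
> > > import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment}
> > > import org.apache.kafka.common.TopicPartition
> > >
> > > object BatchedReassignmentSketch {
> > >   // submit the requested moves a few partitions at a time and wait for
> > >   // each batch to finish before submitting the next one
> > >   def runInBatches(admin: Admin,
> > >                    target: Map[TopicPartition, Seq[Int]],
> > >                    batchSize: Int): Unit = {
> > >     target.grouped(batchSize).foreach { batch =>
> > >       val request = batch.map { case (tp, replicas) =>
> > >         tp -> Optional.of(new NewPartitionReassignment(replicas.map(Int.box).asJava))
> > >       }.asJava
> > >       admin.alterPartitionReassignments(request).all().get()
> > >       // poll until the controller reports no in-flight reassignments
> > >       while (!admin.listPartitionReassignments().reassignments().get().isEmpty)
> > >         Thread.sleep(5000)
> > >     }
> > >   }
> > > }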
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Wed, Dec 4, 2019, at 06:11, Stanislav Kozlovski wrote:
> > > > Hey all,
> > > >
> > > > I spoke offline with Viktor regarding this KIP and his intention to
> > > > start a vote soon, so I made another pass on it. I have some comments:
> > > >
> > > > From our past discussion:
> > > >
> > > > > I revisited the current algorithm and indeed it would change the order
> > > > > of replicas in ZK but wouldn't do a leader election, so one would need to
> > > > > run the preferred replica election tool. However, in light of this
> > > > > I'm not sure I'd keep the existing behavior, as users wouldn't gain
> > > > > anything from it.
> > > >
> > > > I think we should mention the change in the KIP at the very least
> such
> > > that
> > > > we can discuss it with other reviewers.
> > > >
> > > > > If I understand correctly, you're suggesting that we count the
> > > > "reassignment.parallel.leader.movements" config against such
> preferred
> > > > elections. I think that makes sense. If we are to do that we should
> > > > explicitly mention that this KIP touches the preferred election
> logic as
> > > > well. Would that config also bound the number of leader movements the
> > > auto
> > > > rebalance inside the broker would do as well?
> > > > > ... It might be easier to limit the scope of this KIP and not
> place a
> > > > bound on preferred leader election.
> > > >
> > > > What are your thoughts on this?
> > > >
> > > > And now comments on the KIP:
> > > >
> > > > 1.
> > > >
> > > >    1. > Update CR in Zookeeper
> > > >    (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR for
> > > >    the given partitions.
> > > >
> > > >
> > > > I believe the leader for the partition is the "owner" of that znode.
> > > Having
> > > > the controller update it may complicate things.
> > > > Has this been updated with the latest KIP-455? We keep reassignment
> state
> > > > in the /brokers/topics/[topic] znode
> > > >
> > > > 2.
> > > >
> > > > How does reassignment cancellation work with these new incremental
> steps?
> > > > In your example, what happens if I cancel the current reassignment
> while
> > > > the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the
> state
> > > > to revert back to (0, 1, 2, 3, 4)?
> > > >
> > > >
> > > > 3.
> > > >
> > > > > The only exception is the first step where (if needed) we add as many
> > > > > replicas as are needed to fulfil the min.insync.replicas requirement set
> > > > > on the broker, even if it exceeds the limit on parallel replica
> > > > > reassignments. For instance if there are 4 brokers, min.insync.replicas set
> > > > > to 3 but there is only 1 in-sync replica, then we immediately add 2 others
> > > > > in one step, so the producers are able to continue.
> > > >
> > > > Wouldn't that be adding extra replication traffic and pressure on an
> > > > already problematic partition? I may be missing something but I
> actually
> > > > think that this wouldn't help in common circumstances.
> > > >
> > > > 4.
> > > >
> > > > > Calculate the replicas to be dropped (DR):
> > > >
> > > >    1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) - size(CR))
> > > >
> > > >
> > > > Do you mean min() here?
> > > >
> > > > 5.
> > > >
> > > >    1.
> > > >       1. Take the first *reassignment.parallel.replica.count* replicas of
> > > >          ER, that will be the set of dropped replicas
> > > >
> > > >
> > > > Do you mean `N replicas of ER` here?
> > > >
> > > > 6.
> > > >
> > > > > Calculate the new replicas (NR) to be added
> > > >
> > > >    1. Calculate whether the partition has enough online replicas to fulfil
> > > >       the min.insync.replicas config so the producers are able to continue.
> > > >    2. If the preferred leader is different in FTR and it is not yet
> > > >       reassigned, then add it to the NR
> > > >    3. If size(NR) < min.insync.replicas then take min(min.insync.replicas,
> > > >       reassignment.parallel.replica.count) - size(NR) replicas from FTR
> > > >    4. Otherwise take as many replicas as
> > > >       *reassignment.parallel.replica.count* allows
> > > >
> > > >
> > > > Not sure I understand this. Wouldn't we always take as many new
> replicas
> > > > (NR) as the ones we are dropping (DR)?
> > > >
> > > > 7.
> > > >
> > > > This new configuration would tell how many replicas of a single
> partition
> > > > can be moved at once.
> > > >
> > > > I believe this description is outdated
> > > >
> > > > 8.
> > > >
> > > > I don't see it as a rejected alternative - have we considered
> delegating
> > > > this logic out to the client and making Kafka accept a series of
> > > > reassignment "steps" to run?
> > > >
> > > >
> > > > Thanks!
> > > > Stanislav
> > > >
> > > > On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
> > > > viktorsomogyi@gmail.com> wrote:
> > > >
> > > > > Hey Folks,
> > > > >
> > > > > I think I'll open a vote early next week about this if there are no
> > > more
> > > > > feedback.
> > > > >
> > > > > Thanks,
> > > > > Viktor
> > > > > >>> > > opinion.
> > > > > >>> > > - could you better clarify how a reassignment step is
> different
> > > > > from
> > > > > >>> the
> > > > > >>> > > currently existing algorithm? maybe laying both algorithms
> out
> > > in
> > > > > >>> the KIP
> > > > > >>> > > would be most clear
> > > > > >>> > > - the names for the OngoingPartitionReassignment and
> > > > > >>> > > CurrentPartitionReassignment fields in the
> > > > > >>> > > ListPartitionReassignmentsResponse are a bit confusing to
> me.
> > > > > >>> > > Unfortunately, I don't have a better suggestion, but maybe
> > > somebody
> > > > > >>> else
> > > > > >>> > in
> > > > > >>> > > the community has.
> > > > > >>> > >
> > > > > >>> > > Thanks,
> > > > > >>> > > Stanislav
> > > > > >>> > >
> > > > > >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > > > > >>> > viktorsomogyi@gmail.com>
> > > > > >>> > > wrote:
> > > > > >>> > >
> > > > > >>> > > > Hi All,
> > > > > >>> > > >
> > > > > >>> > > > I've renamed my KIP as its name was a bit confusing so
> we'll
> > > > > >>> continue
> > > > > >>> > it in
> > > > > >>> > > > this thread.
> > > > > >>> > > > The previous thread for record:
> > > > > >>> > > >
> > > > > >>> > > >
> > > > > >>> >
> > > > > >>> >
> > > > > >>>
> > > > >
> > >
> https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > > > > >>> > > >
> > > > > >>> > > > A short summary of the KIP:
> > > > > >>> > > > In case of a vast partition reassignment (thousands of
> > > partitions
> > > > > >>> at
> > > > > >>> > once)
> > > > > >>> > > > Kafka can collapse under the increased replication
> traffic.
> > > This
> > > > > >>> KIP
> > > > > >>> > will
> > > > > >>> > > > mitigate it by introducing internal batching done by the
> > > > > >>> controller.
> > > > > >>> > > > Besides putting a bandwidth limit on the replication it
> is
> > > useful
> > > > > >>> to
> > > > > >>> > batch
> > > > > >>> > > > partition movements as fewer number of partitions will
> use
> > > the
> > > > > >>> > available
> > > > > >>> > > > bandwidth for reassignment and they finish faster.
> > > > > >>> > > > The main control handles are:
> > > > > >>> > > > - the number of parallel leader movements,
> > > > > >>> > > > - the number of parallel partition movements
> > > > > >>> > > > - and the number of parallel replica movements.
> > > > > >>> > > >
> > > > > >>> > > > Thank you for the feedback and ideas so far in the
> previous
> > > > > thread
> > > > > >>> and
> > > > > >>> > I'm
> > > > > >>> > > > happy to receive more.
> > > > > >>> > > >
> > > > > >>> > > > Regards,
> > > > > >>> > > > Viktor
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > > --
> > > > > >>> > > Best,
> > > > > >>> > > Stanislav
> > > > > >>> >
> > > > > >>>
> > > > > >>>
> > > > > >>> --
> > > > > >>> Best,
> > > > > >>> Stanislav
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Posted by Colin McCabe <cm...@apache.org>.
Hi Viktor,

Thanks for the thoughtful reply.

I'm actually not a PMC member on the Apache Kafka project, although I hope to become one in the future :)  In any case, the tasks that the PMC tackles tend to be more related to things like project policies, committers, legal stuff, brand stuff, and so on.  I don't think this discussion about reassignment needs to be a PMC discussion.

I think it helps to stay focused on the pain points that users are experiencing.  Prior to KIP-455, we had a lot of them.  For example, there was no officially supported way to cancel reassignments or change them after they were created.  There was no official API so behavior could change between versions without much warning.  The tooling needed to depend on ZooKeeper, and many tools pulled in internal classes from Kafka.

We've fixed these pain points now with the new API, but it will take time for these changes to propagate to all the external tools.  I think it would be good to see how the new API works out before we think about another major revision.  There will probably be pain points with the new API, but it's not completely clear what they will be yet.
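
For illustration, a minimal sketch (not from the KIP or this thread) of how an external tool can now drive and cancel a reassignment through the KIP-455 Admin API without touching ZooKeeper; the topic name, bootstrap server and target replicas are made-up values:

// Minimal sketch of the KIP-455 Admin API usage (Kafka 2.4+ clients assumed);
// "my-topic", localhost:9092 and brokers 3, 4, 5 are illustrative only.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

public class ReassignmentApiSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);

            // Ask the controller to move the partition to brokers 3, 4 and 5.
            admin.alterPartitionReassignments(Collections.singletonMap(
                tp, Optional.of(new NewPartitionReassignment(Arrays.asList(3, 4, 5)))))
                .all().get();

            // See which reassignments are still in flight.
            System.out.println(admin.listPartitionReassignments().reassignments().get());

            // Cancellation is now an official operation: an empty Optional cancels the in-flight move.
            admin.alterPartitionReassignments(Collections.singletonMap(
                tp, Optional.<NewPartitionReassignment>empty()))
                .all().get();
        }
    }
}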

best,
Colin


On Fri, Dec 6, 2019, at 08:55, Viktor Somogyi-Vass wrote:
> Hi Colin,
> 
> Thanks for the honest answer. As a bottom line I think a better
> reassignment logic should be included with Kafka somehow instead of relying
> on external tools. It makes the system more mature on its own and
> standardizes what others implemented many times.
> Actually I also agree that decoupling this complexity would come with
> benefits but I haven't looked into this direction yet. And frankly with
> KIP-500 there will be a lot of changes in the controller itself.
> 
> My overall vision with this is that there could be an API of some sort at
> some point where you'd be able to specify how you would want to do your
> rebalance, what plugins you'd use, what would be the default, etc. I
> really don't mind if this functionality is on the client or on the broker
> as long as it gives a consistent experience for the users. For a better
> overall user experience though I think this is a basic building block and
> it made sense to me to put both pieces of functionality onto the server side
> as there is not much to be configured for batching except partition and
> replica batch sizes and maybe leader movements. Also users could very well
> limit the blast radius of a reassignment by applying throttling too, where
> they don't care about the order of reassignments or any special
> requirements, just about the fact that it's sustainable in the cluster and
> finishes without causing trouble. With small batches and throttling,
> individual partition reassignments finish earlier because you divide the
> given bandwidth among fewer participants at a time, so it's more effective.
> By just tweaking these knobs (batch size and throttling) we could achieve a
> significantly better and more effective rebalance.
> 
> Policies I think can be somewhat parallel to this as they may require some
> information from metrics and you're right that the controller could gather
> these more easily, but it's harder to make plugins, although I don't think it's
> impossible to give a good framework for this. Also reassignment can be
> extracted from the controller (Stan has already looked into separating the
> reassignment logic from the KafkaController class in #7339) so I think we
> could come up with a construct that is decoupled from the controller but
> still resides on the broker side. Furthermore policies could also be
> dynamically configured if needed.
> 
> Any change that we make should be backward compatible and I'd default them
> to the current behavior, which means external tools wouldn't have to take
> extra steps, but those who don't use any external tools would benefit from
> these changes.
> 
> I think it would be useful to discuss the overall vision on reassignments
> as it would help us to decide whether this thing goes on the client or the
> broker side so allow me to pose some questions to you and other PMCs as I'd
> like to get some clarity and the overall stance on this so I can align my
> work better.
> 
> What would be the overall goal that we want to achieve? Do we want to give
> a solution for automatic partition load balancing or just a framework that
> standardizes what many others implemented? Or do we want to adopt a
> solution or recommend a specific one?
> - For the first it might be better to extend the existing tools but for the
> other it might be better to provide an API.
> - Giving a tool for this makes ramping up easier for newbies and overall
> gives a better user experience.
> - Adopting one could be easy as we get a bunch of functionality which is
> relatively battle-proven, but I haven't seen developers from any of the
> tools volunteer yet. Or perhaps the Kafka PMC group would need to decide
> on a tool (which is waaaay out of my league :) ).
> 
> An alternative solution is we don't do anything but just suggest users to
> use any tools. The question then is which tools should they use? Do we
> recommend that they decide themselves?
> - The variety of tools and deciding between them may not be easy and there
> might be a number of factors based on what functionality you need, etc.
> Also contributing to any of the projects might be more cumbersome than
> contributing to Kafka itself. Therefore there might be benefits for the
> users in consolidating the tools in Kafka.
> 
> Would we want to achieve the state where Kafka automatically heals itself
> through rebalances?
> - As far as I know, currently for most of them an administrator has to
> initiate these tasks and approve proposals, but it might be desirable if
> Kafka worked on its own where possible. Less maintenance burden.
> 
> To sum up: I could also lean towards having this on the client side so I'll
> rework the KIP and look into how we can best solve the reassignment batching
> problem on the client side and will come back to you. However I
> think there are important questions above that we need to clear up to set
> the directions. Also if you think answering the above questions would
> exceed the boundaries of email communication I can write up a KIP about
> automatic partition balancing (of course if you can provide the directions
> I should go) and we can have a separate discussion.
> 
> Thanks,
> Viktor
> 
> 
> On Thu, Dec 5, 2019 at 1:23 AM Colin McCabe <co...@cmccabe.xyz> wrote:
> 
> > Hi Stanislav & Viktor,
> >
> > Kafka has historically delegated partition reassignment to external
> > tools.  For example, the choice of which partitions to reassign is
> > determined by an external tool.  The choice of what throttle to use is also
> > set externally.
> >  So I think that putting this into the controller is not very consistent
> > with how Kafka currently works.  It seems better to continue to let
> > batching be handled by an external tool, rather than handled by the
> > controller.
> >
> > The big advantage of having reassignment done by an external tool is that
> > users are free to set their own reassignment policies without hacking Kafka
> > itself.  So, for example, you can build a reassignment tool that moves
> > partitions only when they're smaller than 1 GB without writing a KIP for a
> > new reassign.only.if.smaller.than.1g configuration.
> >
> > The same is true of batching, I think.  Do you want to set up your batch
> > so that you're always moving a given number of megabytes per second?  Or
> > maybe you want to move 3 partitions at all times?  Do you want to move the
> > biggest partitions first?  Etc. etc.  These things are all policy, and it's
> > a lot easier to iterate on the policy when it's not hard-coded in the
> > controller.  Even changing these configurations at runtime will be very
> > difficult if they are controller configs.
> >
> > Of course, there are disadvantages to having reassignment done by an
> > external tool.  The external tool is another thing you have to deploy and
> > run.  The tool also needs to collect statistics about the cluster which the
> > controller already mostly has, so there is some duplication there.
> >
> > Having some reassignment policies set by the controller and some set by an
> > external tool gets us the worst of both worlds.  Users would need to deploy
> > and run external reassignment tools, but they would also need to worry
> > about correctly configuring the controller so that it could execute their
> > preferred policies.
> >
> > We just got done making a major change to the reassignment API, and that
> > change is still percolating through the environment.  Most external tools
> > haven't been updated yet to use the new API.  I think we should see how the
> > new API works out before making more major changes to reassignment.
> >
> > In practice, external reassignment tools already do handle batching.  For
> > example, Cruise Control decides what to move, but also divides up the work
> > into batches.  The main functionality that we lack is a way to do this with
> > just the code shipped as part of Apache Kafka itself.
> >
> > So, if we want to give users a better out-of-the-box experience, why not
> > add a mode to kafka-reassign-partitions.sh that lets it keep running for a
> > while and breaks down the requested partition moves into batches?  I think
> > this is much more consistent with how the system currently works, and much
> > less likely to create technical debt for the future, than moving
> > functionality into the controller.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Dec 4, 2019, at 06:11, Stanislav Kozlovski wrote:
> > > Hey all,
> > >
> > > I spoke offline with Viktor regarding this KIP and intentions on
> > starting a
> > > vote soon, so I made another pass on it. I have some comments:
> > >
> > > From our past discussion:
> > >
> > > > I reiterated on the current algorithm and indeed it would change the
> > order
> > > of replicas in ZK but wouldn't do a leader election, so one would need to
> > > run the preferred replica election tool. However still in the light of
> > this
> > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > anything
> > > with it.
> > >
> > > I think we should mention the change in the KIP at the very least such
> > that
> > > we can discuss it with other reviewers.
> > >
> > > > If I understand correctly, you're suggesting that we count the
> > > "reassignment.parallel.leader.movements" config against such preferred
> > > elections. I think that makes sense. If we are to do that we should
> > > explicitly mention that this KIP touches the preferred election logic as
> > > well. Would that config also bound the number of leader movements the
> > auto
> > > rebalance inside the broker would do as well?
> > > > ... It might be easier to limit the scope of this KIP and not place a
> > > bound on preferred leader election.
> > >
> > > What are your thoughts on this?
> > >
> > > And now comments on the KIP:
> > >
> > > 1.
> > >
> > >    1. > Update CR in Zookeeper
> > >    (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR for
> > the
> > >    given partitions.
> > >
> > >
> > > I believe the leader for the partition is the "owner" of that znode.
> > Having
> > > the controller update it may complicate things.
> > > Has this been updated with the latest KIP-455? We keep reassignment state
> > > in the /brokers/topics/[topic] znode
> > >
> > > 2.
> > >
> > > How does reassignment cancellation work with these new incremental steps?
> > > In your example, what happens if I cancel the current reassignment while
> > > the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the state
> > > to revert back to (0, 1, 2, 3, 4)?
> > >
> > >
> > > 3.
> > >
> > > > The only exception is the first step where (if needed) we add that many
> > > replicas that is enough to fulfil the min.insync.replicas requirement set
> > > on the broker, even if it exceeds the limit on parallel replica
> > > reassignments. For instance if there are 4 brokers, min.insync.replicas
> > set
> > > to 3 but there are only 1 in-sync replica, then we immediately add 2
> > other
> > > in one step, so the producers are able to continue.
> > >
> > > Wouldn't that be adding extra replication traffic and pressure on an
> > > already problematic partition? I may be missing something but I actually
> > > think that this wouldn't help in common circumstances.
> > >
> > > 4.
> > >
> > > > Calculate the replicas to be dropped (DR):
> > >
> > >    1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) -
> > >    size(CR))
> > >
> > >
> > > Do you mean min() here?
> > >
> > > 5.
> > >
> > >    1.
> > >       1. Take the first *reassignment.parallel.replica.count* replicas of
> > >       ER, that will be the set of dropped replicas
> > >
> > >
> > > Do you mean `N replicas of ER` here?
> > >
> > > 6.
> > >
> > > > Calculate the new replicas (NR) to be added
> > >
> > >    1. Calculate that if the partition has enough online replicas to
> > fulfil
> > >    the min.insync.replicas config so the producers are able to continue.
> > >    2. If the preferred leader is different in FTR and it is not yet
> > >    reassigned, then add it to the NR
> > >    3. If size(NR) < min.insync.replicas then take
> > min(min.insync.replicas,
> > >    reassignment.parallel.replica.count) - size(NR) replicas from FTR
> > >    4. Otherwise take as many replica as
> > >    *reassignment.parallel.replica.count* allows
> > >
> > >
> > > Not sure I understand this. Wouldn't we always take as many new replicas
> > > (NR) as the ones we are dropping (DR)?
> > >
> > > 7.
> > >
> > > This new configuration would tell how many replicas of a single partition
> > > can be moved at once.
> > >
> > > I believe this description is outdated
> > >
> > > 8.
> > >
> > > I don't see it as a rejected alternative - have we considered delegating
> > > this logic out to the client and making Kafka accept a series of
> > > reassignment "steps" to run?
> > >
> > >
> > > Thanks!
> > > Stanislav
> > >
> > > On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
> > > viktorsomogyi@gmail.com> wrote:
> > >
> > > > Hey Folks,
> > > >
> > > > I think I'll open a vote early next week about this if there are no
> > more
> > > > feedback.
> > > >
> > > > Thanks,
> > > > Viktor
> > > >
> > > > On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <
> > > > viktorsomogyi@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey Stanislav,
> > > > >
> > > > > I reiterated on the current algorithm and indeed it would change the
> > > > order
> > > > > of replicas in ZK but wouldn't do a leader election, so one would
> > need to
> > > > > run the preferred replica election tool. However still in the light
> > of
> > > > this
> > > > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > > > anything
> > > > > with it. Changing leadership automatically would result in a simpler,
> > > > > easier and more responsive reassign algorithm which is especially
> > > > important
> > > > > if the reassignment is done to free up the broker from under heavy
> > load.
> > > > > Automated tools (Cruise Control) would also have simpler life.
> > > > > Let me know what you think.
> > > > >
> > > > > Viktor
> > > > >
> > > > > On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> > > > > viktorsomogyi@gmail.com> wrote:
> > > > >
> > > > >> Hi Stan,
> > > > >>
> > > > >> I meant the following (maybe rare) scenario - we have replicas [1,
> > 2, 3]
> > > > >> on
> > > > >> a lot of partitions and the user runs a massive rebalance to change
> > them
> > > > >> all to [3, 2, 1]. In the old behavior, I think that this would not
> > do
> > > > >> anything but simply change the replica set in ZK.
> > > > >> Then, the user could run kafka-preferred-replica-election.sh on a
> > given
> > > > >> set
> > > > >> of partitions to make sure the new leader 3 gets elected.
> > > > >>
> > > > >> I thought the old algorithm would elect 3 as the leader in this case
> > > > >> right away at the end but I have to double check this. In any case I
> > > > think
> > > > >> it would make sense in the new algorithm if we elected the new
> > preferred
> > > > >> leader right away, regardless of the new leader is chosen from the
> > > > existing
> > > > >> replicas or not. If the whole reassignment is in fact just changing
> > the
> > > > >> replica order then either way it is a simple (trivial) operation and
> > > > doing
> > > > >> it batched wouldn't slow it down much as there is no data movement
> > > > >> involved. If the reassignment is mixed, meaning it contains
> > reordering
> > > > and
> > > > >> real movement as well then in fact it would help to even out the
> > load
> > > > >> faster as data movements can get long. For instance in case of a
> > > > >> reassignment batch of two partitions concurrently: P1: (1,2,3) ->
> > > > (3,2,1)
> > > > >> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new
> > leader
> > > > but
> > > > >> P1 wouldn't and it wouldn't help the goal of normalizing traffic on
> > > > broker
> > > > >> 1 that much.
> > > > >> Again, I'll have to check how the current algorithm works and if it
> > has
> > > > >> any unknown drawbacks to implement what I sketched up above.
> > > > >>
> > > > >> As for generic preferred leader election when called from the admin
> > api
> > > > >> or the auto leader balance feature I think you're right that we
> > should
> > > > >> leave it as it is. It doesn't involve any data movement so it's
> > fairly
> > > > fast
> > > > >> and it normalizes the cluster state quickly.
> > > > >>
> > > > >> Viktor
> > > > >>
> > > > >> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
> > > > >> stanislav@confluent.io> wrote:
> > > > >>
> > > > >>> Hey Viktor,
> > > > >>>
> > > > >>>  I think it is intuitive if there are on a global level...If we
> > applied
> > > > >>> > them on every batch then we
> > > > >>> > couldn't really guarantee any limits as the user would be able
> > to get
> > > > >>> > around it with submitting lots of reassignments.
> > > > >>>
> > > > >>>
> > > > >>> Agreed. Could we mention this explicitly in the KIP?
> > > > >>>
> > > > >>> Also if I understand correctly, AlterPartitionAssignmentsRequest
> > would
> > > > >>> be a
> > > > >>> > partition level batching, isn't it? So if you submit 10
> > partitions at
> > > > >>> once
> > > > >>> > then they'll all be started by the controller immediately as per
> > my
> > > > >>> > understanding.
> > > > >>>
> > > > >>>
> > > > >>> Yep, absolutely
> > > > >>>
> > > > >>> I've raised the ordering problem on the discussion of KIP-455 in a
> > bit
> > > > >>> > different form and as I remember the verdict there was that we
> > > > >>> shouldn't
> > > > >>> > expose ordering as an API. It might not be easy as you say and
> > there
> > > > >>> might
> > > > >>> > be much better strategies to follow (like disk or network
> > utilization
> > > > >>> > goals). Therefore I'll remove this section from the KIP.
> > > > >>>
> > > > >>>
> > > > >>> Sounds good to me.
> > > > >>>
> > > > >>> I'm not sure I get this scenario. So are you saying that after they
> > > > >>> > submitted a reassignment they also submit a preferred leader
> > change?
> > > > >>> > In my mind what I would do is:
> > > > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > > > limit
> > > > >>> as
> > > > >>> > this way it will be easier to calculate the reassignment batches.
> > > > >>> >
> > > > >>>
> > > > >>> Sorry, this is my fault -- I should have been more clear.
> > > > >>> First, I didn't think through this well enough at the time, I don't
> > > > >>> think.
> > > > >>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6],
> > it is
> > > > >>> obvious that a leader shift will happen. Your KIP proposes we shift
> > > > >>> replicas 1 and 4 first.
> > > > >>>
> > > > >>> I meant the following (maybe rare) scenario - we have replicas [1,
> > 2,
> > > > 3]
> > > > >>> on
> > > > >>> a lot of partitions and the user runs a massive rebalance to change
> > > > them
> > > > >>> all to [3, 2, 1]. In the old behavior, I think that this would not
> > do
> > > > >>> anything but simply change the replica set in ZK.
> > > > >>> Then, the user could run kafka-preferred-replica-election.sh on a
> > given
> > > > >>> set
> > > > >>> of partitions to make sure the new leader 3 gets elected.
> > > > >>>
> > > > >>> ii) the user could submit preferred leader election but it's
> > basically
> > > > a
> > > > >>> > form of reassignment so it would fall under the batching
> > criterias.
> > > > If
> > > > >>> the
> > > > >>> > batch they submit is smaller than the internal, then it'd be
> > executed
> > > > >>> > immediately but otherwise it would be split into more batches. It
> > > > >>> might be
> > > > >>> > a different behavior as it may not be executed it in one batch
> > but I
> > > > >>> think
> > > > >>> > it isn't a problem because we'll default to Int.MAX with the
> > batches
> > > > >>> and
> > > > >>> > otherwise because since it's a form of reassignment I think it
> > makes
> > > > >>> sense
> > > > >>> > to apply the same logic.
> > > > >>>
> > > > >>>
> > > > >>> The AdminAPI for that is
> > > > >>> "electPreferredLeaders(Collection<TopicPartition>
> > > > >>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
> > > > >>> "partition": 0}]}" so it is a bit less explicit than our other
> > > > >>> reassignment
> > > > >>> API, but the functionality is the same.
> > > > >>> You're 100% right that it is a form of reassignment, but I hadn't
> > > > thought
> > > > >>> of it like that and I some other people wouldn't have either.
> > > > >>> If I understand correctly, you're suggesting that we count the
> > > > >>> "reassignment.parallel.leader.movements" config against such
> > preferred
> > > > >>> elections. I think that makes sense. If we are to do that we should
> > > > >>> explicitly mention that this KIP touches the preferred election
> > logic
> > > > as
> > > > >>> well. Would that config also bound the number of leader movements
> > the
> > > > >>> auto
> > > > >>> rebalance inside the broker would do as well?
> > > > >>>
> > > > >>> Then again, the "reassignment.parallel.leader.movements" config
> > has a
> > > > bit
> > > > >>> of a different meaning when used in the context of a real
> > reassignment
> > > > >>> (where data moves from the leader to N more replicas) versus in the
> > > > >>> preferred election switch (where all we need is two lightweight
> > > > >>> LeaderAndIsr requests), so I am not entirely convinced we may want
> > to
> > > > use
> > > > >>> the same config for that.
> > > > >>> It might be easier to limit the scope of this KIP and not place a
> > bound
> > > > >>> on
> > > > >>> preferred leader election.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Stanislav
> > > > >>>
> > > > >>>
> > > > >>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> > > > >>> viktorsomogyi@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>> > Hey Stanislav,
> > > > >>> >
> > > > >>> > Thanks for the thorough look at the KIP! :)
> > > > >>> >
> > > > >>> > > Let me first explicitly confirm my understanding of the
> > configs and
> > > > >>> the
> > > > >>> > > algorithm:
> > > > >>> > > * reassignment.parallel.replica.count - the maximum total
> > number of
> > > > >>> > > replicas that we can move at once, *per partition*
> > > > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > > > >>> > partitions
> > > > >>> > > we can move at once
> > > > >>> > > * reassignment.parallel.leader.movements - the maximum number
> > of
> > > > >>> leader
> > > > >>> > > movements we can have at once
> > > > >>> >
> > > > >>> > Yes.
> > > > >>> >
> > > > >>> > > As far as I currently understand it, your proposed algorithm
> > will
> > > > >>> > naturally
> > > > >>> > > prioritize leader movement first. e.g if
> > > > >>> > > reassignment.parallel.replica.count=1 and
> > > > >>> > >
> > > > >>> >
> > > > >>> >
> > > > >>>
> > > >
> > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > > >>> > > we will always move one, the first possible, replica in the
> > replica
> > > > >>> set
> > > > >>> > > (which will be the leader if part of the excess replica set
> > (ER)).
> > > > >>> > > Am I correct in saying that?
> > > > >>> >
> > > > >>> > Yes.
> > > > >>> >
> > > > >>> > > 1. Does it make sense to add `max` somewhere in the configs'
> > names?
> > > > >>> >
> > > > >>> > If you imply that it's not always an exact number (because the
> > last
> > > > >>> batch
> > > > >>> > could be smaller) than I think it's a good idea. I don't mind
> > this
> > > > >>> naming
> > > > >>> > or having max in it either.
> > > > >>> >
> > > > >>> > > 2. How does this KIP play along with KIP-455's notion of
> > multiple
> > > > >>> > > rebalances - do the configs apply on a
> > > > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > > > >>> >
> > > > >>> > I think it is intuitive if there are on a global level and the
> > user
> > > > >>> would
> > > > >>> > have the option to set KIP-435's settings to Int.MAX and thus
> > > > >>> eliminate any
> > > > >>> > batching and submit their own batches through
> > > > >>> > AlterPartitionAssignmentsRequest. If we applied them on every
> > batch
> > > > >>> then we
> > > > >>> > couldn't really guarantee any limits as the user would be able
> > to get
> > > > >>> > around it with submitting lots of reassignments. By default
> > though we
> > > > >>> set
> > > > >>> > the batching limits to Int.MAX so effectively they're disabled.
> > > > >>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
> > > > would
> > > > >>> be a
> > > > >>> > partition level batching, isn't it? So if you submit 10
> > partitions at
> > > > >>> once
> > > > >>> > then they'll all be started by the controller immediately as per
> > my
> > > > >>> > understanding.
> > > > >>> >
> > > > >>> > > 3. Unless I've missed it, the algorithm does not take into
> > account
> > > > >>> > > `reassignment.parallel.leader.movements`
> > > > >>> >
> > > > >>> > Yea the reassignment step calculation doesn't contain that
> > because it
> > > > >>> > describes only one partition's reassignment step calculation. We
> > > > simply
> > > > >>> > fill our reassignment batch with as many leader movements as we
> > can
> > > > and
> > > > >>> > then fill the rest with reassignments which don't require leader
> > > > >>> movement
> > > > >>> > if we have space. I'll create a pseudo code block on the KIP for
> > > > this.
> > > > >>> >
> > > > >>> > > 4. The KIP says that the order of the input has some control
> > over
> > > > >>> how the
> > > > >>> > > batches are created - i.e it's deterministic. What would the
> > > > batches
> > > > >>> of
> > > > >>> > the
> > > > >>> > > following reassignment look like:
> > > > >>> > > reassignment.parallel.replica.count=1
> > > > >>> > > reassignment.parallel.partition.count=MAX_INT
> > > > >>> > > reassignment.parallel.leader.movements=1
> > > > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > > > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > > > >>> C(1->4).
> > > > >>> > Is
> > > > >>> > > that correct? Would the second step then continue with B(0->3)?
> > > > >>> > >
> > > > >>> > > If the configurations are global, I can imagine we will have a
> > bit
> > > > >>> more
> > > > >>> > > trouble in preserving the expected ordering, especially across
> > > > >>> controller
> > > > >>> > > failovers -- but I'll avoid speculating until you confirm the
> > scope
> > > > >>> of
> > > > >>> > the
> > > > >>> > > config.
> > > > >>> >
> > > > >>> > I've raised the ordering problem on the discussion of KIP-455 in
> > a
> > > > bit
> > > > >>> > different form and as I remember the verdict there was that we
> > > > >>> shouldn't
> > > > >>> > expose ordering as an API. It might not be easy as you say and
> > there
> > > > >>> might
> > > > >>> > be much better strategies to follow (like disk or network
> > utilization
> > > > >>> > goals). Therefore I'll remove this section from the KIP.
> > (Otherwise
> > > > >>> yes, I
> > > > >>> > would have thought of the same behavior what you described.)
> > > > >>> > Since #3 also was about the pseudo code, it would be something
> > like
> > > > >>> this:
> > > > >>> > Config:
> > > > >>> > reassignment.parallel.replica.count=R
> > > > >>> > reassignment.parallel.partition.count=P
> > > > >>> > reassignment.parallel.leader.movements=L
> > > > >>> > Code:
> > > > >>> > val batchSize = max(L,P)
> > > > >>> > // split the individual partition reassignments whether they
> > involve
> > > > >>> leader
> > > > >>> > movement or not
> > > > >>> > val partitionMovements =
> > > > >>> >
> > > > >>> >
> > > > >>>
> > > >
> > calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> > > > >>> > // fill the batch with as much leader movements as possible and
> > take
> > > > >>> the
> > > > >>> > rest from other reassignments
> > > > >>> > val currentBatch = if (partitionMovements.leaderMovements.size <
> > > > >>> batchSize)
> > > > >>> >   partitionMovements.leaderMovements ++
> > > > >>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
> > > > >>> > partitionMovements.leaderMovements.size)
> > > > >>> > else
> > > > >>> >  partitionMovements.leaderMovements.take(batchSize)
> > > > >>> > executeReassignmentOnBatch(currentBatch)
> > > > >>> >
> > > > >>> > > 5. Regarding the new behavior of electing the new preferred
> > leader
> > > > >>> in the
> > > > >>> > > "first step" of the reassignment - does this obey the
> > > > >>> > > `auto.leader.rebalance.enable` config?
> > > > >>> > > If not, I have concerns regarding how backwards compatible this
> > > > >>> might be
> > > > >>> > -
> > > > >>> > > e.g imagine a user does a huge reassignment (as they have
> > always
> > > > >>> done)
> > > > >>> > and
> > > > >>> > > suddenly a huge leader shift happens, whereas the user
> > expected to
> > > > >>> > manually
> > > > >>> > > shift preferred leaders at a slower rate via
> > > > >>> > > the kafka-preferred-replica-election.sh tool.
> > > > >>> >
> > > > >>> > I'm not sure I get this scenario. So are you saying that after
> > they
> > > > >>> > submitted a reassignment they also submit a preferred leader
> > change?
> > > > >>> > In my mind what I would do is:
> > > > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > > > limit
> > > > >>> as
> > > > >>> > this way it will be easier to calculate the reassignment batches.
> > > > >>> > ii) the user could submit preferred leader election but it's
> > > > basically
> > > > >>> a
> > > > >>> > form of reassignment so it would fall under the batching
> > criterias.
> > > > If
> > > > >>> the
> > > > >>> > batch they submit is smaller than the internal, then it'd be
> > executed
> > > > >>> > immediately but otherwise it would be split into more batches. It
> > > > >>> might be
> > > > >>> > a different behavior as it may not be executed it in one batch
> > but I
> > > > >>> think
> > > > >>> > it isn't a problem because we'll default to Int.MAX with the
> > batches
> > > > >>> and
> > > > >>> > otherwise because since it's a form of reassignment I think it
> > makes
> > > > >>> sense
> > > > >>> > to apply the same logic.
> > > > >>> >
> > > > >>> > > 6. What is the expected behavior if we dynamically change one
> > of
> > > > the
> > > > >>> > > configs to a lower value while a reassignment is happening.
> > Would
> > > > we
> > > > >>> > cancel
> > > > >>> > > some of the currently reassigned partitions or would we
> > account for
> > > > >>> the
> > > > >>> > new
> > > > >>> > > values on the next reassignment? I assume the latter but it's
> > good
> > > > >>> to be
> > > > >>> > > explicit
> > > > >>> >
> > > > >>> > Yep, it would be the latter. I'll make this explicit in the KIP
> > too.
> > > > >>> >
> > > > >>> > About the nits:
> > > > >>> > Noted, will update the KIP to reflect your suggestions :)
> > > > >>> >
> > > > >>> > Viktor
> > > > >>> >
> > > > >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> > > > >>> > stanislav@confluent.io>
> > > > >>> > wrote:
> > > > >>> > >
> > > > >>> > > Hey there Viktor,
> > > > >>> > >
> > > > >>> > > Thanks for working on this KIP! I agree with the notion that
> > > > >>> reliability,
> > > > >>> > > stability and predictability of a reassignment should be a core
> > > > >>> feature
> > > > >>> > of
> > > > >>> > > Kafka.
> > > > >>> > >
> > > > >>> > > Let me first explicitly confirm my understanding of the
> > configs and
> > > > >>> the
> > > > >>> > > algorithm:
> > > > >>> > > * reassignment.parallel.replica.count - the maximum total
> > number of
> > > > >>> > > replicas that we can move at once, *per partition*
> > > > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > > > >>> > partitions
> > > > >>> > > we can move at once
> > > > >>> > > * reassignment.parallel.leader.movements - the maximum number
> > of
> > > > >>> leader
> > > > >>> > > movements we can have at once
> > > > >>> > >
> > > > >>> > > As far as I currently understand it, your proposed algorithm
> > will
> > > > >>> > naturally
> > > > >>> > > prioritize leader movement first. e.g if
> > > > >>> > > reassignment.parallel.replica.count=1 and
> > > > >>> > >
> > > > >>> >
> > > > >>> >
> > > > >>>
> > > >
> > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > > >>> > > we will always move one, the first possible, replica in the
> > replica
> > > > >>> set
> > > > >>> > > (which will be the leader if part of the excess replica set
> > (ER)).
> > > > >>> > > Am I correct in saying that?
> > > > >>> > >
> > > > >>> > > Regarding the KIP, I've got a couple of comments/questions::
> > > > >>> > >
> > > > >>> > > 1. Does it make sense to add `max` somewhere in the configs'
> > names?
> > > > >>> > >
> > > > >>> > > 2. How does this KIP play along with KIP-455's notion of
> > multiple
> > > > >>> > > rebalances - do the configs apply on a
> > > > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > > > >>> > >
> > > > >>> > > 3. Unless I've missed it, the algorithm does not take into
> > account
> > > > >>> > > `reassignment.parallel.leader.movements`
> > > > >>> > >
> > > > >>> > > 4. The KIP says that the order of the input has some control
> > over
> > > > >>> how the
> > > > >>> > > batches are created - i.e it's deterministic. What would the
> > > > batches
> > > > >>> of
> > > > >>> > the
> > > > >>> > > following reassignment look like:
> > > > >>> > > reassignment.parallel.replica.count=1
> > > > >>> > > reassignment.parallel.partition.count=MAX_INT
> > > > >>> > > reassignment.parallel.leader.movements=1
> > > > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > > > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > > > >>> C(1->4).
> > > > >>> > Is
> > > > >>> > > that correct? Would the second step then continue with B(0->3)?
> > > > >>> > >
> > > > >>> > > If the configurations are global, I can imagine we will have a
> > bit
> > > > >>> more
> > > > >>> > > trouble in preserving the expected ordering, especially across
> > > > >>> controller
> > > > >>> > > failovers -- but I'll avoid speculating until you confirm the
> > scope
> > > > >>> of
> > > > >>> > the
> > > > >>> > > config.
> > > > >>> > >
> > > > >>> > > 5. Regarding the new behavior of electing the new preferred
> > leader
> > > > >>> in the
> > > > >>> > > "first step" of the reassignment - does this obey the
> > > > >>> > > `auto.leader.rebalance.enable` config?
> > > > >>> > > If not, I have concerns regarding how backwards compatible this
> > > > >>> might be
> > > > >>> > -
> > > > >>> > > e.g imagine a user does a huge reassignment (as they have
> > always
> > > > >>> done)
> > > > >>> > and
> > > > >>> > > suddenly a huge leader shift happens, whereas the user
> > expected to
> > > > >>> > manually
> > > > >>> > > shift preferred leaders at a slower rate via
> > > > >>> > > the kafka-preferred-replica-election.sh tool.
> > > > >>> > >
> > > > >>> > > 6. What is the expected behavior if we dynamically change one
> > of
> > > > the
> > > > >>> > > configs to a lower value while a reassignment is happening.
> > Would
> > > > we
> > > > >>> > cancel
> > > > >>> > > some of the currently reassigned partitions or would we
> > account for
> > > > >>> the
> > > > >>> > new
> > > > >>> > > values on the next reassignment? I assume the latter but it's
> > good
> > > > >>> to be
> > > > >>> > > explicit
> > > > >>> > >
> > > > >>> > > As some small nits:
> > > > >>> > > - could we have a section in the KIP where we explicitly define
> > > > what
> > > > >>> each
> > > > >>> > > config does? This can be inferred from the KIP as is but
> > requires
> > > > >>> careful
> > > > >>> > > reading, whereas some developers might want to skim through the
> > > > >>> change to
> > > > >>> > > get a quick sense. It also improves readability but that's my
> > > > >>> personal
> > > > >>> > > opinion.
> > > > >>> > > - could you better clarify how a reassignment step is different
> > > > from
> > > > >>> the
> > > > >>> > > currently existing algorithm? maybe laying both algorithms out
> > in
> > > > >>> the KIP
> > > > >>> > > would be most clear
> > > > >>> > > - the names for the OngoingPartitionReassignment and
> > > > >>> > > CurrentPartitionReassignment fields in the
> > > > >>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
> > > > >>> > > Unfortunately, I don't have a better suggestion, but maybe
> > somebody
> > > > >>> else
> > > > >>> > in
> > > > >>> > > the community has.
> > > > >>> > >
> > > > >>> > > Thanks,
> > > > >>> > > Stanislav
> > > > >>> > >
> > > > >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > > > >>> > viktorsomogyi@gmail.com>
> > > > >>> > > wrote:
> > > > >>> > >
> > > > >>> > > > Hi All,
> > > > >>> > > >
> > > > >>> > > > I've renamed my KIP as its name was a bit confusing so we'll
> > > > >>> continue
> > > > >>> > it in
> > > > >>> > > > this thread.
> > > > >>> > > > The previous thread for record:
> > > > >>> > > >
> > > > >>> > > >
> > > > >>> >
> > > > >>> >
> > > > >>>
> > > >
> > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > > > >>> > > >
> > > > >>> > > > A short summary of the KIP:
> > > > >>> > > > In case of a vast partition reassignment (thousands of
> > partitions
> > > > >>> at
> > > > >>> > once)
> > > > >>> > > > Kafka can collapse under the increased replication traffic.
> > This
> > > > >>> KIP
> > > > >>> > will
> > > > >>> > > > mitigate it by introducing internal batching done by the
> > > > >>> controller.
> > > > >>> > > > Besides putting a bandwidth limit on the replication it is
> > useful
> > > > >>> to
> > > > >>> > batch
> > > > >>> > > > partition movements as fewer number of partitions will use
> > the
> > > > >>> > available
> > > > >>> > > > bandwidth for reassignment and they finish faster.
> > > > >>> > > > The main control handles are:
> > > > >>> > > > - the number of parallel leader movements,
> > > > >>> > > > - the number of parallel partition movements
> > > > >>> > > > - and the number of parallel replica movements.
> > > > >>> > > >
> > > > >>> > > > Thank you for the feedback and ideas so far in the previous
> > > > thread
> > > > >>> and
> > > > >>> > I'm
> > > > >>> > > > happy to receive more.
> > > > >>> > > >
> > > > >>> > > > Regards,
> > > > >>> > > > Viktor
> > > > >>> > > >
> > > > >>> > >
> > > > >>> > >
> > > > >>> > > --
> > > > >>> > > Best,
> > > > >>> > > Stanislav
> > > > >>> >
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>> Best,
> > > > >>> Stanislav
> > > > >>>
> > > > >>
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Stanislav
> > >
> >
>

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Posted by Viktor Somogyi-Vass <vi...@gmail.com>.
Hi Colin,

Thanks for the honest answer. As a bottom line I think a better
reassignment logic should be included with Kafka somehow instead of relying
on external tools. It makes the system more mature on its own and
standardizes what others implemented many times.
Actually I also agree that decoupling this complexity would come with
benefits but I haven't looked into this direction yet. And frankly with
KIP-500 there will be a lot of changes in the controller itself.

My overall vision with this is that there could be an API of some sort at
some point where you'd be able to specify how you would want to do your
rebalance, what plugins you'd use, what would be the default, etc. I
really don't mind if this functionality is on the client or on the broker
as long as it gives a consistent experience for the users. For a better
overall user experience though I think this is a basic building block and
it made sense to me to put both pieces of functionality onto the server side
as there is not much to be configured for batching except partition and
replica batch sizes and maybe leader movements. Also users could very well
limit the blast radius of a reassignment by applying throttling too, where
they don't care about the order of reassignments or any special
requirements, just about the fact that it's sustainable in the cluster and
finishes without causing trouble. With small batches and throttling,
individual partition reassignments finish earlier because you divide the
given bandwidth among fewer participants at a time, so it's more effective.
By just tweaking these knobs (batch size and throttling) we could achieve a
significantly better and more effective rebalance.
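
To make that concrete, here is a rough back-of-the-envelope sketch (all numbers below are made up for illustration): with a fixed replication throttle, smaller batches let each in-flight partition finish much sooner, even though the total duration of the whole plan stays roughly the same.

// Illustrative arithmetic only; throttle, partition size and counts are made up.
public class BatchSizeMath {
    public static void main(String[] args) {
        double throttleMBps = 100.0;        // total reassignment throttle in MB/s
        double partitionSizeMB = 10_000.0;  // ~10 GB to move per partition
        int totalPartitions = 50;

        for (int batchSize : new int[]{50, 10, 5}) {
            double perPartitionMBps = throttleMBps / batchSize;
            double minutesPerPartition = partitionSizeMB / perPartitionMBps / 60.0;
            int batches = (int) Math.ceil((double) totalPartitions / batchSize);
            System.out.printf("batch size %d: a partition finishes in ~%.0f min, whole plan in ~%.0f min%n",
                batchSize, minutesPerPartition, minutesPerPartition * batches);
        }
    }
}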

Policies I think can be somewhat parallel to this as they may require some
information from metrics and you're right that the controller could gather
these more easily, but it's harder to make plugins, although I don't think it's
impossible to give a good framework for this. Also reassignment can be
extracted from the controller (Stan has already looked into separating the
reassignment logic from the KafkaController class in #7339) so I think we
could come up with a construct that is decoupled from the controller but
still resides on the broker side. Furthermore policies could also be
dynamically configured if needed.
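
To sketch what such a pluggable construct could look like (the interface below is purely hypothetical and exists nowhere in Kafka or the KIP; it only illustrates keeping the batching/ordering policy out of hard-coded controller logic):

import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical plugin point, not an existing or proposed Kafka interface.
// Given the desired end state and what is already in flight, a policy picks
// the partitions to start next; metrics-driven or size-based strategies
// could be implemented behind the same contract.
public interface ReassignmentBatchPolicy {
    Map<TopicPartition, List<Integer>> nextBatch(
        Map<TopicPartition, List<Integer>> remainingTargets,
        Set<TopicPartition> inFlight);
}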

Any change that we make should be backward compatible and I'd default them
to the current behavior, which means external tools wouldn't have to take
extra steps, but those who don't use any external tools would benefit from
these changes.

I think it would be useful to discuss the overall vision on reassignments
as it would help us to decide whether this thing goes on the client or the
broker side so allow me to pose some questions to you and other PMCs as I'd
like to get some clarity and the overall stance on this so I can align my
work better.

What would be the overall goal that we want to achieve? Do we want to give
a solution for automatic partition load balancing or just a framework that
standardizes what many others implemented? Or do we want to adopt a
solution or recommend a specific one?
- For the first it might be better to extend the existing tools but for the
other it might be better to provide an API.
- Giving a tool for this makes ramping up easier for newbies and overall
gives a better user experience.
- Adopting one could be easy as we get a bunch of functionality which is
relatively battle-proven, but I haven't seen developers from any of the
tools volunteer yet. Or perhaps the Kafka PMC group would need to decide
on a tool (which is waaaay out of my league :) ).

An alternative solution is we don't do anything but just suggest users to
use any tools. The question then is which tools should they use? Do we
recommend that they decide themselves?
- The variety of tools and deciding between them may not be easy and there
might be a number of factors based on what functionality you need, etc.
Also contributing to any of the projects might be more cumbersome than
contributing to Kafka itself. Therefore there might be benefits for the
users in consolidating the tools in Kafka.

Would we want to achieve the state where Kafka automatically heals itself
through rebalances?
- As far as I know, currently for most of them an administrator has to
initiate these tasks and approve proposals, but it might be desirable if
Kafka worked on its own where possible. Less maintenance burden.

To sum up: I could also lean towards having this on the client side so I'll
rework the KIP and look into how we can best solve the reassignment batching
problem on the client side and will come back to you. However I
think there are important questions above that we need to clear up to set
the directions. Also if you think answering the above questions would
exceed the boundaries of email communication I can write up a KIP about
automatic partition balancing (of course if you can provide the directions
I should go) and we can have a separate discussion.
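
As a rough sketch of what such client-side batching could look like on top of the KIP-455 Admin API (the fixed batch size, polling interval and class name are illustrative; real tooling would add error handling and a smarter ordering policy):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class BatchedReassignmentRunner {
    // Submit the desired assignment in fixed-size batches and wait for each
    // batch to drain before starting the next one.
    public static void run(Admin admin,
                           Map<TopicPartition, List<Integer>> targets,
                           int batchSize) throws Exception {
        List<TopicPartition> remaining = new ArrayList<>(targets.keySet());
        while (!remaining.isEmpty()) {
            List<TopicPartition> batch = new ArrayList<>(
                remaining.subList(0, Math.min(batchSize, remaining.size())));

            Map<TopicPartition, Optional<NewPartitionReassignment>> request = new HashMap<>();
            for (TopicPartition tp : batch) {
                request.put(tp, Optional.of(new NewPartitionReassignment(targets.get(tp))));
            }
            admin.alterPartitionReassignments(request).all().get();

            // Poll until nothing from this batch is reported as still reassigning.
            while (!Collections.disjoint(
                    admin.listPartitionReassignments().reassignments().get().keySet(), batch)) {
                Thread.sleep(5_000);
            }
            remaining.removeAll(batch);
        }
    }
}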

Thanks,
Viktor


On Thu, Dec 5, 2019 at 1:23 AM Colin McCabe <co...@cmccabe.xyz> wrote:

> Hi Stanislav & Viktor,
>
> Kafka has historically delegated partition reassignment to external
> tools.  For example, the choice of which partitions to reassign is
> determined by an external tool.  The choice of what throttle to use is also
> set externally.
>  So I think that putting this into the controller is not very consistent
> with how Kafka currently works.  It seems better to continue to let
> batching be handled by an external tool, rather than handled by the
> controller.
>
> The big advantage of having reassignment done by an external tool is that
> users are free to set their own reassignment policies without hacking Kafka
> itself.  So, for example, you can build a reassignment tool that moves
> partitions only when they're smaller than 1 GB without writing a KIP for a
> new reassign.only.if.smaller.than.1g configuration.
>
> The same is true of batching, I think.  Do you want to set up your batch
> so that you're always moving a given number of megabytes per second?  Or
> maybe you want to move 3 partitions at all times?  Do you want to move the
> biggest partitions first?  Etc. etc.  These things are all policy, and it's
> a lot easier to iterate on the policy when it's not hard-coded in the
> controller.  Even changing these configurations at runtime will be very
> difficult if they are controller configs.
>
> Of course, there are disadvantages to having reassignment done by an
> external tool.  The external tool is another thing you have to deploy and
> run.  The tool also needs to collect statistics about the cluster which the
> controller already mostly has, so there is some duplication there.
>
> Having some reassignment policies set by the controller and some set by an
> external tool gets us the worst of both worlds.  Users would need to deploy
> and run external reassignment tools, but they would also need to worry
> about correctly configuring the controller so that it could execute their
> preferred policies.
>
> We just got done making a major change to the reassignment API, and that
> change is still percolating through the environment.  Most external tools
> haven't been updated yet to use the new API.  I think we should see how the
> new API works out before making more major changes to reassignment.
>
> In practice, external reassignment tools already do handle batching.  For
> example, Cruise Control decides what to move, but also divides up the work
> into batches.  The main functionality that we lack is a way to do this with
> just the code shipped as part of Apache Kafka itself.
>
> So, if we want to give users a better out-of-the-box experience, why not
> add a mode to kafka-reassign-partitions.sh that lets it keep running for a
> while and breaks down the requested partition moves into batches?  I think
> this is much more consistent with how the system currently works, and much
> less likely to create technical debt for the future, than moving
> functionality into the controller.
>
> best,
> Colin
>
>
> On Wed, Dec 4, 2019, at 06:11, Stanislav Kozlovski wrote:
> > Hey all,
> >
> > I spoke offline with Viktor regarding this KIP and intentions on
> starting a
> > vote soon, so I made another pass on it. I have some comments:
> >
> > From our past discussion:
> >
> > > I reiterated on the current algorithm and indeed it would change the
> order
> > of replicas in ZK but wouldn't do a leader election, so one would need to
> > run the preferred replica election tool. However still in the light of
> this
> > I'm not sure I'd keep the existing behavior as users wouldn't win
> anything
> > with it.
> >
> > I think we should mention the change in the KIP at the very least such
> that
> > we can discuss it with other reviewers.
> >
> > > If I understand correctly, you're suggesting that we count the
> > "reassignment.parallel.leader.movements" config against such preferred
> > elections. I think that makes sense. If we are to do that we should
> > explicitly mention that this KIP touches the preferred election logic as
> > well. Would that config also bound the number of leader movements the
> auto
> > rebalance inside the broker would do as well?
> > > ... It might be easier to limit the scope of this KIP and not place a
> > bound on preferred leader election.
> >
> > What are your thoughts on this?
> >
> > And now comments on the KIP:
> >
> > 1.
> >
> >    1. > Update CR in Zookeeper
> >    (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR for
> the
> >    given partitions.
> >
> >
> > I believe the leader for the partition is the "owner" of that znode.
> Having
> > the controller update it may complicate things.
> > Has this been updated with the latest KIP-455? We keep reassignment state
> > in the /brokers/topics/[topic] znode
> >
> > 2.
> >
> > How does reassignment cancellation work with these new incremental steps?
> > In your example, what happens if I cancel the current reassignment while
> > the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the state
> > to revert back to (0, 1, 2, 3, 4)?
> >
> >
> > 3.
> >
> > > The only exception is the first step where (if needed) we add that many
> > replicas that is enough to fulfil the min.insync.replicas requirement set
> > on the broker, even if it exceeds the limit on parallel replica
> > reassignments. For instance if there are 4 brokers, min.insync.replicas
> set
> > to 3 but there are only 1 in-sync replica, then we immediately add 2
> other
> > in one step, so the producers are able to continue.
> >
> > Wouldn't that be adding extra replication traffic and pressure on an
> > already problematic partition? I may be missing something but I actually
> > think that this wouldn't help in common circumstances.
> >
> > 4.
> >
> > > Calculate the replicas to be dropped (DR):
> >
> >    1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) -
> >    size(CR))
> >
> >
> > Do you mean min() here?
> >
> > 5.
> >
> >    1.
> >       1. Take the first *reassignment.parallel.replica.count* replicas of
> >       ER, that will be the set of dropped replicas
> >
> >
> > Do you mean `N replicas of ER` here?
> >
> > 6.
> >
> > > Calculate the new replicas (NR) to be added
> >
> >    1. Calculate that if the partition has enough online replicas to
> fulfil
> >    the min.insync.replicas config so the producers are able to continue.
> >    2. If the preferred leader is different in FTR and it is not yet
> >    reassigned, then add it to the NR
> >    3. If size(NR) < min.insync.replicas then take
> min(min.insync.replicas,
> >    reassignment.parallel.replica.count) - size(NR) replicas from FTR
> >    4. Otherwise take as many replica as
> >    *reassignment.parallel.replica.count* allows
> >
> >
> > Not sure I understand this. Wouldn't we always take as many new replicas
> > (NR) as the ones we are dropping (DR)?
> >
> > 7.
> >
> > This new configuration would tell how many replicas of a single partition
> > can be moved at once.
> >
> > I believe this description is outdated
> >
> > 8.
> >
> > I don't see it as a rejected alternative - have we considered delegating
> > this logic out to the client and making Kafka accept a series of
> > reassignment "steps" to run?
> >
> >
> > Thanks!
> > Stanislav
> >
> > On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
> > viktorsomogyi@gmail.com> wrote:
> >
> > > Hey Folks,
> > >
> > > I think I'll open a vote early next week about this if there are no
> more
> > > feedback.
> > >
> > > Thanks,
> > > Viktor
> > >
> > > On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <
> > > viktorsomogyi@gmail.com>
> > > wrote:
> > >
> > > > Hey Stanislav,
> > > >
> > > > I reiterated on the current algorithm and indeed it would change the
> > > order
> > > > of replicas in ZK but wouldn't do a leader election, so one would
> need to
> > > > run the preferred replica election tool. However still in the light
> of
> > > this
> > > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > > anything
> > > > with it. Changing leadership automatically would result in a simpler,
> > > > easier and more responsive reassign algorithm which is especially
> > > important
> > > > if the reassignment is done to free up the broker from under heavy
> load.
> > > > Automated tools (Cruise Control) would also have simpler life.
> > > > Let me know what you think.
> > > >
> > > > Viktor
> > > >
> > > > On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> > > > viktorsomogyi@gmail.com> wrote:
> > > >
> > > >> Hi Stan,
> > > >>
> > > >> I meant the following (maybe rare) scenario - we have replicas [1,
> 2, 3]
> > > >> on
> > > >> a lot of partitions and the user runs a massive rebalance to change
> them
> > > >> all to [3, 2, 1]. In the old behavior, I think that this would not
> do
> > > >> anything but simply change the replica set in ZK.
> > > >> Then, the user could run kafka-preferred-replica-election.sh on a
> given
> > > >> set
> > > >> of partitions to make sure the new leader 3 gets elected.
> > > >>
> > > >> I thought the old algorithm would elect 3 as the leader in this case
> > > >> right away at the end but I have to double check this. In any case I
> > > think
> > > >> it would make sense in the new algorithm if we elected the new
> preferred
> > > >> leader right away, regardless of the new leader is chosen from the
> > > existing
> > > >> replicas or not. If the whole reassignment is in fact just changing
> the
> > > >> replica order then either way it is a simple (trivial) operation and
> > > doing
> > > >> it batched wouldn't slow it down much as there is no data movement
> > > >> involved. If the reassignment is mixed, meaning it contains
> reordering
> > > and
> > > >> real movement as well then in fact it would help to even out the
> load
> > > >> faster as data movements can get long. For instance in case of a
> > > >> reassignment batch of two partitions concurrently: P1: (1,2,3) ->
> > > (3,2,1)
> > > >> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new
> leader
> > > but
> > > >> P1 wouldn't and it wouldn't help the goal of normalizing traffic on
> > > broker
> > > >> 1 that much.
> > > >> Again, I'll have to check how the current algorithm works and if it
> has
> > > >> any unknown drawbacks to implement what I sketched up above.
> > > >>
> > > >> As for generic preferred leader election when called from the admin
> api
> > > >> or the auto leader balance feature I think you're right that we
> should
> > > >> leave it as it is. It doesn't involve any data movement so it's
> fairly
> > > fast
> > > >> and it normalizes the cluster state quickly.
> > > >>
> > > >> Viktor
> > > >>
> > > >> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
> > > >> stanislav@confluent.io> wrote:
> > > >>
> > > >>> Hey Viktor,
> > > >>>
> > > >>>  I think it is intuitive if there are on a global level...If we
> applied
> > > >>> > them on every batch then we
> > > >>> > couldn't really guarantee any limits as the user would be able
> to get
> > > >>> > around it with submitting lots of reassignments.
> > > >>>
> > > >>>
> > > >>> Agreed. Could we mention this explicitly in the KIP?
> > > >>>
> > > >>> Also if I understand correctly, AlterPartitionAssignmentsRequest
> would
> > > >>> be a
> > > >>> > partition level batching, isn't it? So if you submit 10
> partitions at
> > > >>> once
> > > >>> > then they'll all be started by the controller immediately as per
> my
> > > >>> > understanding.
> > > >>>
> > > >>>
> > > >>> Yep, absolutely
> > > >>>
> > > >>> I've raised the ordering problem on the discussion of KIP-455 in a
> bit
> > > >>> > different form and as I remember the verdict there was that we
> > > >>> shouldn't
> > > >>> > expose ordering as an API. It might not be easy as you say and
> there
> > > >>> might
> > > >>> > be much better strategies to follow (like disk or network
> utilization
> > > >>> > goals). Therefore I'll remove this section from the KIP.
> > > >>>
> > > >>>
> > > >>> Sounds good to me.
> > > >>>
> > > >>> I'm not sure I get this scenario. So are you saying that after they
> > > >>> > submitted a reassignment they also submit a preferred leader
> change?
> > > >>> > In my mind what I would do is:
> > > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > > limit
> > > >>> as
> > > >>> > this way it will be easier to calculate the reassignment batches.
> > > >>> >
> > > >>>
> > > >>> Sorry, this is my fault -- I should have been more clear.
> > > >>> First, I didn't think through this well enough at the time, I don't
> > > >>> think.
> > > >>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6],
> it is
> > > >>> obvious that a leader shift will happen. Your KIP proposes we shift
> > > >>> replicas 1 and 4 first.
> > > >>>
> > > >>> I meant the following (maybe rare) scenario - we have replicas [1,
> 2,
> > > 3]
> > > >>> on
> > > >>> a lot of partitions and the user runs a massive rebalance to change
> > > them
> > > >>> all to [3, 2, 1]. In the old behavior, I think that this would not
> do
> > > >>> anything but simply change the replica set in ZK.
> > > >>> Then, the user could run kafka-preferred-replica-election.sh on a
> given
> > > >>> set
> > > >>> of partitions to make sure the new leader 3 gets elected.
> > > >>>
> > > >>> ii) the user could submit preferred leader election but it's
> basically
> > > a
> > > >>> > form of reassignment so it would fall under the batching
> criterias.
> > > If
> > > >>> the
> > > >>> > batch they submit is smaller than the internal, then it'd be
> executed
> > > >>> > immediately but otherwise it would be split into more batches. It
> > > >>> might be
> > > >>> > a different behavior as it may not be executed it in one batch
> but I
> > > >>> think
> > > >>> > it isn't a problem because we'll default to Int.MAX with the
> batches
> > > >>> and
> > > >>> > otherwise because since it's a form of reassignment I think it
> makes
> > > >>> sense
> > > >>> > to apply the same logic.
> > > >>>
> > > >>>
> > > >>> The AdminAPI for that is
> > > >>> "electPreferredLeaders(Collection<TopicPartition>
> > > >>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
> > > >>> "partition": 0}]}" so it is a bit less explicit than our other
> > > >>> reassignment
> > > >>> API, but the functionality is the same.
> > > >>> You're 100% right that it is a form of reassignment, but I hadn't
> > > thought
> > > >>> of it like that and I some other people wouldn't have either.
> > > >>> If I understand correctly, you're suggesting that we count the
> > > >>> "reassignment.parallel.leader.movements" config against such
> preferred
> > > >>> elections. I think that makes sense. If we are to do that we should
> > > >>> explicitly mention that this KIP touches the preferred election
> logic
> > > as
> > > >>> well. Would that config also bound the number of leader movements
> the
> > > >>> auto
> > > >>> rebalance inside the broker would do as well?
> > > >>>
> > > >>> Then again, the "reassignment.parallel.leader.movements" config
> has a
> > > bit
> > > >>> of a different meaning when used in the context of a real
> reassignment
> > > >>> (where data moves from the leader to N more replicas) versus in the
> > > >>> preferred election switch (where all we need is two lightweight
> > > >>> LeaderAndIsr requests), so I am not entirely convinced we may want
> to
> > > use
> > > >>> the same config for that.
> > > >>> It might be easier to limit the scope of this KIP and not place a
> bound
> > > >>> on
> > > >>> preferred leader election.
> > > >>>
> > > >>> Thanks,
> > > >>> Stanislav
> > > >>>
> > > >>>
> > > >>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> > > >>> viktorsomogyi@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > Hey Stanislav,
> > > >>> >
> > > >>> > Thanks for the thorough look at the KIP! :)
> > > >>> >
> > > >>> > > Let me first explicitly confirm my understanding of the
> configs and
> > > >>> the
> > > >>> > > algorithm:
> > > >>> > > * reassignment.parallel.replica.count - the maximum total
> number of
> > > >>> > > replicas that we can move at once, *per partition*
> > > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > > >>> > partitions
> > > >>> > > we can move at once
> > > >>> > > * reassignment.parallel.leader.movements - the maximum number
> of
> > > >>> leader
> > > >>> > > movements we can have at once
> > > >>> >
> > > >>> > Yes.
> > > >>> >
> > > >>> > > As far as I currently understand it, your proposed algorithm
> will
> > > >>> > naturally
> > > >>> > > prioritize leader movement first. e.g if
> > > >>> > > reassignment.parallel.replica.count=1 and
> > > >>> > >
> > > >>> >
> > > >>> >
> > > >>>
> > >
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > >>> > > we will always move one, the first possible, replica in the
> replica
> > > >>> set
> > > >>> > > (which will be the leader if part of the excess replica set
> (ER)).
> > > >>> > > Am I correct in saying that?
> > > >>> >
> > > >>> > Yes.
> > > >>> >
> > > >>> > > 1. Does it make sense to add `max` somewhere in the configs'
> names?
> > > >>> >
> > > >>> > If you imply that it's not always an exact number (because the
> last
> > > >>> batch
> > > >>> > could be smaller) than I think it's a good idea. I don't mind
> this
> > > >>> naming
> > > >>> > or having max in it either.
> > > >>> >
> > > >>> > > 2. How does this KIP play along with KIP-455's notion of
> multiple
> > > >>> > > rebalances - do the configs apply on a
> > > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > > >>> >
> > > >>> > I think it is intuitive if there are on a global level and the
> user
> > > >>> would
> > > >>> > have the option to set KIP-435's settings to Int.MAX and thus
> > > >>> eliminate any
> > > >>> > batching and submit their own batches through
> > > >>> > AlterPartitionAssignmentsRequest. If we applied them on every
> batch
> > > >>> then we
> > > >>> > couldn't really guarantee any limits as the user would be able
> to get
> > > >>> > around it with submitting lots of reassignments. By default
> though we
> > > >>> set
> > > >>> > the batching limits to Int.MAX so effectively they're disabled.
> > > >>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
> > > would
> > > >>> be a
> > > >>> > partition level batching, isn't it? So if you submit 10
> partitions at
> > > >>> once
> > > >>> > then they'll all be started by the controller immediately as per
> my
> > > >>> > understanding.
> > > >>> >
> > > >>> > > 3. Unless I've missed it, the algorithm does not take into
> account
> > > >>> > > `reassignment.parallel.leader.movements`
> > > >>> >
> > > >>> > Yea the reassignment step calculation doesn't contain that
> because it
> > > >>> > describes only one partition's reassignment step calculation. We
> > > simply
> > > >>> > fill our reassignment batch with as many leader movements as we
> can
> > > and
> > > >>> > then fill the rest with reassignments which don't require leader
> > > >>> movement
> > > >>> > if we have space. I'll create a pseudo code block on the KIP for
> > > this.
> > > >>> >
> > > >>> > > 4. The KIP says that the order of the input has some control
> over
> > > >>> how the
> > > >>> > > batches are created - i.e it's deterministic. What would the
> > > batches
> > > >>> of
> > > >>> > the
> > > >>> > > following reassignment look like:
> > > >>> > > reassignment.parallel.replica.count=1
> > > >>> > > reassignment.parallel.partition.count=MAX_INT
> > > >>> > > reassignment.parallel.leader.movements=1
> > > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > > >>> C(1->4).
> > > >>> > Is
> > > >>> > > that correct? Would the second step then continue with B(0->3)?
> > > >>> > >
> > > >>> > > If the configurations are global, I can imagine we will have a
> bit
> > > >>> more
> > > >>> > > trouble in preserving the expected ordering, especially across
> > > >>> controller
> > > >>> > > failovers -- but I'll avoid speculating until you confirm the
> scope
> > > >>> of
> > > >>> > the
> > > >>> > > config.
> > > >>> >
> > > >>> > I've raised the ordering problem on the discussion of KIP-455 in
> a
> > > bit
> > > >>> > different form and as I remember the verdict there was that we
> > > >>> shouldn't
> > > >>> > expose ordering as an API. It might not be easy as you say and
> there
> > > >>> might
> > > >>> > be much better strategies to follow (like disk or network
> utilization
> > > >>> > goals). Therefore I'll remove this section from the KIP.
> (Otherwise
> > > >>> yes, I
> > > >>> > would have thought of the same behavior what you described.)
> > > >>> > Since #3 also was about the pseudo code, it would be something
> like
> > > >>> this:
> > > >>> > Config:
> > > >>> > reassignment.parallel.replica.count=R
> > > >>> > reassignment.parallel.partition.count=P
> > > >>> > reassignment.parallel.leader.movements=L
> > > >>> > Code:
> > > >>> > val batchSize = max(L,P)
> > > >>> > // split the individual partition reassignments whether they
> involve
> > > >>> leader
> > > >>> > movement or not
> > > >>> > val partitionMovements =
> > > >>> >
> > > >>> >
> > > >>>
> > >
> calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> > > >>> > // fill the batch with as much leader movements as possible and
> take
> > > >>> the
> > > >>> > rest from other reassignments
> > > >>> > val currentBatch = if (partitionMovements.leaderMovements.size <
> > > >>> batchSize)
> > > >>> >   partitionMovements.leaderMovements ++
> > > >>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
> > > >>> > partitionMovements.leaderMovements.size)
> > > >>> > else
> > > >>> >  partitionMovements.leaderMovements.take(batchSize)
> > > >>> > executeReassignmentOnBatch(currentBatch)
> > > >>> >
> > > >>> > > 5. Regarding the new behavior of electing the new preferred
> leader
> > > >>> in the
> > > >>> > > "first step" of the reassignment - does this obey the
> > > >>> > > `auto.leader.rebalance.enable` config?
> > > >>> > > If not, I have concerns regarding how backwards compatible this
> > > >>> might be
> > > >>> > -
> > > >>> > > e.g imagine a user does a huge reassignment (as they have
> always
> > > >>> done)
> > > >>> > and
> > > >>> > > suddenly a huge leader shift happens, whereas the user
> expected to
> > > >>> > manually
> > > >>> > > shift preferred leaders at a slower rate via
> > > >>> > > the kafka-preferred-replica-election.sh tool.
> > > >>> >
> > > >>> > I'm not sure I get this scenario. So are you saying that after
> they
> > > >>> > submitted a reassignment they also submit a preferred leader
> change?
> > > >>> > In my mind what I would do is:
> > > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > > limit
> > > >>> as
> > > >>> > this way it will be easier to calculate the reassignment batches.
> > > >>> > ii) the user could submit preferred leader election but it's
> > > basically
> > > >>> a
> > > >>> > form of reassignment so it would fall under the batching
> criterias.
> > > If
> > > >>> the
> > > >>> > batch they submit is smaller than the internal, then it'd be
> executed
> > > >>> > immediately but otherwise it would be split into more batches. It
> > > >>> might be
> > > >>> > a different behavior as it may not be executed it in one batch
> but I
> > > >>> think
> > > >>> > it isn't a problem because we'll default to Int.MAX with the
> batches
> > > >>> and
> > > >>> > otherwise because since it's a form of reassignment I think it
> makes
> > > >>> sense
> > > >>> > to apply the same logic.
> > > >>> >
> > > >>> > > 6. What is the expected behavior if we dynamically change one
> of
> > > the
> > > >>> > > configs to a lower value while a reassignment is happening.
> Would
> > > we
> > > >>> > cancel
> > > >>> > > some of the currently reassigned partitions or would we
> account for
> > > >>> the
> > > >>> > new
> > > >>> > > values on the next reassignment? I assume the latter but it's
> good
> > > >>> to be
> > > >>> > > explicit
> > > >>> >
> > > >>> > Yep, it would be the latter. I'll make this explicit in the KIP
> too.
> > > >>> >
> > > >>> > About the nits:
> > > >>> > Noted, will update the KIP to reflect your suggestions :)
> > > >>> >
> > > >>> > Viktor
> > > >>> >
> > > >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> > > >>> > stanislav@confluent.io>
> > > >>> > wrote:
> > > >>> > >
> > > >>> > > Hey there Viktor,
> > > >>> > >
> > > >>> > > Thanks for working on this KIP! I agree with the notion that
> > > >>> reliability,
> > > >>> > > stability and predictability of a reassignment should be a core
> > > >>> feature
> > > >>> > of
> > > >>> > > Kafka.
> > > >>> > >
> > > >>> > > Let me first explicitly confirm my understanding of the
> configs and
> > > >>> the
> > > >>> > > algorithm:
> > > >>> > > * reassignment.parallel.replica.count - the maximum total
> number of
> > > >>> > > replicas that we can move at once, *per partition*
> > > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > > >>> > partitions
> > > >>> > > we can move at once
> > > >>> > > * reassignment.parallel.leader.movements - the maximum number
> of
> > > >>> leader
> > > >>> > > movements we can have at once
> > > >>> > >
> > > >>> > > As far as I currently understand it, your proposed algorithm
> will
> > > >>> > naturally
> > > >>> > > prioritize leader movement first. e.g if
> > > >>> > > reassignment.parallel.replica.count=1 and
> > > >>> > >
> > > >>> >
> > > >>> >
> > > >>>
> > >
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > >>> > > we will always move one, the first possible, replica in the
> replica
> > > >>> set
> > > >>> > > (which will be the leader if part of the excess replica set
> (ER)).
> > > >>> > > Am I correct in saying that?
> > > >>> > >
> > > >>> > > Regarding the KIP, I've got a couple of comments/questions::
> > > >>> > >
> > > >>> > > 1. Does it make sense to add `max` somewhere in the configs'
> names?
> > > >>> > >
> > > >>> > > 2. How does this KIP play along with KIP-455's notion of
> multiple
> > > >>> > > rebalances - do the configs apply on a
> > > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > > >>> > >
> > > >>> > > 3. Unless I've missed it, the algorithm does not take into
> account
> > > >>> > > `reassignment.parallel.leader.movements`
> > > >>> > >
> > > >>> > > 4. The KIP says that the order of the input has some control
> over
> > > >>> how the
> > > >>> > > batches are created - i.e it's deterministic. What would the
> > > batches
> > > >>> of
> > > >>> > the
> > > >>> > > following reassignment look like:
> > > >>> > > reassignment.parallel.replica.count=1
> > > >>> > > reassignment.parallel.partition.count=MAX_INT
> > > >>> > > reassignment.parallel.leader.movements=1
> > > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > > >>> C(1->4).
> > > >>> > Is
> > > >>> > > that correct? Would the second step then continue with B(0->3)?
> > > >>> > >
> > > >>> > > If the configurations are global, I can imagine we will have a
> bit
> > > >>> more
> > > >>> > > trouble in preserving the expected ordering, especially across
> > > >>> controller
> > > >>> > > failovers -- but I'll avoid speculating until you confirm the
> scope
> > > >>> of
> > > >>> > the
> > > >>> > > config.
> > > >>> > >
> > > >>> > > 5. Regarding the new behavior of electing the new preferred
> leader
> > > >>> in the
> > > >>> > > "first step" of the reassignment - does this obey the
> > > >>> > > `auto.leader.rebalance.enable` config?
> > > >>> > > If not, I have concerns regarding how backwards compatible this
> > > >>> might be
> > > >>> > -
> > > >>> > > e.g imagine a user does a huge reassignment (as they have
> always
> > > >>> done)
> > > >>> > and
> > > >>> > > suddenly a huge leader shift happens, whereas the user
> expected to
> > > >>> > manually
> > > >>> > > shift preferred leaders at a slower rate via
> > > >>> > > the kafka-preferred-replica-election.sh tool.
> > > >>> > >
> > > >>> > > 6. What is the expected behavior if we dynamically change one
> of
> > > the
> > > >>> > > configs to a lower value while a reassignment is happening.
> Would
> > > we
> > > >>> > cancel
> > > >>> > > some of the currently reassigned partitions or would we
> account for
> > > >>> the
> > > >>> > new
> > > >>> > > values on the next reassignment? I assume the latter but it's
> good
> > > >>> to be
> > > >>> > > explicit
> > > >>> > >
> > > >>> > > As some small nits:
> > > >>> > > - could we have a section in the KIP where we explicitly define
> > > what
> > > >>> each
> > > >>> > > config does? This can be inferred from the KIP as is but
> requires
> > > >>> careful
> > > >>> > > reading, whereas some developers might want to skim through the
> > > >>> change to
> > > >>> > > get a quick sense. It also improves readability but that's my
> > > >>> personal
> > > >>> > > opinion.
> > > >>> > > - could you better clarify how a reassignment step is different
> > > from
> > > >>> the
> > > >>> > > currently existing algorithm? maybe laying both algorithms out
> in
> > > >>> the KIP
> > > >>> > > would be most clear
> > > >>> > > - the names for the OngoingPartitionReassignment and
> > > >>> > > CurrentPartitionReassignment fields in the
> > > >>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
> > > >>> > > Unfortunately, I don't have a better suggestion, but maybe
> somebody
> > > >>> else
> > > >>> > in
> > > >>> > > the community has.
> > > >>> > >
> > > >>> > > Thanks,
> > > >>> > > Stanislav
> > > >>> > >
> > > >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > > >>> > viktorsomogyi@gmail.com>
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > > > Hi All,
> > > >>> > > >
> > > >>> > > > I've renamed my KIP as its name was a bit confusing so we'll
> > > >>> continue
> > > >>> > it in
> > > >>> > > > this thread.
> > > >>> > > > The previous thread for record:
> > > >>> > > >
> > > >>> > > >
> > > >>> >
> > > >>> >
> > > >>>
> > >
> https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > > >>> > > >
> > > >>> > > > A short summary of the KIP:
> > > >>> > > > In case of a vast partition reassignment (thousands of
> partitions
> > > >>> at
> > > >>> > once)
> > > >>> > > > Kafka can collapse under the increased replication traffic.
> This
> > > >>> KIP
> > > >>> > will
> > > >>> > > > mitigate it by introducing internal batching done by the
> > > >>> controller.
> > > >>> > > > Besides putting a bandwidth limit on the replication it is
> useful
> > > >>> to
> > > >>> > batch
> > > >>> > > > partition movements as fewer number of partitions will use
> the
> > > >>> > available
> > > >>> > > > bandwidth for reassignment and they finish faster.
> > > >>> > > > The main control handles are:
> > > >>> > > > - the number of parallel leader movements,
> > > >>> > > > - the number of parallel partition movements
> > > >>> > > > - and the number of parallel replica movements.
> > > >>> > > >
> > > >>> > > > Thank you for the feedback and ideas so far in the previous
> > > thread
> > > >>> and
> > > >>> > I'm
> > > >>> > > > happy to receive more.
> > > >>> > > >
> > > >>> > > > Regards,
> > > >>> > > > Viktor
> > > >>> > > >
> > > >>> > >
> > > >>> > >
> > > >>> > > --
> > > >>> > > Best,
> > > >>> > > Stanislav
> > > >>> >
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Best,
> > > >>> Stanislav
> > > >>>
> > > >>
> > >
> >
> >
> > --
> > Best,
> > Stanislav
> >
>

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Posted by Colin McCabe <co...@cmccabe.xyz>.
Hi Stanislav & Viktor,

Kafka has historically delegated partition reassignment to external tools.  For example, the choice of which partitions to reassign is determined by an external tool.  The choice of what throttle to use is also set externally.  So I think that putting this into the controller is not very consistent with how Kafka currently works.  It seems better to continue to let batching be handled by an external tool, rather than handled by the controller.

The big advantage of having reassignment done by an external tool is that users are free to set their own reassignment policies without hacking Kafka itself.  So, for example, you can build a reassignment tool that moves partitions only when they're smaller than 1 GB without writing a KIP for a new reassign.only.if.smaller.than.1g configuration.

The same is true of batching, I think.  Do you want to set up your batch so that you're always moving a given number of megabytes per second?  Or maybe you want to move 3 partitions at all times?  Do you want to move the biggest partitions first?  Etc. etc.  These things are all policy, and it's a lot easier to iterate on the policy when it's not hard-coded in the controller.  Even changing these configurations at runtime will be very difficult if they are controller configs.
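
To make the point concrete, here is a rough sketch of how such a policy could live entirely in an external tool (the types and names below are made up for illustration, they are not an existing Kafka interface):

// Hypothetical: a user-defined batching policy kept outside the controller.
case class PartitionMove(topic: String, partition: Int, targetReplicas: Seq[Int], sizeBytes: Long)

trait BatchingPolicy {
  // Given the moves still pending and the number currently in flight, pick the next batch.
  def nextBatch(pending: Seq[PartitionMove], inFlightCount: Int): Seq[PartitionMove]
}

// Example: keep at most three moves in flight, biggest partitions first.
class LargestFirstPolicy(maxInFlight: Int = 3) extends BatchingPolicy {
  override def nextBatch(pending: Seq[PartitionMove], inFlightCount: Int): Seq[PartitionMove] =
    pending.sortBy(-_.sizeBytes).take(math.max(0, maxInFlight - inFlightCount))
}

A "megabytes per second" or "only partitions smaller than 1 GB" policy is then just another implementation of the same idea, with no new controller config or KIP needed.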

Of course, there are disadvantages to having reassignment done by an external tool.  The external tool is another thing you have to deploy and run.  The tool also needs to collect statistics about the cluster which the controller already mostly has, so there is some duplication there.

Having some reassignment policies set by the controller and some set by an external tool gets us the worst of both worlds.  Users would need to deploy and run external reassignment tools, but they would also need to worry about correctly configuring the controller so that it could execute their preferred policies.

We just got done making a major change to the reassignment API, and that change is still percolating through the environment.  Most external tools haven't been updated yet to use the new API.  I think we should see how the new API works out before making more major changes to reassignment.

In practice, external reassignment tools already do handle batching.  For example, Cruise Control decides what to move, but also divides up the work into batches.  The main functionality that we lack is a way to do this with just the code shipped as part of Apache Kafka itself.

So, if we want to give users a better out-of-the-box experience, why not add a mode to kafka-reassign-partitions.sh that lets it keep running for a while and breaks down the requested partition moves into batches?  I think this is much more consistent with how the system currently works, and much less likely to create technical debt for the future, than moving functionality into the controller.
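
For what it's worth, a minimal sketch of what that mode might do on top of the KIP-455 Admin API (alterPartitionReassignments() and listPartitionReassignments() are the actual AdminClient methods; runInBatches, the plan map and the 5-second poll below are made up for illustration):

import java.util.Optional
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment}
import org.apache.kafka.common.TopicPartition

// Submit the requested moves a few partitions at a time and wait for each
// batch to complete before submitting the next one.
def runInBatches(admin: Admin, plan: Map[TopicPartition, Seq[Int]], batchSize: Int): Unit = {
  plan.grouped(batchSize).foreach { batch =>
    val request = batch.map { case (tp, targetReplicas) =>
      tp -> Optional.of(new NewPartitionReassignment(targetReplicas.map(Int.box).asJava))
    }.asJava
    admin.alterPartitionReassignments(request).all().get()
    // Poll until the controller reports no ongoing reassignments before moving on.
    while (!admin.listPartitionReassignments().reassignments().get().isEmpty)
      Thread.sleep(5000)
  }
}

Something along these lines is roughly what a batching mode in kafka-reassign-partitions.sh would have to do anyway.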

best,
Colin


On Wed, Dec 4, 2019, at 06:11, Stanislav Kozlovski wrote:
> Hey all,
> 
> I spoke offline with Viktor regarding this KIP and intentions on starting a
> vote soon, so I made another pass on it. I have some comments:
> 
> From our past discussion:
> 
> > I reiterated on the current algorithm and indeed it would change the order
> of replicas in ZK but wouldn't do a leader election, so one would need to
> run the preferred replica election tool. However still in the light of this
> I'm not sure I'd keep the existing behavior as users wouldn't win anything
> with it.
> 
> I think we should mention the change in the KIP at the very least such that
> we can discuss it with other reviewers.
> 
> > If I understand correctly, you're suggesting that we count the
> "reassignment.parallel.leader.movements" config against such preferred
> elections. I think that makes sense. If we are to do that we should
> explicitly mention that this KIP touches the preferred election logic as
> well. Would that config also bound the number of leader movements the auto
> rebalance inside the broker would do as well?
> > ... It might be easier to limit the scope of this KIP and not place a
> bound on preferred leader election.
> 
> What are your thoughts on this?
> 
> And now comments on the KIP:
> 
> 1.
> 
>    1. > Update CR in Zookeeper
>    (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR for the
>    given partitions.
> 
> 
> I believe the leader for the partition is the "owner" of that znode. Having
> the controller update it may complicate things.
> Has this been updated with the latest KIP-455? We keep reassignment state
> in the /brokers/topics/[topic] znode
> 
> 2.
> 
> How does reassignment cancellation work with these new incremental steps?
> In your example, what happens if I cancel the current reassignment while
> the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the state
> to revert back to (0, 1, 2, 3, 4)?
> 
> 
> 3.
> 
> > The only exception is the first step where (if needed) we add that many
> replicas that is enough to fulfil the min.insync.replicas requirement set
> on the broker, even if it exceeds the limit on parallel replica
> reassignments. For instance if there are 4 brokers, min.insync.replicas set
> to 3 but there are only 1 in-sync replica, then we immediately add 2 other
> in one step, so the producers are able to continue.
> 
> Wouldn't that be adding extra replication traffic and pressure on an
> already problematic partition? I may be missing something but I actually
> think that this wouldn't help in common circumstances.
> 
> 4.
> 
> > Calculate the replicas to be dropped (DR):
> 
>    1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) -
>    size(CR))
> 
> 
> Do you mean min() here?
> 
> 5.
> 
>    1.
>       1. Take the first *reassignment.parallel.replica.count* replicas of
>       ER, that will be the set of dropped replicas
> 
> 
> Do you mean `N replicas of ER` here?
> 
> 6.
> 
> > Calculate the new replicas (NR) to be added
> 
>    1. Calculate that if the partition has enough online replicas to fulfil
>    the min.insync.replicas config so the producers are able to continue.
>    2. If the preferred leader is different in FTR and it is not yet
>    reassigned, then add it to the NR
>    3. If size(NR) < min.insync.replicas then take min(min.insync.replicas,
>    reassignment.parallel.replica.count) - size(NR) replicas from FTR
>    4. Otherwise take as many replica as
>    *reassignment.parallel.replica.count* allows
> 
> 
> Not sure I understand this. Wouldn't we always take as many new replicas
> (NR) as the ones we are dropping (DR)?
> 
> 7.
> 
> This new configuration would tell how many replicas of a single partition
> can be moved at once.
> 
> I believe this description is outdated
> 
> 8.
> 
> I don't see it as a rejected alternative - have we considered delegating
> this logic out to the client and making Kafka accept a series of
> reassignment "steps" to run?
> 
> 
> Thanks!
> Stanislav
> 
> On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
> viktorsomogyi@gmail.com> wrote:
> 
> > Hey Folks,
> >
> > I think I'll open a vote early next week about this if there are no more
> > feedback.
> >
> > Thanks,
> > Viktor
> >
> > On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <
> > viktorsomogyi@gmail.com>
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > I reiterated on the current algorithm and indeed it would change the
> > order
> > > of replicas in ZK but wouldn't do a leader election, so one would need to
> > > run the preferred replica election tool. However still in the light of
> > this
> > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > anything
> > > with it. Changing leadership automatically would result in a simpler,
> > > easier and more responsive reassign algorithm which is especially
> > important
> > > if the reassignment is done to free up the broker from under heavy load.
> > > Automated tools (Cruise Control) would also have simpler life.
> > > Let me know what you think.
> > >
> > > Viktor
> > >
> > > On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> > > viktorsomogyi@gmail.com> wrote:
> > >
> > >> Hi Stan,
> > >>
> > >> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
> > >> on
> > >> a lot of partitions and the user runs a massive rebalance to change them
> > >> all to [3, 2, 1]. In the old behavior, I think that this would not do
> > >> anything but simply change the replica set in ZK.
> > >> Then, the user could run kafka-preferred-replica-election.sh on a given
> > >> set
> > >> of partitions to make sure the new leader 3 gets elected.
> > >>
> > >> I thought the old algorithm would elect 3 as the leader in this case
> > >> right away at the end but I have to double check this. In any case I
> > think
> > >> it would make sense in the new algorithm if we elected the new preferred
> > >> leader right away, regardless of the new leader is chosen from the
> > existing
> > >> replicas or not. If the whole reassignment is in fact just changing the
> > >> replica order then either way it is a simple (trivial) operation and
> > doing
> > >> it batched wouldn't slow it down much as there is no data movement
> > >> involved. If the reassignment is mixed, meaning it contains reordering
> > and
> > >> real movement as well then in fact it would help to even out the load
> > >> faster as data movements can get long. For instance in case of a
> > >> reassignment batch of two partitions concurrently: P1: (1,2,3) ->
> > (3,2,1)
> > >> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new leader
> > but
> > >> P1 wouldn't and it wouldn't help the goal of normalizing traffic on
> > broker
> > >> 1 that much.
> > >> Again, I'll have to check how the current algorithm works and if it has
> > >> any unknown drawbacks to implement what I sketched up above.
> > >>
> > >> As for generic preferred leader election when called from the admin api
> > >> or the auto leader balance feature I think you're right that we should
> > >> leave it as it is. It doesn't involve any data movement so it's fairly
> > fast
> > >> and it normalizes the cluster state quickly.
> > >>
> > >> Viktor
> > >>
> > >> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
> > >> stanislav@confluent.io> wrote:
> > >>
> > >>> Hey Viktor,
> > >>>
> > >>>  I think it is intuitive if there are on a global level...If we applied
> > >>> > them on every batch then we
> > >>> > couldn't really guarantee any limits as the user would be able to get
> > >>> > around it with submitting lots of reassignments.
> > >>>
> > >>>
> > >>> Agreed. Could we mention this explicitly in the KIP?
> > >>>
> > >>> Also if I understand correctly, AlterPartitionAssignmentsRequest would
> > >>> be a
> > >>> > partition level batching, isn't it? So if you submit 10 partitions at
> > >>> once
> > >>> > then they'll all be started by the controller immediately as per my
> > >>> > understanding.
> > >>>
> > >>>
> > >>> Yep, absolutely
> > >>>
> > >>> I've raised the ordering problem on the discussion of KIP-455 in a bit
> > >>> > different form and as I remember the verdict there was that we
> > >>> shouldn't
> > >>> > expose ordering as an API. It might not be easy as you say and there
> > >>> might
> > >>> > be much better strategies to follow (like disk or network utilization
> > >>> > goals). Therefore I'll remove this section from the KIP.
> > >>>
> > >>>
> > >>> Sounds good to me.
> > >>>
> > >>> I'm not sure I get this scenario. So are you saying that after they
> > >>> > submitted a reassignment they also submit a preferred leader change?
> > >>> > In my mind what I would do is:
> > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > limit
> > >>> as
> > >>> > this way it will be easier to calculate the reassignment batches.
> > >>> >
> > >>>
> > >>> Sorry, this is my fault -- I should have been more clear.
> > >>> First, I didn't think through this well enough at the time, I don't
> > >>> think.
> > >>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
> > >>> obvious that a leader shift will happen. Your KIP proposes we shift
> > >>> replicas 1 and 4 first.
> > >>>
> > >>> I meant the following (maybe rare) scenario - we have replicas [1, 2,
> > 3]
> > >>> on
> > >>> a lot of partitions and the user runs a massive rebalance to change
> > them
> > >>> all to [3, 2, 1]. In the old behavior, I think that this would not do
> > >>> anything but simply change the replica set in ZK.
> > >>> Then, the user could run kafka-preferred-replica-election.sh on a given
> > >>> set
> > >>> of partitions to make sure the new leader 3 gets elected.
> > >>>
> > >>> ii) the user could submit preferred leader election but it's basically
> > a
> > >>> > form of reassignment so it would fall under the batching criterias.
> > If
> > >>> the
> > >>> > batch they submit is smaller than the internal, then it'd be executed
> > >>> > immediately but otherwise it would be split into more batches. It
> > >>> might be
> > >>> > a different behavior as it may not be executed it in one batch but I
> > >>> think
> > >>> > it isn't a problem because we'll default to Int.MAX with the batches
> > >>> and
> > >>> > otherwise because since it's a form of reassignment I think it makes
> > >>> sense
> > >>> > to apply the same logic.
> > >>>
> > >>>
> > >>> The AdminAPI for that is
> > >>> "electPreferredLeaders(Collection<TopicPartition>
> > >>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
> > >>> "partition": 0}]}" so it is a bit less explicit than our other
> > >>> reassignment
> > >>> API, but the functionality is the same.
> > >>> You're 100% right that it is a form of reassignment, but I hadn't
> > thought
> > >>> of it like that and I some other people wouldn't have either.
> > >>> If I understand correctly, you're suggesting that we count the
> > >>> "reassignment.parallel.leader.movements" config against such preferred
> > >>> elections. I think that makes sense. If we are to do that we should
> > >>> explicitly mention that this KIP touches the preferred election logic
> > as
> > >>> well. Would that config also bound the number of leader movements the
> > >>> auto
> > >>> rebalance inside the broker would do as well?
> > >>>
> > >>> Then again, the "reassignment.parallel.leader.movements" config has a
> > bit
> > >>> of a different meaning when used in the context of a real reassignment
> > >>> (where data moves from the leader to N more replicas) versus in the
> > >>> preferred election switch (where all we need is two lightweight
> > >>> LeaderAndIsr requests), so I am not entirely convinced we may want to
> > use
> > >>> the same config for that.
> > >>> It might be easier to limit the scope of this KIP and not place a bound
> > >>> on
> > >>> preferred leader election.
> > >>>
> > >>> Thanks,
> > >>> Stanislav
> > >>>
> > >>>
> > >>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> > >>> viktorsomogyi@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hey Stanislav,
> > >>> >
> > >>> > Thanks for the thorough look at the KIP! :)
> > >>> >
> > >>> > > Let me first explicitly confirm my understanding of the configs and
> > >>> the
> > >>> > > algorithm:
> > >>> > > * reassignment.parallel.replica.count - the maximum total number of
> > >>> > > replicas that we can move at once, *per partition*
> > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > >>> > partitions
> > >>> > > we can move at once
> > >>> > > * reassignment.parallel.leader.movements - the maximum number of
> > >>> leader
> > >>> > > movements we can have at once
> > >>> >
> > >>> > Yes.
> > >>> >
> > >>> > > As far as I currently understand it, your proposed algorithm will
> > >>> > naturally
> > >>> > > prioritize leader movement first. e.g if
> > >>> > > reassignment.parallel.replica.count=1 and
> > >>> > >
> > >>> >
> > >>> >
> > >>>
> > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > >>> > > we will always move one, the first possible, replica in the replica
> > >>> set
> > >>> > > (which will be the leader if part of the excess replica set (ER)).
> > >>> > > Am I correct in saying that?
> > >>> >
> > >>> > Yes.
> > >>> >
> > >>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> > >>> >
> > >>> > If you imply that it's not always an exact number (because the last
> > >>> batch
> > >>> > could be smaller) than I think it's a good idea. I don't mind this
> > >>> naming
> > >>> > or having max in it either.
> > >>> >
> > >>> > > 2. How does this KIP play along with KIP-455's notion of multiple
> > >>> > > rebalances - do the configs apply on a
> > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > >>> >
> > >>> > I think it is intuitive if there are on a global level and the user
> > >>> would
> > >>> > have the option to set KIP-435's settings to Int.MAX and thus
> > >>> eliminate any
> > >>> > batching and submit their own batches through
> > >>> > AlterPartitionAssignmentsRequest. If we applied them on every batch
> > >>> then we
> > >>> > couldn't really guarantee any limits as the user would be able to get
> > >>> > around it with submitting lots of reassignments. By default though we
> > >>> set
> > >>> > the batching limits to Int.MAX so effectively they're disabled.
> > >>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
> > would
> > >>> be a
> > >>> > partition level batching, isn't it? So if you submit 10 partitions at
> > >>> once
> > >>> > then they'll all be started by the controller immediately as per my
> > >>> > understanding.
> > >>> >
> > >>> > > 3. Unless I've missed it, the algorithm does not take into account
> > >>> > > `reassignment.parallel.leader.movements`
> > >>> >
> > >>> > Yea the reassignment step calculation doesn't contain that because it
> > >>> > describes only one partition's reassignment step calculation. We
> > simply
> > >>> > fill our reassignment batch with as many leader movements as we can
> > and
> > >>> > then fill the rest with reassignments which don't require leader
> > >>> movement
> > >>> > if we have space. I'll create a pseudo code block on the KIP for
> > this.
> > >>> >
> > >>> > > 4. The KIP says that the order of the input has some control over
> > >>> how the
> > >>> > > batches are created - i.e it's deterministic. What would the
> > batches
> > >>> of
> > >>> > the
> > >>> > > following reassignment look like:
> > >>> > > reassignment.parallel.replica.count=1
> > >>> > > reassignment.parallel.partition.count=MAX_INT
> > >>> > > reassignment.parallel.leader.movements=1
> > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > >>> C(1->4).
> > >>> > Is
> > >>> > > that correct? Would the second step then continue with B(0->3)?
> > >>> > >
> > >>> > > If the configurations are global, I can imagine we will have a bit
> > >>> more
> > >>> > > trouble in preserving the expected ordering, especially across
> > >>> controller
> > >>> > > failovers -- but I'll avoid speculating until you confirm the scope
> > >>> of
> > >>> > the
> > >>> > > config.
> > >>> >
> > >>> > I've raised the ordering problem on the discussion of KIP-455 in a
> > bit
> > >>> > different form and as I remember the verdict there was that we
> > >>> shouldn't
> > >>> > expose ordering as an API. It might not be easy as you say and there
> > >>> might
> > >>> > be much better strategies to follow (like disk or network utilization
> > >>> > goals). Therefore I'll remove this section from the KIP. (Otherwise
> > >>> yes, I
> > >>> > would have thought of the same behavior what you described.)
> > >>> > Since #3 also was about the pseudo code, it would be something like
> > >>> this:
> > >>> > Config:
> > >>> > reassignment.parallel.replica.count=R
> > >>> > reassignment.parallel.partition.count=P
> > >>> > reassignment.parallel.leader.movements=L
> > >>> > Code:
> > >>> > val batchSize = max(L,P)
> > >>> > // split the individual partition reassignments whether they involve
> > >>> leader
> > >>> > movement or not
> > >>> > val partitionMovements =
> > >>> >
> > >>> >
> > >>>
> > calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> > >>> > // fill the batch with as much leader movements as possible and take
> > >>> the
> > >>> > rest from other reassignments
> > >>> > val currentBatch = if (partitionMovements.leaderMovements.size <
> > >>> batchSize)
> > >>> >   partitionMovements.leaderMovements ++
> > >>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
> > >>> > partitionMovements.leaderMovements.size)
> > >>> > else
> > >>> >  partitionMovements.leaderMovements.take(batchSize)
> > >>> > executeReassignmentOnBatch(currentBatch)
> > >>> >
> > >>> > > 5. Regarding the new behavior of electing the new preferred leader
> > >>> in the
> > >>> > > "first step" of the reassignment - does this obey the
> > >>> > > `auto.leader.rebalance.enable` config?
> > >>> > > If not, I have concerns regarding how backwards compatible this
> > >>> might be
> > >>> > -
> > >>> > > e.g imagine a user does a huge reassignment (as they have always
> > >>> done)
> > >>> > and
> > >>> > > suddenly a huge leader shift happens, whereas the user expected to
> > >>> > manually
> > >>> > > shift preferred leaders at a slower rate via
> > >>> > > the kafka-preferred-replica-election.sh tool.
> > >>> >
> > >>> > I'm not sure I get this scenario. So are you saying that after they
> > >>> > submitted a reassignment they also submit a preferred leader change?
> > >>> > In my mind what I would do is:
> > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > limit
> > >>> as
> > >>> > this way it will be easier to calculate the reassignment batches.
> > >>> > ii) the user could submit preferred leader election but it's
> > basically
> > >>> a
> > >>> > form of reassignment so it would fall under the batching criterias.
> > If
> > >>> the
> > >>> > batch they submit is smaller than the internal, then it'd be executed
> > >>> > immediately but otherwise it would be split into more batches. It
> > >>> might be
> > >>> > a different behavior as it may not be executed it in one batch but I
> > >>> think
> > >>> > it isn't a problem because we'll default to Int.MAX with the batches
> > >>> and
> > >>> > otherwise because since it's a form of reassignment I think it makes
> > >>> sense
> > >>> > to apply the same logic.
> > >>> >
> > >>> > > 6. What is the expected behavior if we dynamically change one of
> > the
> > >>> > > configs to a lower value while a reassignment is happening. Would
> > we
> > >>> > cancel
> > >>> > > some of the currently reassigned partitions or would we account for
> > >>> the
> > >>> > new
> > >>> > > values on the next reassignment? I assume the latter but it's good
> > >>> to be
> > >>> > > explicit
> > >>> >
> > >>> > Yep, it would be the latter. I'll make this explicit in the KIP too.
> > >>> >
> > >>> > About the nits:
> > >>> > Noted, will update the KIP to reflect your suggestions :)
> > >>> >
> > >>> > Viktor
> > >>> >
> > >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> > >>> > stanislav@confluent.io>
> > >>> > wrote:
> > >>> > >
> > >>> > > Hey there Viktor,
> > >>> > >
> > >>> > > Thanks for working on this KIP! I agree with the notion that
> > >>> reliability,
> > >>> > > stability and predictability of a reassignment should be a core
> > >>> feature
> > >>> > of
> > >>> > > Kafka.
> > >>> > >
> > >>> > > Let me first explicitly confirm my understanding of the configs and
> > >>> the
> > >>> > > algorithm:
> > >>> > > * reassignment.parallel.replica.count - the maximum total number of
> > >>> > > replicas that we can move at once, *per partition*
> > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > >>> > partitions
> > >>> > > we can move at once
> > >>> > > * reassignment.parallel.leader.movements - the maximum number of
> > >>> leader
> > >>> > > movements we can have at once
> > >>> > >
> > >>> > > As far as I currently understand it, your proposed algorithm will
> > >>> > naturally
> > >>> > > prioritize leader movement first. e.g if
> > >>> > > reassignment.parallel.replica.count=1 and
> > >>> > >
> > >>> >
> > >>> >
> > >>>
> > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > >>> > > we will always move one, the first possible, replica in the replica
> > >>> set
> > >>> > > (which will be the leader if part of the excess replica set (ER)).
> > >>> > > Am I correct in saying that?
> > >>> > >
> > >>> > > Regarding the KIP, I've got a couple of comments/questions::
> > >>> > >
> > >>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> > >>> > >
> > >>> > > 2. How does this KIP play along with KIP-455's notion of multiple
> > >>> > > rebalances - do the configs apply on a
> > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > >>> > >
> > >>> > > 3. Unless I've missed it, the algorithm does not take into account
> > >>> > > `reassignment.parallel.leader.movements`
> > >>> > >
> > >>> > > 4. The KIP says that the order of the input has some control over
> > >>> how the
> > >>> > > batches are created - i.e it's deterministic. What would the
> > batches
> > >>> of
> > >>> > the
> > >>> > > following reassignment look like:
> > >>> > > reassignment.parallel.replica.count=1
> > >>> > > reassignment.parallel.partition.count=MAX_INT
> > >>> > > reassignment.parallel.leader.movements=1
> > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > >>> C(1->4).
> > >>> > Is
> > >>> > > that correct? Would the second step then continue with B(0->3)?
> > >>> > >
> > >>> > > If the configurations are global, I can imagine we will have a bit
> > >>> more
> > >>> > > trouble in preserving the expected ordering, especially across
> > >>> controller
> > >>> > > failovers -- but I'll avoid speculating until you confirm the scope
> > >>> of
> > >>> > the
> > >>> > > config.
> > >>> > >
> > >>> > > 5. Regarding the new behavior of electing the new preferred leader
> > >>> in the
> > >>> > > "first step" of the reassignment - does this obey the
> > >>> > > `auto.leader.rebalance.enable` config?
> > >>> > > If not, I have concerns regarding how backwards compatible this
> > >>> might be
> > >>> > -
> > >>> > > e.g imagine a user does a huge reassignment (as they have always
> > >>> done)
> > >>> > and
> > >>> > > suddenly a huge leader shift happens, whereas the user expected to
> > >>> > manually
> > >>> > > shift preferred leaders at a slower rate via
> > >>> > > the kafka-preferred-replica-election.sh tool.
> > >>> > >
> > >>> > > 6. What is the expected behavior if we dynamically change one of
> > the
> > >>> > > configs to a lower value while a reassignment is happening. Would
> > we
> > >>> > cancel
> > >>> > > some of the currently reassigned partitions or would we account for
> > >>> the
> > >>> > new
> > >>> > > values on the next reassignment? I assume the latter but it's good
> > >>> to be
> > >>> > > explicit
> > >>> > >
> > >>> > > As some small nits:
> > >>> > > - could we have a section in the KIP where we explicitly define
> > what
> > >>> each
> > >>> > > config does? This can be inferred from the KIP as is but requires
> > >>> careful
> > >>> > > reading, whereas some developers might want to skim through the
> > >>> change to
> > >>> > > get a quick sense. It also improves readability but that's my
> > >>> personal
> > >>> > > opinion.
> > >>> > > - could you better clarify how a reassignment step is different
> > from
> > >>> the
> > >>> > > currently existing algorithm? maybe laying both algorithms out in
> > >>> the KIP
> > >>> > > would be most clear
> > >>> > > - the names for the OngoingPartitionReassignment and
> > >>> > > CurrentPartitionReassignment fields in the
> > >>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
> > >>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
> > >>> else
> > >>> > in
> > >>> > > the community has.
> > >>> > >
> > >>> > > Thanks,
> > >>> > > Stanislav
> > >>> > >
> > >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > >>> > viktorsomogyi@gmail.com>
> > >>> > > wrote:
> > >>> > >
> > >>> > > > Hi All,
> > >>> > > >
> > >>> > > > I've renamed my KIP as its name was a bit confusing so we'll
> > >>> continue
> > >>> > it in
> > >>> > > > this thread.
> > >>> > > > The previous thread for record:
> > >>> > > >
> > >>> > > >
> > >>> >
> > >>> >
> > >>>
> > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > >>> > > >
> > >>> > > > A short summary of the KIP:
> > >>> > > > In case of a vast partition reassignment (thousands of partitions
> > >>> at
> > >>> > once)
> > >>> > > > Kafka can collapse under the increased replication traffic. This
> > >>> KIP
> > >>> > will
> > >>> > > > mitigate it by introducing internal batching done by the
> > >>> controller.
> > >>> > > > Besides putting a bandwidth limit on the replication it is useful
> > >>> to
> > >>> > batch
> > >>> > > > partition movements as fewer number of partitions will use the
> > >>> > available
> > >>> > > > bandwidth for reassignment and they finish faster.
> > >>> > > > The main control handles are:
> > >>> > > > - the number of parallel leader movements,
> > >>> > > > - the number of parallel partition movements
> > >>> > > > - and the number of parallel replica movements.
> > >>> > > >
> > >>> > > > Thank you for the feedback and ideas so far in the previous
> > thread
> > >>> and
> > >>> > I'm
> > >>> > > > happy to receive more.
> > >>> > > >
> > >>> > > > Regards,
> > >>> > > > Viktor
> > >>> > > >
> > >>> > >
> > >>> > >
> > >>> > > --
> > >>> > > Best,
> > >>> > > Stanislav
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> Best,
> > >>> Stanislav
> > >>>
> > >>
> >
> 
> 
> -- 
> Best,
> Stanislav
>

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Posted by Stanislav Kozlovski <st...@confluent.io>.
Hey all,

I spoke offline with Viktor regarding this KIP and his intention to start a
vote soon, so I made another pass over it. I have some comments:

From our past discussion:

> I reiterated on the current algorithm and indeed it would change the order
of replicas in ZK but wouldn't do a leader election, so one would need to
run the preferred replica election tool. However still in the light of this
I'm not sure I'd keep the existing behavior as users wouldn't win anything
with it.

I think we should at the very least mention this change in the KIP so that
we can discuss it with the other reviewers.

> If I understand correctly, you're suggesting that we count the
"reassignment.parallel.leader.movements" config against such preferred
elections. I think that makes sense. If we are to do that we should
explicitly mention that this KIP touches the preferred election logic as
well. Would that config also bound the number of leader movements the auto
rebalance inside the broker would do as well?
> ... It might be easier to limit the scope of this KIP and not place a
bound on preferred leader election.

What are your thoughts on this?

And now comments on the KIP:

1.

   1. > Update CR in Zookeeper
   (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR for the
   given partitions.


I believe the leader for the partition is the "owner" of that znode. Having
the controller update it may complicate things.
Has this been updated to reflect the latest KIP-455? We now keep the
reassignment state in the /brokers/topics/[topic] znode.

2.

How does reassignment cancellation work with these new incremental steps?
In your example, what happens if I cancel the current reassignment while
the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the state
to revert back to (0, 1, 2, 3, 4)?
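
To make the question concrete, this is roughly the extra per-partition state
the controller would seemingly have to track to support a revert (the names
below are purely hypothetical, nothing from the KIP):

// Hypothetical sketch only: the original assignment has to be remembered
// somewhere (ZK or controller memory) for a cancellation to be possible.
case class IncrementalReassignmentState(
  originalReplicas: Seq[Int], // (0, 1, 2, 3, 4) in the example above
  targetReplicas: Seq[Int],   // e.g. (5, 6, 7, 8, 9)
  currentReplicas: Seq[Int]   // (5, 6, 2, 3, 4) after the first incremental step
)

// Cancelling mid-flight would then be one more "step" back to the original
// set, dropping the partially added target replicas 5 and 6.
def onCancel(state: IncrementalReassignmentState): Seq[Int] =
  state.originalReplicas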


3.

> The only exception is the first step where (if needed) we add that many
replicas that is enough to fulfil the min.insync.replicas requirement set
on the broker, even if it exceeds the limit on parallel replica
reassignments. For instance if there are 4 brokers, min.insync.replicas set
to 3 but there are only 1 in-sync replica, then we immediately add 2 other
in one step, so the producers are able to continue.

Wouldn't that add extra replication traffic and put more pressure on an
already problematic partition? I may be missing something, but I actually
think this wouldn't help in common circumstances.
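
For reference, this is how I read the quoted rule - a sketch of my
interpretation only, not code from the KIP - and the extra replicas added in
that first step are exactly the replication traffic I'm worried about:

// My interpretation of the quoted exception: the first step adds at least
// enough replicas to reach min.insync.replicas, even if that exceeds the
// parallel replica limit.
def firstStepReplicasToAdd(minInsyncReplicas: Int,
                           inSyncReplicas: Int,
                           parallelReplicaCount: Int): Int =
  math.max(parallelReplicaCount, minInsyncReplicas - inSyncReplicas)

// The KIP's example: min.insync.replicas=3 with only 1 in-sync replica left
// means 2 replicas are added in the first step, regardless of the limit.
firstStepReplicasToAdd(minInsyncReplicas = 3, inSyncReplicas = 1,
                       parallelReplicaCount = 1) // = 2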

4.

> Calculate the replicas to be dropped (DR):

   1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) -
   size(CR))


Do you mean min() here?
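
With made-up numbers, just to illustrate why max() looks like a typo to me:

// reassignment.parallel.replica.count and size(FTR) - size(CR) from the
// KIP's notation, with illustrative values only:
val parallelReplicaCount = 2
val excessReplicas = 5
math.max(parallelReplicaCount, excessReplicas) // = 5, ignores the limit
math.min(parallelReplicaCount, excessReplicas) // = 2, respects the limit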

5.

   1.
      1. Take the first *reassignment.parallel.replica.count* replicas of
      ER, that will be the set of dropped replicas


Do you mean `N replicas of ER` here?

6.

> Calculate the new replicas (NR) to be added

   1. Calculate that if the partition has enough online replicas to fulfil
   the min.insync.replicas config so the producers are able to continue.
   2. If the preferred leader is different in FTR and it is not yet
   reassigned, then add it to the NR
   3. If size(NR) < min.insync.replicas then take min(min.insync.replicas,
   reassignment.parallel.replica.count) - size(NR) replicas from FTR
   4. Otherwise take as many replica as
   *reassignment.parallel.replica.count* allows


Not sure I understand this. Wouldn't we always take as many new replicas
(NR) as the ones we are dropping (DR)?
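
To make my question concrete, the symmetric behaviour I would have expected
is something along these lines (a sketch of my expectation, not the KIP's
algorithm):

// Each step adds exactly as many target replicas as it drops, so the
// replica set size stays constant throughout the reassignment.
def nextReplicaSet(cr: Seq[Int], ftr: Seq[Int], parallelReplicaCount: Int): Seq[Int] = {
  val dr = cr.filterNot(ftr.contains).take(parallelReplicaCount) // dropped this step
  val nr = ftr.filterNot(cr.contains).take(dr.size)              // added, same count
  cr.filterNot(dr.contains) ++ nr
}

// e.g. (0,1,2,3,4) -> (5,6,7,8,9) with a parallel replica count of 2:
nextReplicaSet(Seq(0, 1, 2, 3, 4), Seq(5, 6, 7, 8, 9), 2) // Seq(2, 3, 4, 5, 6)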

7.

> This new configuration would tell how many replicas of a single partition
can be moved at once.

I believe this description is outdated.

8.

I don't see it listed as a rejected alternative - have we considered
delegating this logic to the client and making Kafka accept a series of
reassignment "steps" to run?


Thanks!
Stanislav

On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
viktorsomogyi@gmail.com> wrote:

> Hey Folks,
>
> I think I'll open a vote early next week about this if there are no more
> feedback.
>
> Thanks,
> Viktor
>
> On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <
> viktorsomogyi@gmail.com>
> wrote:
>
> > Hey Stanislav,
> >
> > I reiterated on the current algorithm and indeed it would change the
> order
> > of replicas in ZK but wouldn't do a leader election, so one would need to
> > run the preferred replica election tool. However still in the light of
> this
> > I'm not sure I'd keep the existing behavior as users wouldn't win
> anything
> > with it. Changing leadership automatically would result in a simpler,
> > easier and more responsive reassign algorithm which is especially
> important
> > if the reassignment is done to free up the broker from under heavy load.
> > Automated tools (Cruise Control) would also have simpler life.
> > Let me know what you think.
> >
> > Viktor
> >
> > On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> > viktorsomogyi@gmail.com> wrote:
> >
> >> Hi Stan,
> >>
> >> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
> >> on
> >> a lot of partitions and the user runs a massive rebalance to change them
> >> all to [3, 2, 1]. In the old behavior, I think that this would not do
> >> anything but simply change the replica set in ZK.
> >> Then, the user could run kafka-preferred-replica-election.sh on a given
> >> set
> >> of partitions to make sure the new leader 3 gets elected.
> >>
> >> I thought the old algorithm would elect 3 as the leader in this case
> >> right away at the end but I have to double check this. In any case I
> think
> >> it would make sense in the new algorithm if we elected the new preferred
> >> leader right away, regardless of the new leader is chosen from the
> existing
> >> replicas or not. If the whole reassignment is in fact just changing the
> >> replica order then either way it is a simple (trivial) operation and
> doing
> >> it batched wouldn't slow it down much as there is no data movement
> >> involved. If the reassignment is mixed, meaning it contains reordering
> and
> >> real movement as well then in fact it would help to even out the load
> >> faster as data movements can get long. For instance in case of a
> >> reassignment batch of two partitions concurrently: P1: (1,2,3) ->
> (3,2,1)
> >> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new leader
> but
> >> P1 wouldn't and it wouldn't help the goal of normalizing traffic on
> broker
> >> 1 that much.
> >> Again, I'll have to check how the current algorithm works and if it has
> >> any unknown drawbacks to implement what I sketched up above.
> >>
> >> As for generic preferred leader election when called from the admin api
> >> or the auto leader balance feature I think you're right that we should
> >> leave it as it is. It doesn't involve any data movement so it's fairly
> fast
> >> and it normalizes the cluster state quickly.
> >>
> >> Viktor
> >>
> >> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
> >> stanislav@confluent.io> wrote:
> >>
> >>> Hey Viktor,
> >>>
> >>>  I think it is intuitive if there are on a global level...If we applied
> >>> > them on every batch then we
> >>> > couldn't really guarantee any limits as the user would be able to get
> >>> > around it with submitting lots of reassignments.
> >>>
> >>>
> >>> Agreed. Could we mention this explicitly in the KIP?
> >>>
> >>> Also if I understand correctly, AlterPartitionAssignmentsRequest would
> >>> be a
> >>> > partition level batching, isn't it? So if you submit 10 partitions at
> >>> once
> >>> > then they'll all be started by the controller immediately as per my
> >>> > understanding.
> >>>
> >>>
> >>> Yep, absolutely
> >>>
> >>> I've raised the ordering problem on the discussion of KIP-455 in a bit
> >>> > different form and as I remember the verdict there was that we
> >>> shouldn't
> >>> > expose ordering as an API. It might not be easy as you say and there
> >>> might
> >>> > be much better strategies to follow (like disk or network utilization
> >>> > goals). Therefore I'll remove this section from the KIP.
> >>>
> >>>
> >>> Sounds good to me.
> >>>
> >>> I'm not sure I get this scenario. So are you saying that after they
> >>> > submitted a reassignment they also submit a preferred leader change?
> >>> > In my mind what I would do is:
> >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> limit
> >>> as
> >>> > this way it will be easier to calculate the reassignment batches.
> >>> >
> >>>
> >>> Sorry, this is my fault -- I should have been more clear.
> >>> First, I didn't think through this well enough at the time, I don't
> >>> think.
> >>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
> >>> obvious that a leader shift will happen. Your KIP proposes we shift
> >>> replicas 1 and 4 first.
> >>>
> >>> I meant the following (maybe rare) scenario - we have replicas [1, 2,
> 3]
> >>> on
> >>> a lot of partitions and the user runs a massive rebalance to change
> them
> >>> all to [3, 2, 1]. In the old behavior, I think that this would not do
> >>> anything but simply change the replica set in ZK.
> >>> Then, the user could run kafka-preferred-replica-election.sh on a given
> >>> set
> >>> of partitions to make sure the new leader 3 gets elected.
> >>>
> >>> ii) the user could submit preferred leader election but it's basically
> a
> >>> > form of reassignment so it would fall under the batching criterias.
> If
> >>> the
> >>> > batch they submit is smaller than the internal, then it'd be executed
> >>> > immediately but otherwise it would be split into more batches. It
> >>> might be
> >>> > a different behavior as it may not be executed it in one batch but I
> >>> think
> >>> > it isn't a problem because we'll default to Int.MAX with the batches
> >>> and
> >>> > otherwise because since it's a form of reassignment I think it makes
> >>> sense
> >>> > to apply the same logic.
> >>>
> >>>
> >>> The AdminAPI for that is
> >>> "electPreferredLeaders(Collection<TopicPartition>
> >>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
> >>> "partition": 0}]}" so it is a bit less explicit than our other
> >>> reassignment
> >>> API, but the functionality is the same.
> >>> You're 100% right that it is a form of reassignment, but I hadn't
> thought
> >>> of it like that and I some other people wouldn't have either.
> >>> If I understand correctly, you're suggesting that we count the
> >>> "reassignment.parallel.leader.movements" config against such preferred
> >>> elections. I think that makes sense. If we are to do that we should
> >>> explicitly mention that this KIP touches the preferred election logic
> as
> >>> well. Would that config also bound the number of leader movements the
> >>> auto
> >>> rebalance inside the broker would do as well?
> >>>
> >>> Then again, the "reassignment.parallel.leader.movements" config has a
> bit
> >>> of a different meaning when used in the context of a real reassignment
> >>> (where data moves from the leader to N more replicas) versus in the
> >>> preferred election switch (where all we need is two lightweight
> >>> LeaderAndIsr requests), so I am not entirely convinced we may want to
> use
> >>> the same config for that.
> >>> It might be easier to limit the scope of this KIP and not place a bound
> >>> on
> >>> preferred leader election.
> >>>
> >>> Thanks,
> >>> Stanislav
> >>>
> >>>
> >>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> >>> viktorsomogyi@gmail.com>
> >>> wrote:
> >>>
> >>> > Hey Stanislav,
> >>> >
> >>> > Thanks for the thorough look at the KIP! :)
> >>> >
> >>> > > Let me first explicitly confirm my understanding of the configs and
> >>> the
> >>> > > algorithm:
> >>> > > * reassignment.parallel.replica.count - the maximum total number of
> >>> > > replicas that we can move at once, *per partition*
> >>> > > * reassignment.parallel.partition.count - the maximum number of
> >>> > partitions
> >>> > > we can move at once
> >>> > > * reassignment.parallel.leader.movements - the maximum number of
> >>> leader
> >>> > > movements we can have at once
> >>> >
> >>> > Yes.
> >>> >
> >>> > > As far as I currently understand it, your proposed algorithm will
> >>> > naturally
> >>> > > prioritize leader movement first. e.g if
> >>> > > reassignment.parallel.replica.count=1 and
> >>> > >
> >>> >
> >>> >
> >>>
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> >>> > > we will always move one, the first possible, replica in the replica
> >>> set
> >>> > > (which will be the leader if part of the excess replica set (ER)).
> >>> > > Am I correct in saying that?
> >>> >
> >>> > Yes.
> >>> >
> >>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> >>> >
> >>> > If you imply that it's not always an exact number (because the last
> >>> batch
> >>> > could be smaller) than I think it's a good idea. I don't mind this
> >>> naming
> >>> > or having max in it either.
> >>> >
> >>> > > 2. How does this KIP play along with KIP-455's notion of multiple
> >>> > > rebalances - do the configs apply on a
> >>> > > single AlterPartitionAssignmentsRequest or are they global?
> >>> >
> >>> > I think it is intuitive if there are on a global level and the user
> >>> would
> >>> > have the option to set KIP-435's settings to Int.MAX and thus
> >>> eliminate any
> >>> > batching and submit their own batches through
> >>> > AlterPartitionAssignmentsRequest. If we applied them on every batch
> >>> then we
> >>> > couldn't really guarantee any limits as the user would be able to get
> >>> > around it with submitting lots of reassignments. By default though we
> >>> set
> >>> > the batching limits to Int.MAX so effectively they're disabled.
> >>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
> would
> >>> be a
> >>> > partition level batching, isn't it? So if you submit 10 partitions at
> >>> once
> >>> > then they'll all be started by the controller immediately as per my
> >>> > understanding.
> >>> >
> >>> > > 3. Unless I've missed it, the algorithm does not take into account
> >>> > > `reassignment.parallel.leader.movements`
> >>> >
> >>> > Yea the reassignment step calculation doesn't contain that because it
> >>> > describes only one partition's reassignment step calculation. We
> simply
> >>> > fill our reassignment batch with as many leader movements as we can
> and
> >>> > then fill the rest with reassignments which don't require leader
> >>> movement
> >>> > if we have space. I'll create a pseudo code block on the KIP for
> this.
> >>> >
> >>> > > 4. The KIP says that the order of the input has some control over
> >>> how the
> >>> > > batches are created - i.e it's deterministic. What would the
> batches
> >>> of
> >>> > the
> >>> > > following reassignment look like:
> >>> > > reassignment.parallel.replica.count=1
> >>> > > reassignment.parallel.partition.count=MAX_INT
> >>> > > reassignment.parallel.leader.movements=1
> >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> >>> > > partitionB - (0,1,2) -> (3,4,5)
> >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> >>> C(1->4).
> >>> > Is
> >>> > > that correct? Would the second step then continue with B(0->3)?
> >>> > >
> >>> > > If the configurations are global, I can imagine we will have a bit
> >>> more
> >>> > > trouble in preserving the expected ordering, especially across
> >>> controller
> >>> > > failovers -- but I'll avoid speculating until you confirm the scope
> >>> of
> >>> > the
> >>> > > config.
> >>> >
> >>> > I've raised the ordering problem on the discussion of KIP-455 in a
> bit
> >>> > different form and as I remember the verdict there was that we
> >>> shouldn't
> >>> > expose ordering as an API. It might not be easy as you say and there
> >>> might
> >>> > be much better strategies to follow (like disk or network utilization
> >>> > goals). Therefore I'll remove this section from the KIP. (Otherwise
> >>> yes, I
> >>> > would have thought of the same behavior what you described.)
> >>> > Since #3 also was about the pseudo code, it would be something like
> >>> this:
> >>> > Config:
> >>> > reassignment.parallel.replica.count=R
> >>> > reassignment.parallel.partition.count=P
> >>> > reassignment.parallel.leader.movements=L
> >>> > Code:
> >>> > val batchSize = max(L,P)
> >>> > // split the individual partition reassignments whether they involve
> >>> leader
> >>> > movement or not
> >>> > val partitionMovements =
> >>> >
> >>> >
> >>>
> calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> >>> > // fill the batch with as much leader movements as possible and take
> >>> the
> >>> > rest from other reassignments
> >>> > val currentBatch = if (partitionMovements.leaderMovements.size <
> >>> batchSize)
> >>> >   partitionMovements.leaderMovements ++
> >>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
> >>> > partitionMovements.leaderMovements.size)
> >>> > else
> >>> >  partitionMovements.leaderMovements.take(batchSize)
> >>> > executeReassignmentOnBatch(currentBatch)
> >>> >
> >>> > > 5. Regarding the new behavior of electing the new preferred leader
> >>> in the
> >>> > > "first step" of the reassignment - does this obey the
> >>> > > `auto.leader.rebalance.enable` config?
> >>> > > If not, I have concerns regarding how backwards compatible this
> >>> might be
> >>> > -
> >>> > > e.g imagine a user does a huge reassignment (as they have always
> >>> done)
> >>> > and
> >>> > > suddenly a huge leader shift happens, whereas the user expected to
> >>> > manually
> >>> > > shift preferred leaders at a slower rate via
> >>> > > the kafka-preferred-replica-election.sh tool.
> >>> >
> >>> > I'm not sure I get this scenario. So are you saying that after they
> >>> > submitted a reassignment they also submit a preferred leader change?
> >>> > In my mind what I would do is:
> >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> limit
> >>> as
> >>> > this way it will be easier to calculate the reassignment batches.
> >>> > ii) the user could submit preferred leader election but it's
> basically
> >>> a
> >>> > form of reassignment so it would fall under the batching criterias.
> If
> >>> the
> >>> > batch they submit is smaller than the internal, then it'd be executed
> >>> > immediately but otherwise it would be split into more batches. It
> >>> might be
> >>> > a different behavior as it may not be executed it in one batch but I
> >>> think
> >>> > it isn't a problem because we'll default to Int.MAX with the batches
> >>> and
> >>> > otherwise because since it's a form of reassignment I think it makes
> >>> sense
> >>> > to apply the same logic.
> >>> >
> >>> > > 6. What is the expected behavior if we dynamically change one of
> the
> >>> > > configs to a lower value while a reassignment is happening. Would
> we
> >>> > cancel
> >>> > > some of the currently reassigned partitions or would we account for
> >>> the
> >>> > new
> >>> > > values on the next reassignment? I assume the latter but it's good
> >>> to be
> >>> > > explicit
> >>> >
> >>> > Yep, it would be the latter. I'll make this explicit in the KIP too.
> >>> >
> >>> > About the nits:
> >>> > Noted, will update the KIP to reflect your suggestions :)
> >>> >
> >>> > Viktor
> >>> >
> >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> >>> > stanislav@confluent.io>
> >>> > wrote:
> >>> > >
> >>> > > Hey there Viktor,
> >>> > >
> >>> > > Thanks for working on this KIP! I agree with the notion that
> >>> reliability,
> >>> > > stability and predictability of a reassignment should be a core
> >>> feature
> >>> > of
> >>> > > Kafka.
> >>> > >
> >>> > > Let me first explicitly confirm my understanding of the configs and
> >>> the
> >>> > > algorithm:
> >>> > > * reassignment.parallel.replica.count - the maximum total number of
> >>> > > replicas that we can move at once, *per partition*
> >>> > > * reassignment.parallel.partition.count - the maximum number of
> >>> > partitions
> >>> > > we can move at once
> >>> > > * reassignment.parallel.leader.movements - the maximum number of
> >>> leader
> >>> > > movements we can have at once
> >>> > >
> >>> > > As far as I currently understand it, your proposed algorithm will
> >>> > naturally
> >>> > > prioritize leader movement first. e.g if
> >>> > > reassignment.parallel.replica.count=1 and
> >>> > >
> >>> >
> >>> >
> >>>
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> >>> > > we will always move one, the first possible, replica in the replica
> >>> set
> >>> > > (which will be the leader if part of the excess replica set (ER)).
> >>> > > Am I correct in saying that?
> >>> > >
> >>> > > Regarding the KIP, I've got a couple of comments/questions::
> >>> > >
> >>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> >>> > >
> >>> > > 2. How does this KIP play along with KIP-455's notion of multiple
> >>> > > rebalances - do the configs apply on a
> >>> > > single AlterPartitionAssignmentsRequest or are they global?
> >>> > >
> >>> > > 3. Unless I've missed it, the algorithm does not take into account
> >>> > > `reassignment.parallel.leader.movements`
> >>> > >
> >>> > > 4. The KIP says that the order of the input has some control over
> >>> how the
> >>> > > batches are created - i.e it's deterministic. What would the
> batches
> >>> of
> >>> > the
> >>> > > following reassignment look like:
> >>> > > reassignment.parallel.replica.count=1
> >>> > > reassignment.parallel.partition.count=MAX_INT
> >>> > > reassignment.parallel.leader.movements=1
> >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> >>> > > partitionB - (0,1,2) -> (3,4,5)
> >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> >>> C(1->4).
> >>> > Is
> >>> > > that correct? Would the second step then continue with B(0->3)?
> >>> > >
> >>> > > If the configurations are global, I can imagine we will have a bit
> >>> more
> >>> > > trouble in preserving the expected ordering, especially across
> >>> controller
> >>> > > failovers -- but I'll avoid speculating until you confirm the scope
> >>> of
> >>> > the
> >>> > > config.
> >>> > >
> >>> > > 5. Regarding the new behavior of electing the new preferred leader
> >>> in the
> >>> > > "first step" of the reassignment - does this obey the
> >>> > > `auto.leader.rebalance.enable` config?
> >>> > > If not, I have concerns regarding how backwards compatible this
> >>> might be
> >>> > -
> >>> > > e.g imagine a user does a huge reassignment (as they have always
> >>> done)
> >>> > and
> >>> > > suddenly a huge leader shift happens, whereas the user expected to
> >>> > manually
> >>> > > shift preferred leaders at a slower rate via
> >>> > > the kafka-preferred-replica-election.sh tool.
> >>> > >
> >>> > > 6. What is the expected behavior if we dynamically change one of
> the
> >>> > > configs to a lower value while a reassignment is happening. Would
> we
> >>> > cancel
> >>> > > some of the currently reassigned partitions or would we account for
> >>> the
> >>> > new
> >>> > > values on the next reassignment? I assume the latter but it's good
> >>> to be
> >>> > > explicit
> >>> > >
> >>> > > As some small nits:
> >>> > > - could we have a section in the KIP where we explicitly define
> what
> >>> each
> >>> > > config does? This can be inferred from the KIP as is but requires
> >>> careful
> >>> > > reading, whereas some developers might want to skim through the
> >>> change to
> >>> > > get a quick sense. It also improves readability but that's my
> >>> personal
> >>> > > opinion.
> >>> > > - could you better clarify how a reassignment step is different
> from
> >>> the
> >>> > > currently existing algorithm? maybe laying both algorithms out in
> >>> the KIP
> >>> > > would be most clear
> >>> > > - the names for the OngoingPartitionReassignment and
> >>> > > CurrentPartitionReassignment fields in the
> >>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
> >>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
> >>> else
> >>> > in
> >>> > > the community has.
> >>> > >
> >>> > > Thanks,
> >>> > > Stanislav
> >>> > >
> >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> >>> > viktorsomogyi@gmail.com>
> >>> > > wrote:
> >>> > >
> >>> > > > Hi All,
> >>> > > >
> >>> > > > I've renamed my KIP as its name was a bit confusing so we'll
> >>> continue
> >>> > it in
> >>> > > > this thread.
> >>> > > > The previous thread for record:
> >>> > > >
> >>> > > >
> >>> >
> >>> >
> >>>
> https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> >>> > > >
> >>> > > > A short summary of the KIP:
> >>> > > > In case of a vast partition reassignment (thousands of partitions
> >>> at
> >>> > once)
> >>> > > > Kafka can collapse under the increased replication traffic. This
> >>> KIP
> >>> > will
> >>> > > > mitigate it by introducing internal batching done by the
> >>> controller.
> >>> > > > Besides putting a bandwidth limit on the replication it is useful
> >>> to
> >>> > batch
> >>> > > > partition movements as fewer number of partitions will use the
> >>> > available
> >>> > > > bandwidth for reassignment and they finish faster.
> >>> > > > The main control handles are:
> >>> > > > - the number of parallel leader movements,
> >>> > > > - the number of parallel partition movements
> >>> > > > - and the number of parallel replica movements.
> >>> > > >
> >>> > > > Thank you for the feedback and ideas so far in the previous
> thread
> >>> and
> >>> > I'm
> >>> > > > happy to receive more.
> >>> > > >
> >>> > > > Regards,
> >>> > > > Viktor
> >>> > > >
> >>> > >
> >>> > >
> >>> > > --
> >>> > > Best,
> >>> > > Stanislav
> >>> >
> >>>
> >>>
> >>> --
> >>> Best,
> >>> Stanislav
> >>>
> >>
>


-- 
Best,
Stanislav

Re: [DISCUSS] KIP-435: Internal Partition Reassignment Batching

Posted by Viktor Somogyi-Vass <vi...@gmail.com>.
Hey Folks,

I think I'll open a vote on this early next week if there is no more
feedback.

Thanks,
Viktor

On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <vi...@gmail.com>
wrote:

> Hey Stanislav,
>
> I reiterated on the current algorithm and indeed it would change the order
> of replicas in ZK but wouldn't do a leader election, so one would need to
> run the preferred replica election tool. However still in the light of this
> I'm not sure I'd keep the existing behavior as users wouldn't win anything
> with it. Changing leadership automatically would result in a simpler,
> easier and more responsive reassign algorithm which is especially important
> if the reassignment is done to free up the broker from under heavy load.
> Automated tools (Cruise Control) would also have simpler life.
> Let me know what you think.
>
> Viktor
>
> On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> viktorsomogyi@gmail.com> wrote:
>
>> Hi Stan,
>>
>> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
>> on
>> a lot of partitions and the user runs a massive rebalance to change them
>> all to [3, 2, 1]. In the old behavior, I think that this would not do
>> anything but simply change the replica set in ZK.
>> Then, the user could run kafka-preferred-replica-election.sh on a given
>> set
>> of partitions to make sure the new leader 3 gets elected.
>>
>> I thought the old algorithm would elect 3 as the leader in this case
>> right away at the end but I have to double check this. In any case I think
>> it would make sense in the new algorithm if we elected the new preferred
>> leader right away, regardless of the new leader is chosen from the existing
>> replicas or not. If the whole reassignment is in fact just changing the
>> replica order then either way it is a simple (trivial) operation and doing
>> it batched wouldn't slow it down much as there is no data movement
>> involved. If the reassignment is mixed, meaning it contains reordering and
>> real movement as well then in fact it would help to even out the load
>> faster as data movements can get long. For instance in case of a
>> reassignment batch of two partitions concurrently: P1: (1,2,3) -> (3,2,1)
>> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new leader but
>> P1 wouldn't and it wouldn't help the goal of normalizing traffic on broker
>> 1 that much.
>> Again, I'll have to check how the current algorithm works and if it has
>> any unknown drawbacks to implement what I sketched up above.
>>
>> As for generic preferred leader election when called from the admin api
>> or the auto leader balance feature I think you're right that we should
>> leave it as it is. It doesn't involve any data movement so it's fairly fast
>> and it normalizes the cluster state quickly.
>>
>> Viktor
>>
>> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
>> stanislav@confluent.io> wrote:
>>
>>> Hey Viktor,
>>>
>>>  I think it is intuitive if there are on a global level...If we applied
>>> > them on every batch then we
>>> > couldn't really guarantee any limits as the user would be able to get
>>> > around it with submitting lots of reassignments.
>>>
>>>
>>> Agreed. Could we mention this explicitly in the KIP?
>>>
>>> Also if I understand correctly, AlterPartitionAssignmentsRequest would
>>> be a
>>> > partition level batching, isn't it? So if you submit 10 partitions at
>>> once
>>> > then they'll all be started by the controller immediately as per my
>>> > understanding.
>>>
>>>
>>> Yep, absolutely
>>>
>>> I've raised the ordering problem on the discussion of KIP-455 in a bit
>>> > different form and as I remember the verdict there was that we
>>> shouldn't
>>> > expose ordering as an API. It might not be easy as you say and there
>>> might
>>> > be much better strategies to follow (like disk or network utilization
>>> > goals). Therefore I'll remove this section from the KIP.
>>>
>>>
>>> Sounds good to me.
>>>
>>> I'm not sure I get this scenario. So are you saying that after they
>>> > submitted a reassignment they also submit a preferred leader change?
>>> > In my mind what I would do is:
>>> > i) make auto.leader.rebalance.enable to obey the leader movement limit
>>> as
>>> > this way it will be easier to calculate the reassignment batches.
>>> >
>>>
>>> Sorry, this is my fault -- I should have been more clear.
>>> First, I didn't think through this well enough at the time, I don't
>>> think.
>>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
>>> obvious that a leader shift will happen. Your KIP proposes we shift
>>> replicas 1 and 4 first.
>>>
>>> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
>>> on
>>> a lot of partitions and the user runs a massive rebalance to change them
>>> all to [3, 2, 1]. In the old behavior, I think that this would not do
>>> anything but simply change the replica set in ZK.
>>> Then, the user could run kafka-preferred-replica-election.sh on a given
>>> set
>>> of partitions to make sure the new leader 3 gets elected.
>>>
>>> ii) the user could submit preferred leader election but it's basically a
>>> > form of reassignment so it would fall under the batching criterias. If
>>> the
>>> > batch they submit is smaller than the internal, then it'd be executed
>>> > immediately but otherwise it would be split into more batches. It
>>> might be
>>> > a different behavior as it may not be executed it in one batch but I
>>> think
>>> > it isn't a problem because we'll default to Int.MAX with the batches
>>> and
>>> > otherwise because since it's a form of reassignment I think it makes
>>> sense
>>> > to apply the same logic.
>>>
>>>
>>> The AdminAPI for that is
>>> "electPreferredLeaders(Collection<TopicPartition>
>>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
>>> "partition": 0}]}" so it is a bit less explicit than our other
>>> reassignment
>>> API, but the functionality is the same.
>>> You're 100% right that it is a form of reassignment, but I hadn't thought
>>> of it like that and I some other people wouldn't have either.
>>> If I understand correctly, you're suggesting that we count the
>>> "reassignment.parallel.leader.movements" config against such preferred
>>> elections. I think that makes sense. If we are to do that we should
>>> explicitly mention that this KIP touches the preferred election logic as
>>> well. Would that config also bound the number of leader movements the
>>> auto
>>> rebalance inside the broker would do as well?
>>>
>>> Then again, the "reassignment.parallel.leader.movements" config has a bit
>>> of a different meaning when used in the context of a real reassignment
>>> (where data moves from the leader to N more replicas) versus in the
>>> preferred election switch (where all we need is two lightweight
>>> LeaderAndIsr requests), so I am not entirely convinced we may want to use
>>> the same config for that.
>>> It might be easier to limit the scope of this KIP and not place a bound
>>> on
>>> preferred leader election.
>>>
>>> Thanks,
>>> Stanislav
>>>
>>>
>>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
>>> viktorsomogyi@gmail.com>
>>> wrote:
>>>
>>> > Hey Stanislav,
>>> >
>>> > Thanks for the thorough look at the KIP! :)
>>> >
>>> > > Let me first explicitly confirm my understanding of the configs and
>>> the
>>> > > algorithm:
>>> > > * reassignment.parallel.replica.count - the maximum total number of
>>> > > replicas that we can move at once, *per partition*
>>> > > * reassignment.parallel.partition.count - the maximum number of
>>> > partitions
>>> > > we can move at once
>>> > > * reassignment.parallel.leader.movements - the maximum number of
>>> leader
>>> > > movements we can have at once
>>> >
>>> > Yes.
>>> >
>>> > > As far as I currently understand it, your proposed algorithm will
>>> > naturally
>>> > > prioritize leader movement first. e.g if
>>> > > reassignment.parallel.replica.count=1 and
>>> > >
>>> >
>>> >
>>> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>>> > > we will always move one, the first possible, replica in the replica
>>> set
>>> > > (which will be the leader if part of the excess replica set (ER)).
>>> > > Am I correct in saying that?
>>> >
>>> > Yes.
>>> >
>>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>>> >
>>> > If you imply that it's not always an exact number (because the last
>>> batch
>>> > could be smaller) than I think it's a good idea. I don't mind this
>>> naming
>>> > or having max in it either.
>>> >
>>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>>> > > rebalances - do the configs apply on a
>>> > > single AlterPartitionAssignmentsRequest or are they global?
>>> >
>>> > I think it is intuitive if there are on a global level and the user
>>> would
>>> > have the option to set KIP-435's settings to Int.MAX and thus
>>> eliminate any
>>> > batching and submit their own batches through
>>> > AlterPartitionAssignmentsRequest. If we applied them on every batch
>>> then we
>>> > couldn't really guarantee any limits as the user would be able to get
>>> > around it with submitting lots of reassignments. By default though we
>>> set
>>> > the batching limits to Int.MAX so effectively they're disabled.
>>> > Also if I understand correctly, AlterPartitionAssignmentsRequest would
>>> be a
>>> > partition level batching, isn't it? So if you submit 10 partitions at
>>> once
>>> > then they'll all be started by the controller immediately as per my
>>> > understanding.
>>> >
>>> > > 3. Unless I've missed it, the algorithm does not take into account
>>> > > `reassignment.parallel.leader.movements`
>>> >
>>> > Yea the reassignment step calculation doesn't contain that because it
>>> > describes only one partition's reassignment step calculation. We simply
>>> > fill our reassignment batch with as many leader movements as we can and
>>> > then fill the rest with reassignments which don't require leader
>>> movement
>>> > if we have space. I'll create a pseudo code block on the KIP for this.
>>> >
>>> > > 4. The KIP says that the order of the input has some control over
>>> how the
>>> > > batches are created - i.e it's deterministic. What would the batches
>>> of
>>> > the
>>> > > following reassignment look like:
>>> > > reassignment.parallel.replica.count=1
>>> > > reassignment.parallel.partition.count=MAX_INT
>>> > > reassignment.parallel.leader.movements=1
>>> > > partitionA - (0,1,2) -> (3, 4, 5)
>>> > > partitionB - (0,1,2) -> (3,4,5)
>>> > > partitionC - (0,1,2) -> (3, 4, 5)
>>> > > From my understanding, we would start with A(0->3), B(1->4) and
>>> C(1->4).
>>> > Is
>>> > > that correct? Would the second step then continue with B(0->3)?
>>> > >
>>> > > If the configurations are global, I can imagine we will have a bit
>>> more
>>> > > trouble in preserving the expected ordering, especially across
>>> controller
>>> > > failovers -- but I'll avoid speculating until you confirm the scope
>>> of
>>> > the
>>> > > config.
>>> >
>>> > I've raised the ordering problem on the discussion of KIP-455 in a bit
>>> > different form and as I remember the verdict there was that we
>>> shouldn't
>>> > expose ordering as an API. It might not be easy as you say and there
>>> might
>>> > be much better strategies to follow (like disk or network utilization
>>> > goals). Therefore I'll remove this section from the KIP. (Otherwise
>>> yes, I
>>> > would have thought of the same behavior what you described.)
>>> > Since #3 also was about the pseudo code, it would be something like
>>> this:
>>> > Config:
>>> > reassignment.parallel.replica.count=R
>>> > reassignment.parallel.partition.count=P
>>> > reassignment.parallel.leader.movements=L
>>> > Code:
>>> > val batchSize = max(L,P)
>>> > // split the individual partition reassignments whether they involve
>>> leader
>>> > movement or not
>>> > val partitionMovements =
>>> >
>>> >
>>> calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
>>> > // fill the batch with as much leader movements as possible and take
>>> the
>>> > rest from other reassignments
>>> > val currentBatch = if (partitionMovements.leaderMovements.size <
>>> batchSize)
>>> >   partitionMovements.leaderMovements ++
>>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
>>> > partitionMovements.leaderMovements.size)
>>> > else
>>> >  partitionMovements.leaderMovements.take(batchSize)
>>> > executeReassignmentOnBatch(currentBatch)
>>> >
>>> > > 5. Regarding the new behavior of electing the new preferred leader
>>> in the
>>> > > "first step" of the reassignment - does this obey the
>>> > > `auto.leader.rebalance.enable` config?
>>> > > If not, I have concerns regarding how backwards compatible this
>>> might be
>>> > -
>>> > > e.g imagine a user does a huge reassignment (as they have always
>>> done)
>>> > and
>>> > > suddenly a huge leader shift happens, whereas the user expected to
>>> > manually
>>> > > shift preferred leaders at a slower rate via
>>> > > the kafka-preferred-replica-election.sh tool.
>>> >
>>> > I'm not sure I get this scenario. So are you saying that after they
>>> > submitted a reassignment they also submit a preferred leader change?
>>> > In my mind what I would do is:
>>> > i) make auto.leader.rebalance.enable to obey the leader movement limit
>>> as
>>> > this way it will be easier to calculate the reassignment batches.
>>> > ii) the user could submit preferred leader election but it's basically
>>> a
>>> > form of reassignment so it would fall under the batching criterias. If
>>> the
>>> > batch they submit is smaller than the internal, then it'd be executed
>>> > immediately but otherwise it would be split into more batches. It
>>> might be
>>> > a different behavior as it may not be executed it in one batch but I
>>> think
>>> > it isn't a problem because we'll default to Int.MAX with the batches
>>> and
>>> > otherwise because since it's a form of reassignment I think it makes
>>> sense
>>> > to apply the same logic.
>>> >
>>> > > 6. What is the expected behavior if we dynamically change one of the
>>> > > configs to a lower value while a reassignment is happening. Would we
>>> > cancel
>>> > > some of the currently reassigned partitions or would we account for
>>> the
>>> > new
>>> > > values on the next reassignment? I assume the latter but it's good
>>> to be
>>> > > explicit
>>> >
>>> > Yep, it would be the latter. I'll make this explicit in the KIP too.
>>> >
>>> > About the nits:
>>> > Noted, will update the KIP to reflect your suggestions :)
>>> >
>>> > Viktor
>>> >
>>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
>>> > stanislav@confluent.io>
>>> > wrote:
>>> > >
>>> > > Hey there Viktor,
>>> > >
>>> > > Thanks for working on this KIP! I agree with the notion that
>>> reliability,
>>> > > stability and predictability of a reassignment should be a core
>>> feature
>>> > of
>>> > > Kafka.
>>> > >
>>> > > Let me first explicitly confirm my understanding of the configs and
>>> the
>>> > > algorithm:
>>> > > * reassignment.parallel.replica.count - the maximum total number of
>>> > > replicas that we can move at once, *per partition*
>>> > > * reassignment.parallel.partition.count - the maximum number of
>>> > partitions
>>> > > we can move at once
>>> > > * reassignment.parallel.leader.movements - the maximum number of
>>> leader
>>> > > movements we can have at once
>>> > >
>>> > > As far as I currently understand it, your proposed algorithm will
>>> > naturally
>>> > > prioritize leader movement first. e.g if
>>> > > reassignment.parallel.replica.count=1 and
>>> > >
>>> >
>>> >
>>> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
>>> > > we will always move one, the first possible, replica in the replica
>>> set
>>> > > (which will be the leader if part of the excess replica set (ER)).
>>> > > Am I correct in saying that?
>>> > >
>>> > > Regarding the KIP, I've got a couple of comments/questions::
>>> > >
>>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
>>> > >
>>> > > 2. How does this KIP play along with KIP-455's notion of multiple
>>> > > rebalances - do the configs apply on a
>>> > > single AlterPartitionAssignmentsRequest or are they global?
>>> > >
>>> > > 3. Unless I've missed it, the algorithm does not take into account
>>> > > `reassignment.parallel.leader.movements`
>>> > >
>>> > > 4. The KIP says that the order of the input has some control over
>>> how the
>>> > > batches are created - i.e it's deterministic. What would the batches
>>> of
>>> > the
>>> > > following reassignment look like:
>>> > > reassignment.parallel.replica.count=1
>>> > > reassignment.parallel.partition.count=MAX_INT
>>> > > reassignment.parallel.leader.movements=1
>>> > > partitionA - (0,1,2) -> (3, 4, 5)
>>> > > partitionB - (0,1,2) -> (3,4,5)
>>> > > partitionC - (0,1,2) -> (3, 4, 5)
>>> > > From my understanding, we would start with A(0->3), B(1->4) and
>>> C(1->4).
>>> > Is
>>> > > that correct? Would the second step then continue with B(0->3)?
>>> > >
>>> > > If the configurations are global, I can imagine we will have a bit
>>> more
>>> > > trouble in preserving the expected ordering, especially across
>>> controller
>>> > > failovers -- but I'll avoid speculating until you confirm the scope
>>> of
>>> > the
>>> > > config.
>>> > >
>>> > > 5. Regarding the new behavior of electing the new preferred leader
>>> in the
>>> > > "first step" of the reassignment - does this obey the
>>> > > `auto.leader.rebalance.enable` config?
>>> > > If not, I have concerns regarding how backwards compatible this
>>> might be
>>> > -
>>> > > e.g imagine a user does a huge reassignment (as they have always
>>> done)
>>> > and
>>> > > suddenly a huge leader shift happens, whereas the user expected to
>>> > manually
>>> > > shift preferred leaders at a slower rate via
>>> > > the kafka-preferred-replica-election.sh tool.
>>> > >
>>> > > 6. What is the expected behavior if we dynamically change one of the
>>> > > configs to a lower value while a reassignment is happening. Would we
>>> > cancel
>>> > > some of the currently reassigned partitions or would we account for
>>> the
>>> > new
>>> > > values on the next reassignment? I assume the latter but it's good
>>> to be
>>> > > explicit
>>> > >
>>> > > As some small nits:
>>> > > - could we have a section in the KIP where we explicitly define what
>>> each
>>> > > config does? This can be inferred from the KIP as is but requires
>>> careful
>>> > > reading, whereas some developers might want to skim through the
>>> change to
>>> > > get a quick sense. It also improves readability but that's my
>>> personal
>>> > > opinion.
>>> > > - could you better clarify how a reassignment step is different from
>>> the
>>> > > currently existing algorithm? maybe laying both algorithms out in
>>> the KIP
>>> > > would be most clear
>>> > > - the names for the OngoingPartitionReassignment and
>>> > > CurrentPartitionReassignment fields in the
>>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
>>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
>>> else
>>> > in
>>> > > the community has.
>>> > >
>>> > > Thanks,
>>> > > Stanislav
>>> > >
>>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
>>> > viktorsomogyi@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi All,
>>> > > >
>>> > > > I've renamed my KIP as its name was a bit confusing so we'll
>>> continue
>>> > it in
>>> > > > this thread.
>>> > > > The previous thread for record:
>>> > > >
>>> > > >
>>> >
>>> >
>>> https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
>>> > > >
>>> > > > A short summary of the KIP:
>>> > > > In case of a vast partition reassignment (thousands of partitions
>>> at
>>> > once)
>>> > > > Kafka can collapse under the increased replication traffic. This
>>> KIP
>>> > will
>>> > > > mitigate it by introducing internal batching done by the
>>> controller.
>>> > > > Besides putting a bandwidth limit on the replication it is useful
>>> to
>>> > batch
>>> > > > partition movements as fewer number of partitions will use the
>>> > available
>>> > > > bandwidth for reassignment and they finish faster.
>>> > > > The main control handles are:
>>> > > > - the number of parallel leader movements,
>>> > > > - the number of parallel partition movements
>>> > > > - and the number of parallel replica movements.
>>> > > >
>>> > > > Thank you for the feedback and ideas so far in the previous thread
>>> and
>>> > I'm
>>> > > > happy to receive more.
>>> > > >
>>> > > > Regards,
>>> > > > Viktor
>>> > > >
>>> > >
>>> > >
>>> > > --
>>> > > Best,
>>> > > Stanislav
>>> >
>>>
>>>
>>> --
>>> Best,
>>> Stanislav
>>>
>>