Posted to dev@kafka.apache.org by Konstantine Karantasis <ko...@confluent.io> on 2019/02/04 20:23:06 UTC

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Hi all,

Thank you for your comments so far.
Now that KIP freeze and feature freeze are behind us for version 2.2, I'd
like to bring this thread back at the top of the email stack, with the
following suggestion:

I'll be changing KIP-415's description to include a serialization format
that extends the current scheme and is based on Kafka structs.

The initial suggestion to transition to an alternative serialization
format (e.g. flatbuffers) was made in case it showed good potential and we
could arrive at a quick consensus on this matter. I
believe the arguments for such a transition make sense, but the pros are
probably not enough to outweigh the introduction of a dependency at this
point and justify changes in every client that will potentially use
incremental cooperative rebalancing in the future. The changes in the
rebalancing protocol have not been very frequent so far.

Admittedly, even more important is the fact that the discussion around the
serialization format of the new protocol is only tangentially related to
the core of KIP-415. Thus, in order to keep the discussion focused on the
essential changes required by KIP-415, which are expected to have
significant impact in addressing the stop-the-world effect, I'd like to
punt any optimizations to the serialization format and change the KIP to
describe a schema that depends on Kafka structs as the current (V0) version
does.
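
To give a rough idea of what I mean by extending the current scheme, here is
an illustrative sketch (not the final KIP-415 schema; the V1 field names below
are placeholders) of how a V1 worker-state struct could be declared and
serialized with the same protocol types that the V0 Connect protocol already
uses:

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

public class ConnectProtocolSketch {

    // Shape of the existing V0 worker metadata: the worker's REST url and the
    // offset it has read from the config topic.
    static final Schema WORKER_STATE_V0 = new Schema(
            new Field("url", Type.STRING),
            new Field("config-offset", Type.INT64));

    // Hypothetical V1 extension: the same fields plus an opaque blob describing
    // the connectors and tasks the worker currently runs, so that the leader
    // can compute incremental assignments.
    static final Schema WORKER_STATE_V1 = new Schema(
            new Field("url", Type.STRING),
            new Field("config-offset", Type.INT64),
            new Field("current-assignment", Type.BYTES));

    static ByteBuffer serializeV1(String url, long configOffset, ByteBuffer currentAssignment) {
        Struct struct = new Struct(WORKER_STATE_V1)
                .set("url", url)
                .set("config-offset", configOffset)
                .set("current-assignment", currentAssignment);
        // Two bytes for a version header, as in the existing format, then the struct itself.
        ByteBuffer buffer = ByteBuffer.allocate(2 + WORKER_STATE_V1.sizeOf(struct));
        buffer.putShort((short) 1);
        WORKER_STATE_V1.write(buffer, struct);
        buffer.flip();
        return buffer;
    }
}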

I hope this will allow us to make progress more easily and bring the changes of
this new rebalancing protocol to Kafka clients, beginning with Kafka
Connect, in a more readily applicable and less disruptive way.

I'll change the schema descriptions by end of day.

Looking forward to your next comments!

Konstantine

On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

>
> Hi Ismael,
> thanks for bringing up serialization in the discussion!
>
> Indeed, JSON was considered given it's the prevalent text-based
> serialization option.
>
> In comparison to flatbuffers, most generic pros and cons are valid in this
> context too: higher performance during serde, small size, optional fields,
> strong typing, and others.
>
> Specifically, for Connect's use case, flatbuffers serialization, although
> it introduces a single dependency, appears more appealing for the
> following reasons:
>
> * The protocol is evolving from one binary format to another binary one.
> * Although new fields, nested or not, are expected to be introduced (as in
> KIP-415) or old fields may get deprecated, the protocol schemas are
> expected to be simple, mostly flat and manageable. We won't need to process
> arbitrarily nested structures during runtime, for which JSON would be a
> better fit. The current proposal aims to make the current append only
> format a bit more flexible.
> * It's good to keep performance tight because the loop that includes
> subprotocol serde will need to accommodate resource release and assignment.
> Also, rebalancing in its incremental cooperative form, which is expected to
> be lighter, has the potential to start happening more frequently. Parsing
> JSON with Jackson has been a hotspot on certain occasions in the past, if I
> remember correctly.
> * Evolution will be facilitated by handling or ignoring optional fields
> easily. The protocol may evolve with fewer hard version bumps like the one
> proposed here from V0 to V1.
> * Optional fields are omitted, not just compressed.
> * Unpacking of fields does not require deserialization of the whole
> message, making transition between versions or flavors of the protocol easy
> and performant.
> * Flatbuffers' specification is simple and can be implemented, even in the
> absence of appropriate clients.
>
> I hope the above highlights why flatbuffers is a good candidate for this
> use case and, thus, worth adding as a dependency.
> Strictly speaking, yes, they introduce a new compile-time dependency. But
> during runtime, such a dependency seems equivalent to introducing a JSON
> parser (such as Jackson, which is already being used in AK).
>
> Your question is very valid. It's probably worth adding an item under
> rejected alternatives, once we agree how we want to move forward.
>
> Best,
> Konstantine
>
>
>
> On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma <is...@gmail.com> wrote:
>
>> Thanks for the KIP Konstantine. Quick question: introducing a new
>> serialization format (ie flatbuffers) has major implications. Have we
>> considered json? If so, why did we reject it?
>>
>> Ismael
>>
>> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <
>> konstantine@confluent.io wrote:
>>
>> > Hi all,
>> >
>> > I just published KIP-415: Incremental Cooperative Rebalancing in Kafka
>> > Connect
>> > on the wiki here:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>> >
>> > This is the first KIP to suggest an implementation of incremental and
>> > cooperative rebalancing in the context of Kafka Connect. It aims to
>> provide
>> > an adequate solution to the stop-the-world effect that occurs in a
>> Connect
>> > cluster whenever a new connector configuration is submitted or a Connect
>> > Worker is added or removed from the cluster.
>> >
>> > Looking forward to your insightful feedback!
>> >
>> > Regards,
>> > Konstantine
>> >
>>
>

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Konstantine Karantasis <ko...@confluent.io>.
Almost two months after this discussion thread on KIP-415 was opened
and more than four months after the design doc on Incremental Cooperative
Rebalancing was posted,
I believe that the open items have been discussed and that KIP-415, in its
current form, reflects the outcome of our discussions here.

Thus, I'm inclined to start a voting thread. Of course, this doesn't rule
out the possibility of additional comments coming up soon.
In that case, I'd appreciate it if we could return to the discussion here, and
maybe add just a brief mention on the [VOTE] thread.

Thank you all for your insightful comments so far!

Konstantine


On Tue, Mar 5, 2019 at 11:20 AM Konstantine Karantasis <
konstantine@confluent.io> wrote:

>
> Hi Guozhang,
> thanks for your detailed comments!
>
> I thought it'd be a good idea to have some code supporting my replies. The
> PR for KIP-415 was opened this week. It needs some cleanup but I think it
> can give some good hints on the things you've touched upon so far.
>
> I'm answering your four points inline, below:
>
> On Tue, Feb 12, 2019 at 9:44 PM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Hello Konstantine,
>>
>> Thanks for the marvelous effort writing up this KIP! I've made a pass over
>> it and here's some comments:
>>
>> 1) For other audience to better understand the gist of this proposal, I'd
>> suggest we add the following context before the "Changes to Connect's
>> Rebalancing Process" section:
>>
>> "
>> The core in a group rebalance protocol is to have a synchronization
>> barrier
>> such that every member of the group will coordinate on, such that before
>> everyone hit this barrier all the states will not be changed at all. In
>> the
>> current rebalance protocol, this synchronization barrier is the reception
>> of the JoinGroup request: coordinator will not send any responses to any
>> members until it determines that all JoinGroup requests have been
>> received.
>> And since right after this barrier the new assignment will be made and the
>> assigned partitions may no longer be re-assigned to the same member (i.e.
>> consumer) of the group, today we have to be conservative that all members
>> revoke all the resources they currently own before proceeding to the
>> synchronization barrier.
>>
>> This KIP's key idea is to postpone the synchronization barrier to the
>> second rebalance's JoinGroup reception, so that in the first rebalance
>> since we know NO new assignment will ever be executed, members do not need
>> to revoke anything before joining the group. In other words, we are paying
>> more rebalances than the naive solution (at least two rebalances will be
>> required), but each rebalance now could be much lighter.
>> "
>>
>
> 1) Your point on fencing and rebalancing being seen as an implicit
> synchronization barrier is very good.
>
> I added the following paragraph as a separate section under proposed
> changes, because I believe such a technical detail wouldn't fit in the
> preamble of the proposed changes. Including here in its complete form as a
> reply:
>
> Until now the process of rebalancing has also implied a global
> synchronization barrier among the members of the group. With respect to the
> shared resources among the group, this barrier imposed a happens-before
> relationship between the release and the acquisition of a resource. In
> Connect this simply means that a task had to stop execution on a specific
> worker and save its progress, before starting execution on another worker
> after a rebalance. This is a necessary property for resources and is still
> guaranteed under this proposal. However, instead of the rebalance being a
> flat global synchronization point for all the connectors and tasks, the
> happens-before relationships are enforced only on tasks that are revoked
> and subsequently reassigned, whenever this is required. This optimization,
> that allows resolution of the stop-the-world effect (as it's also known in
> the current form of rebalancing), is allowed because the barrier imposed by
> rebalancing is implicit, it concerns the Connect workers and is not exposed
> in Connect's API. Connect tasks run independently and when a task stops and
> resumes progress it is not required that all tasks of a connector do the
> same. Of course, incremental cooperative rebalancing can be applied not
> only on individual tasks but also in a group of tasks, such as the tasks of
> a specific connector. However, in the changes proposed in this KIP, each
> task remains independent and is scheduled (assigned/revoked/reassigned)
> independently of other tasks.
>
>
>
>>
>> 2) In this idea, the leader needs to be able to distinguish the "first"
>> rebalance where no new assignment will be executed, but only revocations
>> are indicated, and the "second" rebalance where there are some "either
>> revoked or left from leaving member" partitions to be assigned. What is
>> not
>> clear to me is how to distinguish these two cases, and when it decides to
>> inject the delay (i.e. it is the first rebalance) v.s. not injecting
>> delays. Comparing the "*Non-first new member joins*" and "*Worker
>> bounces*"
>> scenarios: in the former case, the leader would decide it is the first
>> rebalance and let W1 revoke some assignment, WITHOUT delay, while in the
>> latter case, when W2 rejoins (at this case it rejoined as a new member, so
>> from the coordinator and leader's point of view there should be no
>> difference compared to "W2 joins as a new member"), the leader assigns to
>> W2 with AT2 and BC0. Also what we did not illustrate in the KIP on
>> consumer
>> failures in between rebalances: for another example, suppose in
>> "*Non-first
>> new member joins*" W1 fails after revoked some partitions but before
>> triggers another rebalance, then when coordinator triggers another join
>> based on failure detection, how would the leader assign partitions? Would
>> it assign all five partitions immediately to W2 and W3 or would it inject
>> delays and not assign any to W2 and W3, or would it assign the ones
>> indicated for revocation to W2 and W3? Could you provide some pseudo code
>> on the leader logic, such that given the list of subscriptions, how would
>> the leader decides:
>>
>> 2.a) add a delay or not;
>> 2.b) assign new resources to some members or not;
>> 2.c) revoke new resources to some members or not.
>>
>
> 2) The leader can distinguish between lost and new assignments and defer
> reassignment of the former while at the same time starts immediately
> execution of the latter based on a number of things:
>
> a) The current members send their current assignments (running tasks)
> b) The current leader maintains the previous assignment
> c) The current leader reads the up-to-date list of configured tasks, that
> reflects new and removed tasks
>
> As far as delays are concerned, I believe it's good to keep things simple.
> First the set of lost/unaccounted tasks is compiled and at the same time
> the leader keeps a list of candidate workers with no assignments. These
> workers are potentially workers that lost their tasks but managed to rejoin
> in time. Once a delay expires, the lost tasks are reassigned to the these
> candidates. If the leader fails, the reassigned happens immediately without
> taking into account a delay or considering any tasks as lost.
>
> I'd like to return with a state diagram of the current proposed task
> scheduler (or assignor as it's called in the code) before we merge this
> feature. But since this will include implementation specific details, I
> think it would make sense not to block the KIP on it.
>
>
>
>>
>> 3) About compatibility, I'm also wondering how would downgrade be executed
>> here: suppose after upgrading the Connect jar and migrate to `cooperative`
>> mode, users discovered a bug and hence needs to downgrade back to older
>> versions that does not support `cooperative`.
>>
>
> 3) The new protocol is backwards compatible with the old one. Once a
> worker is downgraded the whole group can run with the old version. The
> metadata and assignments are compatible and can be read both ways. The old
> code path is retained, unchanged, so by switching to the old protocol we
> return to the execution mode that stops all the tasks before rebalancing
> and assigns them in the old way.
>
>
>>
>> 4) This is sort of orthogonal to this KIP, but I'm also considering about
>> code sharing with the future Streams incremental rebalance protocol. For
>> Kafka Streams, one difference is that because of the state maintenance,
>> migrating tasks are heavier and hence we should consider bootstrapping the
>> assigned task before revoking it from the old client. So far it seems
>> Streams incremental rebalance protocol would be a bit different from the
>> Connect protocol proposed in KIP-415 here. What they may share in common
>> are a) flatbuffer utils for encoding metadata bytes, and b) consumer
>> members actively triggers another rebalance by sending join-group request.
>> So I'm wondering if we can push these two pieces of logic into the
>> AbstractCoordinator so it can be shared?
>>
>> Guozhang
>>
>>
> 4) Currently, in Connect I've worked on implementing the changes
> exclusively within the WorkerCoordinator and Connect's protocol
> definitions. At the moment I don't see significant opportunity for code
> sharing and abstraction with respect to incremental cooperative
> rebalancing. Also, flatbuffers consideration has been postponed and might
> not return soon. I had included flatbuffers in the initial proposal just in
> case we recognized a need for it, but it wasn't absolutely necessary and
> now KIP-415 has been updated to use the existing Kafka protocol types.
>
> Thanks!
> Konstantine
>
>
>
>>
>> On Wed, Feb 6, 2019 at 9:58 PM Boyang Chen <bc...@outlook.com> wrote:
>>
>> > Thanks Konstantine for the great summary! +1 for having a separate KIP
>> > discussing the trade-offs for using a new serialization format for the
>> > protocol encoding. We probably could discuss a wider options and
>> benchmark
>> > on the performance before reaching a final decision.
>> >
>> > Best,
>> > Boyang
>> > ________________________________
>> > From: Konstantine Karantasis <ko...@confluent.io>
>> > Sent: Tuesday, February 5, 2019 4:23 AM
>> > To: dev@kafka.apache.org
>> > Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in
>> > Kafka Connect
>> >
>> > Hi all,
>> >
>> > Thank you for your comments so far.
>> > Now that KIP freeze and feature freeze are behind us for version 2.2,
>> I'd
>> > like to bring this thread back at the top of the email stack, with the
>> > following suggestion:
>> >
>> > I'll be changing KIP-415's description to include a serialization format
>> > that extends the current scheme and is based on Kafka structs.
>> >
>> > The initial suggestion to transition to using an alternative
>> serialization
>> > format (e.g. flatbuffers) was made just in case we saw this would have a
>> > good potential and we could arrive in a quick consensus on this matter.
>> I
>> > believe the arguments for such a transition make sense, but the pros are
>> > probably not enough to outweigh the introduction of a dependency at this
>> > point and justify changes in every client that will potentially use
>> > incremental cooperative rebalancing in the future. The changes in the
>> > rebalancing protocol have not been very frequent so far.
>> >
>> > Admittedly, even more important is the fact that the discussion around
>> the
>> > serialization format of the new protocol is only tangentially related to
>> > the core of KIP-415. Thus, in order to keep the discussion focused on
>> the
>> > essential changes required by KIP-415, which are expected to have
>> > significant impact in addressing the stop-the-world effect, I'd like to
>> > punt any optimizations to the serialization format and change the KIP to
>> > describe a schema that depends on Kafka structs as the current (V0)
>> version
>> > does.
>> >
>> > I hope this will allow us to make progress easier and bring the changes
>> of
>> > this new rebalancing protocol to Kafka clients, beginning with Kafka
>> > Connect, in a more applicable and less disruptive way.
>> >
>> > I'll change the schema descriptions by end of day.
>> >
>> > Looking forward to your next comments!
>> >
>> > Konstantine
>> >
>> > On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
>> > konstantine@confluent.io> wrote:
>> >
>> > >
>> > > Hi Ismael,
>> > > thanks for bringing up serialization in the discussion!
>> > >
>> > > Indeed, JSON was considered given it's the prevalent text-based
>> > > serialization option.
>> > >
>> > > In comparison to flatbuffers, most generic pros and cons are valid in
>> > this
>> > > context too. Higher performance during serde, small size, optional
>> fields,
>> > > strongly typed and others.
>> > >
>> > > Specifically, for Connect's use case, flatbuffers serialization,
>> although
>> > > it introduces a single dependency, it appears more appealing for the
>> > > following reasons:
>> > >
>> > > * The protocol is evolving from a binary format again to a binary one.
>> > > * Although new fields, nested or not, are expected to be introduced
>> (as
>> > in
>> > > KIP-415) or old fields may get deprecated, the protocol schemas are
>> > > expected to be simple, mostly flat and manageable. We won't need to
>> > process
>> > > arbitrarily nested structures during runtime, for which JSON would be
>> a
>> > > better fit. The current proposal aims to make the current append only
>> > > format a bit more flexible.
>> > > * It's good to keep performance tight because the loop that includes
>> > > subprotocol serde will need to accommodate resource release and
>> > assignment.
>> > > Also, rebalancing in its incremental cooperative form which is
>> expected
>> > to
>> > > be lighter has the potential to start happening more frequently.
>> Parsing
>> > > JSON with Jackson has been a hotspot in certain occasions in the past
>> if
>> > I
>> > > remember correctly.
>> > > * Evolution will be facilitated by handling or ignoring optional
>> fields
>> > > easily. The protocol may evolve with fewer hard version bumps like the
>> > one
>> > > proposed here from V0 to V1.
>> > > * Optional fields are omitted, not just compressed.
>> > > * Unpacking of fields does not require deserialization of the whole
>> > > message, making transition between versions or flavors of the protocol
>> > easy
>> > > and performant.
>> > > * Flatbuffers' specification is simple and can be implemented, even in
>> > the
>> > > absence of appropriate clients.
>> > >
>> > > I hope the above highlights why flatbuffers is a good candidate for
>> this
>> > > use case and, thus, worth adding as a dependency.
>> > > Strictly speaking, yes, they introduce a new compile-time dependency.
>> But
>> > > during runtime, such a dependency seems equivalent to introducing a
>> JSON
>> > > parser (such as Jackson that is already being used in AK).
>> > >
>> > > Your question is very valid. It's probably worth adding an item under
>> > > rejected alternatives, once we agree how we want to move forward.
>> > >
>> > > Best,
>> > > Konstantine
>> > >
>> > >
>> > >
>> > > On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma <is...@gmail.com>
>> wrote:
>> > >
>> > >> Thanks for the KIP Konstantine. Quick question: introducing a new
>> > >> serialization format (ie flatbuffers) has major implications. Have we
>> > >> considered json? If so, why did we reject it?
>> > >>
>> > >> Ismael
>> > >>
>> > >> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <
>> > >> konstantine@confluent.io wrote:
>> > >>
>> > >> > Hi all,
>> > >> >
>> > >> > I just published KIP-415: Incremental Cooperative Rebalancing in
>> Kafka
>> > >> > Connect
>> > >> > on the wiki here:
>> > >> >
>> > >> >
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>> > >> >
>> > >> > This is the first KIP to suggest an implementation of incremental
>> and
>> > >> > cooperative rebalancing in the context of Kafka Connect. It aims to
>> > >> provide
>> > >> > an adequate solution to the stop-the-world effect that occurs in a
>> > >> Connect
>> > >> > cluster whenever a new connector configuration is submitted or a
>> > Connect
>> > >> > Worker is added or removed from the cluster.
>> > >> >
>> > >> > Looking forward to your insightful feedback!
>> > >> >
>> > >> > Regards,
>> > >> > Konstantine
>> > >> >
>> > >>
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Konstantine Karantasis <ko...@confluent.io>.
Hi Guozhang,
thanks for your detailed comments!

I thought it'd be a good idea to have some code supporting my replies. The
PR for KIP-415 was opened this week. It needs some cleanup but I think it
can give some good hints on the things you've touched upon so far.

I'm answering your four points inline, below:

On Tue, Feb 12, 2019 at 9:44 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Konstantine,
>
> Thanks for the marvelous effort writing up this KIP! I've made a pass over
> it and here's some comments:
>
> 1) For other audience to better understand the gist of this proposal, I'd
> suggest we add the following context before the "Changes to Connect's
> Rebalancing Process" section:
>
> "
> The core in a group rebalance protocol is to have a synchronization barrier
> such that every member of the group will coordinate on, such that before
> everyone hit this barrier all the states will not be changed at all. In the
> current rebalance protocol, this synchronization barrier is the reception
> of the JoinGroup request: coordinator will not send any responses to any
> members until it determines that all JoinGroup requests have been received.
> And since right after this barrier the new assignment will be made and the
> assigned partitions may no longer be re-assigned to the same member (i.e.
> consumer) of the group, today we have to be conservative that all members
> revoke all the resources they currently own before proceeding to the
> synchronization barrier.
>
> This KIP's key idea is to postpone the synchronization barrier to the
> second rebalance's JoinGroup reception, so that in the first rebalance
> since we know NO new assignment will ever be executed, members do not need
> to revoke anything before joining the group. In other words, we are paying
> more rebalances than the naive solution (at least two rebalances will be
> required), but each rebalance now could be much lighter.
> "
>

1) Your point on fencing and rebalancing being seen as an implicit
synchronization barrier is very good.

I added the following paragraph as a separate section under proposed
changes, because I believe such a technical detail wouldn't fit in the
preamble of the proposed changes. I'm including it here in its complete form as a
reply:

Until now the process of rebalancing has also implied a global
synchronization barrier among the members of the group. With respect to the
shared resources among the group, this barrier imposed a happens-before
relationship between the release and the acquisition of a resource. In
Connect this simply means that a task had to stop execution on a specific
worker and save its progress, before starting execution on another worker
after a rebalance. This is a necessary property for resources and is still
guaranteed under this proposal. However, instead of the rebalance being a
flat global synchronization point for all the connectors and tasks, the
happens-before relationships are enforced only on tasks that are revoked
and subsequently reassigned, whenever this is required. This optimization,
which allows resolution of the stop-the-world effect (as it's known in the
current form of rebalancing), is possible because the barrier imposed by
rebalancing is implicit: it concerns the Connect workers and is not exposed
in Connect's API. Connect tasks run independently, and when a task stops and
resumes progress it is not required that all tasks of a connector do the
same. Of course, incremental cooperative rebalancing can be applied not
only to individual tasks but also to a group of tasks, such as the tasks of
a specific connector. However, in the changes proposed in this KIP, each
task remains independent and is scheduled (assigned/revoked/reassigned)
independently of other tasks.
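
To make the difference concrete, here is a small sketch (with made-up names,
not Connect's actual listener or herder classes) of what a worker does when a
rebalance starts under each protocol: the eager form stops everything the
worker owns, while the cooperative form only stops what is explicitly revoked:

import java.util.Collection;

// Simplified, hypothetical sketch; Connect's real rebalance listener and herder differ.
public final class RebalanceBehaviorSketch {

    // Eager (current) behavior: every rebalance implicitly revokes everything, so the
    // worker stops all of its connectors and tasks before it rejoins the group.
    public static void onRebalanceStartedEager(Collection<String> allOwnedTasks) {
        allOwnedTasks.forEach(RebalanceBehaviorSketch::stopAndCommit);
    }

    // Incremental cooperative behavior: only the tasks the leader explicitly revokes are
    // stopped and committed; every other task keeps running straight through the rebalance.
    public static void onRevokedCooperative(Collection<String> revokedTasks) {
        revokedTasks.forEach(RebalanceBehaviorSketch::stopAndCommit);
    }

    private static void stopAndCommit(String taskId) {
        // Stop the task and flush/commit its progress (source offsets or sink consumer
        // offsets) before it can start on another worker, preserving happens-before.
    }
}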



>
> 2) In this idea, the leader needs to be able to distinguish the "first"
> rebalance where no new assignment will be executed, but only revocations
> are indicated, and the "second" rebalance where there are some "either
> revoked or left from leaving member" partitions to be assigned. What is not
> clear to me is how to distinguish these two cases, and when it decides to
> inject the delay (i.e. it is the first rebalance) v.s. not injecting
> delays. Comparing the "*Non-first new member joins*" and "*Worker bounces*"
> scenarios: in the former case, the leader would decide it is the first
> rebalance and let W1 revoke some assignment, WITHOUT delay, while in the
> latter case, when W2 rejoins (at this case it rejoined as a new member, so
> from the coordinator and leader's point of view there should be no
> difference compared to "W2 joins as a new member"), the leader assigns to
> W2 with AT2 and BC0. Also what we did not illustrate in the KIP on consumer
> failures in between rebalances: for another example, suppose in "*Non-first
> new member joins*" W1 fails after revoked some partitions but before
> triggers another rebalance, then when coordinator triggers another join
> based on failure detection, how would the leader assign partitions? Would
> it assign all five partitions immediately to W2 and W3 or would it inject
> delays and not assign any to W2 and W3, or would it assign the ones
> indicated for revocation to W2 and W3? Could you provide some pseudo code
> on the leader logic, such that given the list of subscriptions, how would
> the leader decides:
>
> 2.a) add a delay or not;
> 2.b) assign new resources to some members or not;
> 2.c) revoke new resources to some members or not.
>

2) The leader can distinguish between lost and new assignments, and defer
reassignment of the former while immediately starting execution of the
latter, based on a number of things:

a) The current members send their current assignments (running tasks)
b) The current leader maintains the previous assignment
c) The current leader reads the up-to-date list of configured tasks, which
reflects new and removed tasks

As far as delays are concerned, I believe it's good to keep things simple.
First, the set of lost/unaccounted tasks is compiled and, at the same time,
the leader keeps a list of candidate workers with no assignments. These
workers are potentially workers that lost their tasks but managed to rejoin
in time. Once a delay expires, the lost tasks are reassigned to these
candidates. If the leader fails, the reassignment happens immediately,
without taking into account a delay or considering any tasks as lost.
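
Since you asked for pseudo code, here is a rough sketch of that leader-side
flow. To be clear, this is illustrative only and not the assignor in the PR:
task ids are plain strings, the helper names are made up, and the delay
machinery is reduced to a single boolean, but it shows how newly created
tasks can be assigned immediately while lost tasks are deferred to the
workers that rejoined empty-handed:

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative pseudo-logic only; the actual WorkerCoordinator assignor differs.
public class IncrementalAssignmentSketch {

    public static Map<String, List<String>> assign(
            Map<String, Set<String>> reported,   // worker -> tasks it reports as still running
            Map<String, Set<String>> previous,   // the last assignment the leader handed out
            Set<String> configuredTasks,         // up-to-date task list read from the config topic
            boolean delayExpired) {              // has the delay for lost tasks elapsed?

        Set<String> running = reported.values().stream()
                .flatMap(Set::stream).collect(Collectors.toSet());
        Set<String> previouslyAssigned = previous.values().stream()
                .flatMap(Set::stream).collect(Collectors.toSet());

        // Newly configured tasks: never assigned and not running anywhere; assign right away.
        Set<String> created = new HashSet<>(configuredTasks);
        created.removeAll(running);
        created.removeAll(previouslyAssigned);

        // Lost tasks: previously assigned and still configured, but no member reports them
        // (e.g. their worker bounced). Their reassignment is the part that gets deferred.
        Set<String> lost = new HashSet<>(previouslyAssigned);
        lost.retainAll(configuredTasks);
        lost.removeAll(running);

        // Workers that (re)joined with nothing running are the candidates for the lost tasks.
        List<String> candidates = reported.entrySet().stream()
                .filter(e -> e.getValue().isEmpty())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

        List<String> allWorkers = new ArrayList<>(reported.keySet());
        Map<String, List<String>> assignment = roundRobin(created, allWorkers);

        if (!lost.isEmpty() && delayExpired) {
            roundRobin(lost, candidates.isEmpty() ? allWorkers : candidates)
                    .forEach((worker, tasks) ->
                            assignment.merge(worker, tasks, (a, b) -> { a.addAll(b); return a; }));
        }
        // If the delay has not expired yet, the lost tasks stay unassigned and the leader
        // schedules a follow-up rebalance for when the delay elapses (not shown here).
        return assignment;
    }

    private static Map<String, List<String>> roundRobin(Collection<String> tasks, List<String> workers) {
        Map<String, List<String>> out = new HashMap<>();
        workers.forEach(w -> out.put(w, new ArrayList<>()));
        int i = 0;
        for (String task : tasks)
            out.get(workers.get(i++ % workers.size())).add(task);
        return out;
    }
}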

I'd like to return with a state diagram of the current proposed task
scheduler (or assignor as it's called in the code) before we merge this
feature. But since this will include implementation specific details, I
think it would make sense not to block the KIP on it.



>
> 3) About compatibility, I'm also wondering how would downgrade be executed
> here: suppose after upgrading the Connect jar and migrate to `cooperative`
> mode, users discovered a bug and hence needs to downgrade back to older
> versions that does not support `cooperative`.
>

3) The new protocol is backwards compatible with the old one. Once a worker
is downgraded, the whole group can run with the old version. The metadata
and assignments are compatible and can be read both ways. The old code path
is retained, unchanged, so by switching to the old protocol we return to
the execution mode that stops all the tasks before rebalancing and assigns
them in the old way.
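
One way this kind of two-way readability can work (a hypothetical sketch, not
necessarily how the PR implements it) is to keep the V0 fields as a common
prefix and append the new fields after them, so a reader dispatches on the
version it finds and an old reader simply never looks past the V0 fields:

import java.nio.ByteBuffer;

// Hypothetical sketch of version-tolerant parsing; the real Connect code is organized differently.
public final class ProtocolCompatibilitySketch {

    static final short EAGER_V0 = 0;
    static final short COOPERATIVE_V1 = 1;

    public static WorkerState deserializeMetadata(ByteBuffer buffer) {
        short version = buffer.getShort();        // version header shared by both formats
        WorkerState state = readV0Fields(buffer); // V0 fields come first in both versions
        if (version >= COOPERATIVE_V1 && buffer.hasRemaining()) {
            state.currentAssignment = readAssignmentField(buffer); // extra V1 fields appended
        }
        // A downgraded (V0-only) reader would stop after the V0 fields and ignore the rest.
        return state;
    }

    static class WorkerState {
        String url;
        long configOffset;
        ByteBuffer currentAssignment; // null when the metadata came from a V0 worker
    }

    private static WorkerState readV0Fields(ByteBuffer buffer) {
        WorkerState state = new WorkerState();
        // parse the url and config offset here
        return state;
    }

    private static ByteBuffer readAssignmentField(ByteBuffer buffer) {
        // parse the serialized current assignment here
        return buffer.slice();
    }
}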


>
> 4) This is sort of orthogonal to this KIP, but I'm also considering about
> code sharing with the future Streams incremental rebalance protocol. For
> Kafka Streams, one difference is that because of the state maintenance,
> migrating tasks are heavier and hence we should consider bootstrapping the
> assigned task before revoking it from the old client. So far it seems
> Streams incremental rebalance protocol would be a bit different from the
> Connect protocol proposed in KIP-415 here. What they may share in common
> are a) flatbuffer utils for encoding metadata bytes, and b) consumer
> members actively triggers another rebalance by sending join-group request.
> So I'm wondering if we can push these two pieces of logic into the
> AbstractCoordinator so it can be shared?
>
> Guozhang
>
>
4) Currently, in Connect I've worked on implementing the changes
exclusively within the WorkerCoordinator and Connect's protocol
definitions. At the moment I don't see significant opportunity for code
sharing and abstraction with respect to incremental cooperative
rebalancing. Also, flatbuffers consideration has been postponed and might
not return soon. I had included flatbuffers in the initial proposal just in
case we recognized a need for it, but it wasn't absolutely necessary and
now KIP-415 has been updated to use the existing Kafka protocol types.

Thanks!
Konstantine



>
> On Wed, Feb 6, 2019 at 9:58 PM Boyang Chen <bc...@outlook.com> wrote:
>
> > Thanks Konstantine for the great summary! +1 for having a separate KIP
> > discussing the trade-offs for using a new serialization format for the
> > protocol encoding. We probably could discuss a wider options and
> benchmark
> > on the performance before reaching a final decision.
> >
> > Best,
> > Boyang
> > ________________________________
> > From: Konstantine Karantasis <ko...@confluent.io>
> > Sent: Tuesday, February 5, 2019 4:23 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in
> > Kafka Connect
> >
> > Hi all,
> >
> > Thank you for your comments so far.
> > Now that KIP freeze and feature freeze are behind us for version 2.2, I'd
> > like to bring this thread back at the top of the email stack, with the
> > following suggestion:
> >
> > I'll be changing KIP-415's description to include a serialization format
> > that extends the current scheme and is based on Kafka structs.
> >
> > The initial suggestion to transition to using an alternative
> serialization
> > format (e.g. flatbuffers) was made just in case we saw this would have a
> > good potential and we could arrive in a quick consensus on this matter. I
> > believe the arguments for such a transition make sense, but the pros are
> > probably not enough to outweigh the introduction of a dependency at this
> > point and justify changes in every client that will potentially use
> > incremental cooperative rebalancing in the future. The changes in the
> > rebalancing protocol have not been very frequent so far.
> >
> > Admittedly, even more important is the fact that the discussion around
> the
> > serialization format of the new protocol is only tangentially related to
> > the core of KIP-415. Thus, in order to keep the discussion focused on the
> > essential changes required by KIP-415, which are expected to have
> > significant impact in addressing the stop-the-world effect, I'd like to
> > punt any optimizations to the serialization format and change the KIP to
> > describe a schema that depends on Kafka structs as the current (V0)
> version
> > does.
> >
> > I hope this will allow us to make progress easier and bring the changes
> of
> > this new rebalancing protocol to Kafka clients, beginning with Kafka
> > Connect, in a more applicable and less disruptive way.
> >
> > I'll change the schema descriptions by end of day.
> >
> > Looking forward to your next comments!
> >
> > Konstantine
> >
> > On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
> > konstantine@confluent.io> wrote:
> >
> > >
> > > Hi Ismael,
> > > thanks for bringing up serialization in the discussion!
> > >
> > > Indeed, JSON was considered given it's the prevalent text-based
> > > serialization option.
> > >
> > > In comparison to flatbuffers, most generic pros and cons are valid in
> > this
> > > context too. Higher performance during serde, small size, optional
> fields,
> > > strongly typed and others.
> > >
> > > Specifically, for Connect's use case, flatbuffers serialization,
> although
> > > it introduces a single dependency, it appears more appealing for the
> > > following reasons:
> > >
> > > * The protocol is evolving from a binary format again to a binary one.
> > > * Although new fields, nested or not, are expected to be introduced (as
> > in
> > > KIP-415) or old fields may get deprecated, the protocol schemas are
> > > expected to be simple, mostly flat and manageable. We won't need to
> > process
> > > arbitrarily nested structures during runtime, for which JSON would be a
> > > better fit. The current proposal aims to make the current append only
> > > format a bit more flexible.
> > > * It's good to keep performance tight because the loop that includes
> > > subprotocol serde will need to accommodate resource release and
> > assignment.
> > > Also, rebalancing in its incremental cooperative form which is
> expected
> > to
> > > be lighter has the potential to start happening more frequently.
> Parsing
> > > JSON with Jackson has been a hotspot in certain occasions in the past
> if
> > I
> > > remember correctly.
> > > * Evolution will be facilitated by handling or ignoring optional fields
> > > easily. The protocol may evolve with fewer hard version bumps like the
> > one
> > > proposed here from V0 to V1.
> > > * Optional fields are omitted, not just compressed.
> > > * Unpacking of fields does not require deserialization of the whole
> > > message, making transition between versions or flavors of the protocol
> > easy
> > > and performant.
> > > * Flatbuffers' specification is simple and can be implemented, even in
> > the
> > > absence of appropriate clients.
> > >
> > > I hope the above highlights why flatbuffers is a good candidate for this
> > > use case and, thus, worth adding as a dependency.
> > > Strictly speaking, yes, they introduce a new compile-time dependency.
> But
> > > during runtime, such a dependency seems equivalent to introducing a
> JSON
> > > parser (such as Jackson that is already being used in AK).
> > >
> > > Your question is very valid. It's probably worth adding an item under
> > > rejected alternatives, once we agree how we want to move forward.
> > >
> > > Best,
> > > Konstantine
> > >
> > >
> > >
> > > On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma <is...@gmail.com>
> wrote:
> > >
> > >> Thanks for the KIP Konstantine. Quick question: introducing a new
> > >> serialization format (ie flatbuffers) has major implications. Have we
> > >> considered json? If so, why did we reject it?
> > >>
> > >> Ismael
> > >>
> > >> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <
> > >> konstantine@confluent.io wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > I just published KIP-415: Incremental Cooperative Rebalancing in
> Kafka
> > >> > Connect
> > >> > on the wiki here:
> > >> >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > >> >
> > >> > This is the first KIP to suggest an implementation of incremental
> and
> > >> > cooperative rebalancing in the context of Kafka Connect. It aims to
> > >> provide
> > >> > an adequate solution to the stop-the-world effect that occurs in a
> > >> Connect
> > >> > cluster whenever a new connector configuration is submitted or a
> > Connect
> > >> > Worker is added or removed from the cluster.
> > >> >
> > >> > Looking forward to your insightful feedback!
> > >> >
> > >> > Regards,
> > >> > Konstantine
> > >> >
> > >>
> > >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Konstantine,

Thanks for the marvelous effort writing up this KIP! I've made a pass over
it and here are some comments:

1) For other audiences to better understand the gist of this proposal, I'd
suggest we add the following context before the "Changes to Connect's
Rebalancing Process" section:

"
The core of a group rebalance protocol is to have a synchronization barrier
that every member of the group will coordinate on, such that before everyone
hits this barrier none of the states will be changed at all. In the
current rebalance protocol, this synchronization barrier is the reception
of the JoinGroup request: the coordinator will not send any responses to any
members until it determines that all JoinGroup requests have been received.
And since right after this barrier the new assignment will be made and the
assigned partitions may no longer be re-assigned to the same member (i.e.
consumer) of the group, today we have to be conservative and have all members
revoke all the resources they currently own before proceeding to the
synchronization barrier.

This KIP's key idea is to postpone the synchronization barrier to the
second rebalance's JoinGroup reception, so that in the first rebalance,
since we know NO new assignment will ever be executed, members do not need
to revoke anything before joining the group. In other words, we pay with
more rebalances than the naive solution (at least two rebalances will be
required), but each rebalance now can be much lighter.
"

2) In this idea, the leader needs to be able to distinguish the "first"
rebalance, where no new assignment will be executed but only revocations
are indicated, from the "second" rebalance, where there are some "either
revoked or left over from a leaving member" partitions to be assigned. What
is not clear to me is how to distinguish these two cases, and when the
leader decides to inject the delay (i.e. it is the first rebalance) vs. not
inject delays. Comparing the "*Non-first new member joins*" and "*Worker
bounces*" scenarios: in the former case, the leader would decide it is the
first rebalance and let W1 revoke some assignment, WITHOUT delay, while in
the latter case, when W2 rejoins (in this case it rejoined as a new member,
so from the coordinator's and leader's point of view there should be no
difference compared to "W2 joins as a new member"), the leader assigns AT2
and BC0 to W2. Also, the KIP does not illustrate consumer failures in
between rebalances: for another example, suppose in "*Non-first new member
joins*" W1 fails after revoking some partitions but before triggering
another rebalance; then, when the coordinator triggers another join based
on failure detection, how would the leader assign partitions? Would it
assign all five partitions immediately to W2 and W3, would it inject delays
and not assign any to W2 and W3, or would it assign the ones indicated for
revocation to W2 and W3? Could you provide some pseudo code for the leader
logic, showing how, given the list of subscriptions, the leader decides:

2.a) add a delay or not;
2.b) assign new resources to some members or not;
2.c) revoke resources from some members or not.

3) About compatibility, I'm also wondering how downgrade would be executed
here: suppose that after upgrading the Connect jar and migrating to
`cooperative` mode, users discover a bug and hence need to downgrade back to
older versions that do not support `cooperative`.

4) This is sort of orthogonal to this KIP, but I'm also thinking about
code sharing with the future Streams incremental rebalance protocol. For
Kafka Streams, one difference is that, because of the state maintenance,
migrating tasks are heavier and hence we should consider bootstrapping the
assigned task before revoking it from the old client. So far it seems the
Streams incremental rebalance protocol would be a bit different from the
Connect protocol proposed in KIP-415 here. What they may share in common
are a) flatbuffer utils for encoding metadata bytes, and b) consumer
members actively triggering another rebalance by sending a join-group
request. So I'm wondering if we can push these two pieces of logic into the
AbstractCoordinator so they can be shared?

Guozhang



On Wed, Feb 6, 2019 at 9:58 PM Boyang Chen <bc...@outlook.com> wrote:

> Thanks Konstantine for the great summary! +1 for having a separate KIP
> discussing the trade-offs for using a new serialization format for the
> protocol encoding. We probably could discuss a wider options and benchmark
> on the performance before reaching a final decision.
>
> Best,
> Boyang
> ________________________________
> From: Konstantine Karantasis <ko...@confluent.io>
> Sent: Tuesday, February 5, 2019 4:23 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in
> Kafka Connect
>
> Hi all,
>
> Thank you for your comments so far.
> Now that KIP freeze and feature freeze are behind us for version 2.2, I'd
> like to bring this thread back at the top of the email stack, with the
> following suggestion:
>
> I'll be changing KIP-415's description to include a serialization format
> that extends the current scheme and is based on Kafka structs.
>
> The initial suggestion to transition to using an alternative serialization
> format (e.g. flatbuffers) was made just in case we saw this would have a
> good potential and we could arrive in a quick consensus on this matter. I
> believe the arguments for such a transition make sense, but the pros are
> probably not enough to outweigh the introduction of a dependency at this
> point and justify changes in every client that will potentially use
> incremental cooperative rebalancing in the future. The changes in the
> rebalancing protocol have not been very frequent so far.
>
> Admittedly, even more important is the fact that the discussion around the
> serialization format of the new protocol is only tangentially related to
> the core of KIP-415. Thus, in order to keep the discussion focused on the
> essential changes required by KIP-415, which are expected to have
> significant impact in addressing the stop-the-world effect, I'd like to
> punt any optimizations to the serialization format and change the KIP to
> describe a schema that depends on Kafka structs as the current (V0) version
> does.
>
> I hope this will allow us to make progress easier and bring the changes of
> this new rebalancing protocol to Kafka clients, beginning with Kafka
> Connect, in a more applicable and less disruptive way.
>
> I'll change the schema descriptions by end of day.
>
> Looking forward to your next comments!
>
> Konstantine
>
> On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
> konstantine@confluent.io> wrote:
>
> >
> > Hi Ismael,
> > thanks for bringing up serialization in the discussion!
> >
> > Indeed, JSON was considered given it's the prevalent text-based
> > serialization option.
> >
> > In comparison to flatbuffers, most generic pros and cons are valid in
> this
> > context too. Higher performance during serde, small size, optional fields,
> > strongly typed and others.
> >
> > Specifically, for Connect's use case, flatbuffers serialization, although
> > it introduces a single dependency, it appears more appealing for the
> > following reasons:
> >
> > * The protocol is evolving from a binary format again to a binary one.
> > * Although new fields, nested or not, are expected to be introduced (as
> in
> > KIP-415) or old fields may get deprecated, the protocol schemas are
> > expected to be simple, mostly flat and manageable. We won't need to
> process
> > arbitrarily nested structures during runtime, for which JSON would be a
> > better fit. The current proposal aims to make the current append only
> > format a bit more flexible.
> > * It's good to keep performance tight because the loop that includes
> > subprotocol serde will need to accommodate resource release and
> assignment.
> > Also, rebalancing in its incremental cooperative form which is expected
> to
> > be lighter has the potential to start happening more frequently. Parsing
> > JSON with Jackson has been a hotspot in certain occasions in the past if
> I
> > remember correctly.
> > * Evolution will be facilitated by handling or ignoring optional fields
> > easily. The protocol may evolve with fewer hard version bumps like the
> one
> > proposed here from V0 to V1.
> > * Optional fields are omitted, not just compressed.
> > * Unpacking of fields does not require deserialization of the whole
> > message, making transition between versions or flavors of the protocol
> easy
> > and performant.
> > * Flatbuffers' specification is simple and can be implemented, even in
> the
> > absence of appropriate clients.
> >
> > I hope the above highlights why flatbuffers is a good candidate for this
> > use case and, thus, worth adding as a dependency.
> > Strictly speaking, yes, they introduce a new compile-time dependency. But
> > during runtime, such a dependency seems equivalent to introducing a JSON
> > parser (such as Jackson that is already being used in AK).
> >
> > Your question is very valid. It's probably worth adding an item under
> > rejected alternatives, once we agree how we want to move forward.
> >
> > Best,
> > Konstantine
> >
> >
> >
> > On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma <is...@gmail.com> wrote:
> >
> >> Thanks for the KIP Konstantine. Quick question: introducing a new
> >> serialization format (ie flatbuffers) has major implications. Have we
> >> considered json? If so, why did we reject it?
> >>
> >> Ismael
> >>
> >> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <
> >> konstantine@confluent.io wrote:
> >>
> >> > Hi all,
> >> >
> >> > I just published KIP-415: Incremental Cooperative Rebalancing in Kafka
> >> > Connect
> >> > on the wiki here:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >> >
> >> > This is the first KIP to suggest an implementation of incremental and
> >> > cooperative rebalancing in the context of Kafka Connect. It aims to
> >> provide
> >> > an adequate solution to the stop-the-world effect that occurs in a
> >> Connect
> >> > cluster whenever a new connector configuration is submitted or a
> Connect
> >> > Worker is added or removed from the cluster.
> >> >
> >> > Looking forward to your insightful feedback!
> >> >
> >> > Regards,
> >> > Konstantine
> >> >
> >>
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Boyang Chen <bc...@outlook.com>.
Thanks Konstantine for the great summary! +1 for having a separate KIP discussing the trade-offs of using a new serialization format for the protocol encoding. We could probably discuss a wider range of options and benchmark the performance before reaching a final decision.

Best,
Boyang
________________________________
From: Konstantine Karantasis <ko...@confluent.io>
Sent: Tuesday, February 5, 2019 4:23 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Hi all,

Thank you for your comments so far.
Now that KIP freeze and feature freeze are behind us for version 2.2, I'd
like to bring this thread back at the top of the email stack, with the
following suggestion:

I'll be changing KIP-415's description to include a serialization format
that extends the current scheme and is based on Kafka structs.

The initial suggestion to transition to using an alternative serialization
format (e.g. flatbuffers) was made just in case we saw this would have a
good potential and we could arrive in a quick consensus on this matter. I
believe the arguments for such a transition make sense, but the pros are
probably not enough to outweigh the introduction of a dependency at this
point and justify changes in every client that will potentially use
incremental cooperative rebalancing in the future. The changes in the
rebalancing protocol have not been very frequent so far.

Admittedly, even more important is the fact that the discussion around the
serialization format of the new protocol is only tangentially related to
the core of KIP-415. Thus, in order to keep the discussion focused on the
essential changes required by KIP-415, which are expected to have
significant impact in addressing the stop-the-world effect, I'd like to
punt any optimizations to the serialization format and change the KIP to
describe a schema that depends on Kafka structs as the current (V0) version
does.

I hope this will allow us to make progress easier and bring the changes of
this new rebalancing protocol to Kafka clients, beginning with Kafka
Connect, in a more applicable and less disruptive way.

I'll change the schema descriptions by end of day.

Looking forward to your next comments!

Konstantine

On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

>
> Hi Ismael,
> thanks for bringing up serialization in the discussion!
>
> Indeed, JSON was considered given it's the prevalent text-based
> serialization option.
>
> In comparison to flatbuffers, most generic pros and cons are valid in this
> context too. Higher performance during serde, small size, optional fields,
> strongly typed and others.
>
> Specifically, for Connect's use case, flatbuffers serialization, although
> it introduces a single dependency, it appears more appealing for the
> following reasons:
>
> * The protocol is evolving from a binary format again to a binary one.
> * Although new fields, nested or not, are expected to be introduced (as in
> KIP-415) or old fields may get deprecated, the protocol schemas are
> expected to be simple, mostly flat and manageable. We won't need to process
> arbitrarily nested structures during runtime, for which JSON would be a
> better fit. The current proposal aims to make the current append only
> format a bit more flexible.
> * It's good to keep performance tight because the loop that includes
> subprotocol serde will need to accommodate resource release and assignment.
> Also, rebalancing in its incremental cooperative form which is expected to
> be lighter has the potential to start happening more frequently. Parsing
> JSON with Jackson has been a hotspot in certain occasions in the past if I
> remember correctly.
> * Evolution will be facilitated by handling or ignoring optional fields
> easily. The protocol may evolve with fewer hard version bumps like the one
> proposed here from V0 to V1.
> * Optional fields are omitted, not just compressed.
> * Unpacking of fields does not require deserialization of the whole
> message, making transition between versions or flavors of the protocol easy
> and performant.
> * Flatbuffers' specification is simple and can be implemented, even in the
> absence of appropriate clients.
>
> I hope the above highlights why flatbuffers is a good candidate for this
> use case and, thus, worth adding as a dependency.
> Strictly speaking, yes, they introduce a new compile-time dependency. But
> during runtime, such a dependency seems equivalent to introducing a JSON
> parser (such as Jackson that is already being used in AK).
>
> Your question is very valid. It's probably worth adding an item under
> rejected alternatives, once we agree how we want to move forward.
>
> Best,
> Konstantine
>
>
>
> On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma <is...@gmail.com> wrote:
>
>> Thanks for the KIP Konstantine. Quick question: introducing a new
>> serialization format (ie flatbuffers) has major implications. Have we
>> considered json? If so, why did we reject it?
>>
>> Ismael
>>
>> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <
>> konstantine@confluent.io wrote:
>>
>> > Hi all,
>> >
>> > I just published KIP-415: Incremental Cooperative Rebalancing in Kafka
>> > Connect
>> > on the wiki here:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>> >
>> > This is the first KIP to suggest an implementation of incremental and
>> > cooperative rebalancing in the context of Kafka Connect. It aims to
>> provide
>> > an adequate solution to the stop-the-world effect that occurs in a
>> Connect
>> > cluster whenever a new connector configuration is submitted or a Connect
>> > Worker is added or removed from the cluster.
>> >
>> > Looking forward to your insightful feedback!
>> >
>> > Regards,
>> > Konstantine
>> >
>>
>