Posted to dev@kafka.apache.org by Konstantine Karantasis <ko...@confluent.io> on 2019/03/06 22:27:57 UTC

[VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

I'd like to open the vote on KIP-415: Incremental Cooperative Rebalancing
in Kafka Connect

https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect

a proposal that will allow Kafka Connect to scale significantly the number
of connectors and tasks it can run in a cluster of Connect workers.

Thanks,
Konstantine

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by "McCaig, Rhys" <Rh...@comcast.com>.
+1 (non-binding)

> On Mar 6, 2019, at 3:40 PM, Ryanne Dolan <ry...@gmail.com> wrote:
> 
> +1 (non-binding)
> 
> Thanks!
> Ryanne
> 


Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Ryanne Dolan <ry...@gmail.com>.
+1 (non-binding)

Thanks!
Ryanne


Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Konstantine Karantasis <ko...@confluent.io>.
Thank you all for the votes and your comments!

KIP-415 has been accepted with +4 binding votes (Guozhang, Jason, Randall,
Ewen) and +4 non-binding votes (Ryanne, Rhys, Robert, Satish).

Best,
Konstantine


On Thu, Mar 14, 2019 at 10:24 PM Satish Duggana <sa...@gmail.com>
wrote:

> Nice work Konstantine!
> +1 (non-binding)
>
> On Fri, Mar 15, 2019 at 7:48 AM Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
> > +1 (binding)
> >
> > -Ewen
> >
> > On Wed, Mar 13, 2019 at 2:04 PM Randall Hauch <rh...@gmail.com> wrote:
> >
> > > Excellent work, Konstantine!
> > >
> > > +1 (binding)
> > >
> > > On Mon, Mar 11, 2019 at 8:05 PM Konstantine Karantasis <
> > > konstantine@confluent.io> wrote:
> > >
> > > > Thanks Jason!
> > > > That makes perfect sense. The change is reflected in the KIP now.
> > > > "compatible" will be the default mode for "connect.protocol"
> > > >
> > > > Cheers,
> > > > Konstantine
> > > >
> > > >
> > > > On Mon, Mar 11, 2019 at 4:31 PM Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > +1 Thanks for all the work on this. My only minor comment is that
> > > > > `connect.protocol` probably should be `compatible` by default. The
> > > > > cost is low and it will save upgrade confusion.
> > > > >
> > > > > Best,
> > > > > Jason
> > > > >
> > > > > On Fri, Mar 8, 2019 at 10:37 AM Robert Yokota <ra...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the great KIP Konstantine!
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Robert
> > > > > >
> > > > > > On Thu, Mar 7, 2019 at 2:56 PM Guozhang Wang <wangguoz@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Konstantine, I've read the updated section on
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
> > > > > > > and it lgtm.
> > > > > > >
> > > > > > > I'm +1 on the KIP.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Mar 7, 2019 at 2:35 PM Konstantine Karantasis <
> > > > > > > konstantine@confluent.io> wrote:
> > > > > > >
> > > > > > > > Thanks Guozhang. This is a valid observation regarding the
> > > > > > > > current status of the PR.
> > > > > > > >
> > > > > > > > I updated the KIP to explicitly call out how the downgrade
> > > > > > > > process should work in the section Compatibility,
> > > > > > > > Deprecation, and Migration.
> > > > > > > >
> > > > > > > > Additionally, I reduced the configuration modes for the
> > > > > > > > connect.protocol to only two: eager and compatible.
> > > > > > > > That's because there's no way at the moment to select a
> > > > > > > > protocol based on simple majority and not unanimity across
> > > > > > > > at least one option for the sub-protocol.
> > > > > > > > Therefore there's no way to lock a group of workers in a
> > > > > > > > cooperative-only mode at the moment, if we account for
> > > > > > > > accidental joins of workers running at an older version.
> > > > > > > >
> > > > > > > > The changes have been reflected in the KIP doc and will be
> > > > > > > > reflected in the PR in a subsequent commit.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Konstantine
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Mar 7, 2019 at 1:17 PM Guozhang Wang <
> > > > > > > > wangguoz@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Konstantine,
> > > > > > > > >
> > > > > > > > > Thanks for the updated KIP and the PR as well (which is
> > > > > > > > > huge :) I briefly looked through it as well as the KIP,
> > > > > > > > > and I have one minor comment to add (otherwise I'm
> > > > > > > > > binding +1 on it as well) about the backward
> > > > > > > > > compatibility.
> > > > > > > > > I'll use one example to illustrate the issue:
> > > > > > > > >
> > > > > > > > > 1) Suppose you have workerA and B on newer version and
> > > > > > > > > configured the connect.protocol as "compatible", they
> > > > > > > > > will send both V0/V1 to the leader (say it's workerA) who
> > > > > > > > > will choose V1 as the current protocol, this will be sent
> > > > > > > > > back to A and B who would remember the current protocol
> > > > > > > > > version is already V1. So after this rebalance everyone
> > > > > > > > > remembers that V1 can be used, which means that upon
> > > > > > > > > prepareJoin they will not revoke all the assigned tasks.
> > > > > > > > >
> > > > > > > > > 2) Now let's say a new worker joins but with old version
> > > > > > > > > V0 (practically this is rare, but for illustration
> > > > > > > > > purposes some common scenarios may fall into this, e.g.
> > > > > > > > > an existing worker being downgraded, which is essentially
> > > > > > > > > the same as being kicked out of the group and then
> > > > > > > > > rejoining as a new member on the older version), the
> > > > > > > > > leader realizes that at least one of the members does not
> > > > > > > > > know V1 and hence would fall back to use version V0 to
> > > > > > > > > perform assignment. V0 algorithm would do eager rebalance
> > > > > > > > > which may move some tasks to the newcomer immediately
> > > > > > > > > from the existing members, as it assumes that everyone
> > > > > > > > > would revoke everything before join (a.k.a. the
> > > > > > > > > sync-barrier) but this is actually not true, since
> > > > > > > > > everyone other than the old versioned newcomer would
> > > > > > > > > still follow the behavior of V1 --- not revoking anything
> > > > > > > > > --- before sending the join group request.
> > > > > > > > >
> > > > > > > > > This could be solvable though, e.g. when the leader
> > > > > > > > > realizes that he needs to use V0, while the previous
> > > > > > > > > "currentProtocol" value is V1, instead of just blindly
> > > > > > > > > following the algorithm of V0 it could just reassign the
> > > > > > > > > existing partitions without migrating anything, while at
> > > > > > > > > the same time tell everyone that the currentProtocol
> > > > > > > > > version is downgraded to V0; and then they can trigger
> > > > > > > > > another rebalance based on V0 where everyone will revoke
> > > > > > > > > the tasks before sending join group requests.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Wed, Mar 6, 2019 at 2:28 PM Konstantine Karantasis <
> > > > > > > > > konstantine@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > I'd like to open the vote on KIP-415: Incremental
> > > > > > > > > > Cooperative Rebalancing in Kafka Connect
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
> > > > > > > > > >
> > > > > > > > > > a proposal that will allow Kafka Connect to scale
> > > > > > > > > > significantly the number of connectors and tasks it
> > > > > > > > > > can run in a cluster of Connect workers.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Konstantine
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
>
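
For readers following the downgrade discussion quoted above, here is a minimal Python sketch of the idea Guozhang suggests. It is not taken from the KIP or the PR: the function names (choose_protocol, eager_assign, leader_assign), the round-robin placement, and the worker and task names are all hypothetical and exist only for illustration. The sketch assumes the leader knows which protocol versions each member advertises and which tasks each member still holds when it joins.

```python
from typing import Dict, List, Set, Tuple

V0, V1 = 0, 1  # labels from the thread: V0 = eager, V1 = incremental cooperative


def choose_protocol(members: Dict[str, Set[int]]) -> int:
    """Highest version that every member supports (unanimity, not majority)."""
    common = set.intersection(*(set(v) for v in members.values()))
    if not common:
        raise ValueError("no protocol version is supported by every member")
    return max(common)


def eager_assign(names: List[str], tasks: List[str]) -> Dict[str, List[str]]:
    """V0-style assignment: hand out every task round-robin from scratch."""
    assignment: Dict[str, List[str]] = {name: [] for name in names}
    ordered = sorted(names)
    for i, task in enumerate(sorted(tasks)):
        assignment[ordered[i % len(ordered)]].append(task)
    return assignment


def leader_assign(
    members: Dict[str, Set[int]],
    owned: Dict[str, List[str]],
    tasks: List[str],
    previous_protocol: int,
) -> Tuple[int, Dict[str, List[str]], bool]:
    """Return (protocol, assignment, followup_rebalance_needed)."""
    protocol = choose_protocol(members)
    if protocol == V0 and previous_protocol == V1:
        # Downgrade round: V1 members joined without revoking, so moving a
        # task now could leave it running on two workers. Keep ownership
        # unchanged, announce V0, and ask for one more rebalance.
        return protocol, {n: list(owned.get(n, [])) for n in members}, True
    if protocol == V0:
        # Everyone revoked before joining, so a full redistribution is safe.
        return protocol, eager_assign(list(members), tasks), False
    # V1: the real incremental algorithm is out of scope here; keep ownership.
    return protocol, {n: list(owned.get(n, [])) for n in members}, False


# Hypothetical group: workerC is the old-version worker that only knows V0.
members = {"workerA": {V0, V1}, "workerB": {V0, V1}, "workerC": {V0}}
owned = {"workerA": ["t0", "t1"], "workerB": ["t2", "t3"], "workerC": []}
tasks = ["t0", "t1", "t2", "t3"]

# Round 1: the group downgrades to V0 but no task changes hands.
protocol, assignment, followup = leader_assign(members, owned, tasks, V1)
# Round 2: all workers now follow V0 and revoke everything before joining.
protocol2, assignment2, followup2 = leader_assign(members, {}, tasks, protocol)
```

The first call models the rebalance in which the old-version worker appears, and the second models the follow-up rebalance that the downgrade announcement triggers. Whether the final KIP text handles the downgrade exactly this way is not stated in this thread.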

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Satish Duggana <sa...@gmail.com>.
Nice work Konstantine!
+1 (non-binding)


Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
+1 (binding)

-Ewen


Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Randall Hauch <rh...@gmail.com>.
Excellent work, Konstantine!

+1 (binding)


Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Konstantine Karantasis <ko...@confluent.io>.
Thanks Jason!
That makes perfect sense. The change is reflected in the KIP now.
"compatible" will be the default mode for "connect.protocol"

Cheers,
Konstantine
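
As an illustration of the setting discussed in this message, a distributed Connect worker configuration might carry a line like the one below. Only the property name and the two values, eager and compatible, come from this thread; the file the line lives in and the comments are assumptions.

```properties
# Hypothetical excerpt from a distributed Connect worker configuration.
# "compatible" is the default agreed on in this thread; "eager" keeps the
# previous stop-the-world rebalancing behavior.
connect.protocol=compatible
```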


On Mon, Mar 11, 2019 at 4:31 PM Jason Gustafson <ja...@confluent.io> wrote:

> +1 Thanks for all the work on this. My only minor comment is that
> `connect.protocol` probably should be `compatible` by default. The cost is
> low and it will save upgrade confusion.
>
> Best,
> Jason
>
> On Fri, Mar 8, 2019 at 10:37 AM Robert Yokota <ra...@gmail.com> wrote:
>
> > Thanks for the great KIP Konstantine!
> >
> > +1 (non-binding)
> >
> > Robert
> >
> > On Thu, Mar 7, 2019 at 2:56 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Thanks Konstantine, I've read the updated section on
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
> > > and it lgtm.
> > >
> > > I'm +1 on the KIP.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Mar 7, 2019 at 2:35 PM Konstantine Karantasis <
> > > konstantine@confluent.io> wrote:
> > >
> > > > Thanks Guozhang. This is a valid observation regarding the current
> > > > status of the PR.
> > > >
> > > > I updated the KIP to explicitly call out how the downgrade process
> > > > should work in the section Compatibility, Deprecation, and Migration.
> > > >
> > > > Additionally, I reduced the configuration modes for the
> > > > connect.protocol to only two: eager and compatible.
> > > > That's because there's no way at the moment to select a protocol
> > > > based on simple majority and not unanimity across at least one option
> > > > for the sub-protocol.
> > > > Therefore there's no way to lock a group of workers in a
> > > > cooperative-only mode at the moment, if we account for accidental
> > > > joins of workers running at an older version.
> > > >
> > > > The changes have been reflected in the KIP doc and will be reflected
> > > > in the PR in a subsequent commit.
> > > >
> > > > Thanks,
> > > > Konstantine
> > > >
> > > >
> > > > On Thu, Mar 7, 2019 at 1:17 PM Guozhang Wang <wa...@gmail.com> wrote:
> > > >
> > > > > Hi Konstantine,
> > > > >
> > > > > Thanks for the updated KIP and the PR as well (which is huge :) I
> > > > > briefly looked through it as well as the KIP, and I have one minor
> > > > > comment to add (otherwise I'm binding +1 on it as well) about the
> > > > > backward compatibility.
> > > > > I'll use one example to illustrate the issue:
> > > > >
> > > > > 1) Suppose you have workerA and B on newer version and configured
> the
> > > > > connect.protocol as "compatible", they will send both V0/V1 to the
> > > leader
> > > > > (say it's workerA) who will choose V1 as the current protocol, this
> > > will
> > > > be
> > > > > sent back to A and B who would remember the current protocol
> version
> > is
> > > > > already V1. So after this rebalance everyone remembers that V1 can
> be
> > > > used,
> > > > > which means that upon prepareJoin they will not revoke all the
> > assigned
> > > > > tasks.
> > > > >
> > > > > 2) Now let's say a new worker joins but with old version V0
> > > (practically
> > > > > this is rare, but for illustration purposes some common scenarios
> may
> > > > falls
> > > > > into this, e.g. an existing worker being downgraded, which is
> > > essentially
> > > > > as being kicked out of the group, and then rejoined as a new member
> > on
> > > > the
> > > > > older version), the leader realized that at least one of the member
> > > does
> > > > > not know V1 and hence would fall back to use version V0 to perform
> > > > > assignment. V0 algorithm would do eager rebalance which may move
> some
> > > > tasks
> > > > > to the new comer immediately from the existing members, as it
> assumes
> > > > that
> > > > > everyone would revoke everything before join (a.k.a the
> sync-barrier)
> > > but
> > > > > this is actually not true, since everyone other than the old
> > versioned
> > > > new
> > > > > comer would still follow the behavior of V1 --- not revoking
> anything
> > > ---
> > > > > before sending the join group request.
> > > > >
> > > > > This could be solvable though, e.g. when leader realized that he
> > needs
> > > to
> > > > > use V0, while the previous "currentProtocol" value is V1, instead
> of
> > > just
> > > > > blindly follow the algorithm of V0 it could just reassign the
> > existing
> > > > > partitions without migrating anything, while at the same time tell
> > > > everyone
> > > > > that the currentProtocol version is downgraded to V0; and then they
> > can
> > > > > trigger another rebalance based on V0 where everything will revoke
> > the
> > > > > tasks before sending join group requests.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Mar 6, 2019 at 2:28 PM Konstantine Karantasis <
> > > > > konstantine@confluent.io> wrote:
> > > > >
> > > > > > I'd like to open the vote on KIP-415: Incremental Cooperative
> > > > Rebalancing
> > > > > > in Kafka Connect
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
> > > > > >
> > > > > > a proposal that will allow Kafka Connect to scale significantly
> the
> > > > > number
> > > > > > of connectors and tasks it can run in a cluster of Connect
> workers.
> > > > > >
> > > > > > Thanks,
> > > > > > Konstantine
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Jason Gustafson <ja...@confluent.io>.
+1 Thanks for all the work on this. My only minor comment is that
`connect.protocol` probably should be `compatible` by default. The cost is
low and it will save upgrade confusion.

Best,
Jason

On Fri, Mar 8, 2019 at 10:37 AM Robert Yokota <ra...@gmail.com> wrote:

> Thanks for the great KIP Konstantine!
>
> +1 (non-binding)
>
> Robert

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Robert Yokota <ra...@gmail.com>.
Thanks for the great KIP Konstantine!

+1 (non-binding)

Robert

On Thu, Mar 7, 2019 at 2:56 PM Guozhang Wang <wa...@gmail.com> wrote:

> Thanks Konstantine, I've read the updated section on
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
> and it lgtm.
>
> I'm +1 on the KIP.
>
>
> Guozhang

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Konstantine, I've read the updated section on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
and it lgtm.

I'm +1 on the KIP.


Guozhang


On Thu, Mar 7, 2019 at 2:35 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

> Thanks Guozhang. This is a valid observation regarding the current status
> of the PR.
>
> I updated the KIP to explicitly call out how the downgrade process should
> work in the section Compatibility, Deprecation, and Migration.
>
> Additionally, I reduced the configuration modes for the connect.protocol to
> only two: eager and compatible.
> That's because there's no way at the moment to select a protocol based on
> simple majority and not unanimity across at least one option for the
> sub-protocol.
> Therefore there's no way to lock a group of workers in a cooperative-only
> mode at the moment, if we account for accidental joins of workers running
> at an older version.
>
> The changes have been reflected in the KIP doc and will be reflected in the
> PR in a subsequent commit.
>
> Thanks,
> Konstantine


-- 
-- Guozhang

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Konstantine Karantasis <ko...@confluent.io>.
Thanks Guozhang. This is a valid observation regarding the current status
of the PR.

I updated the KIP to explicitly call out how the downgrade process should
work in the section Compatibility, Deprecation, and Migration.

Additionally, I reduced the configuration modes for the connect.protocol to
only two: eager and compatible.
That's because there's no way at the moment to select a protocol based on
simple majority and not unanimity across at least one option for the
sub-protocol.
Therefore there's no way to lock a group of workers in a cooperative-only
mode at the moment, if we account for accidental joins of workers running
at an older version.
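
To illustrate the unanimity constraint described above, here is a minimal
Python sketch. It is not the Connect or broker implementation: the function
name select_protocol, the worker names, the "eager"/"compatible" labels and
the tie-breaking rule are all assumptions made for illustration. It only
shows that a protocol can be chosen when every member supports it, so a
single older worker is enough to force the group back to eager.

```python
from collections import Counter


def select_protocol(supported_by_member):
    """Pick a protocol that every member supports (unanimity).

    supported_by_member maps a member id to the protocols it supports,
    listed in order of preference. Returns None when the members have
    no protocol in common.
    """
    members = list(supported_by_member.values())
    if not members:
        return None
    common = set(members[0]).intersection(*members[1:])
    if not common:
        return None
    # Each member votes for its most preferred protocol among the common ones.
    votes = Counter(next(p for p in prefs if p in common) for prefs in members)
    # Ties are broken by name; this rule is an assumption of the sketch.
    return max(sorted(common), key=lambda p: votes[p])


# Two newer workers prefer "compatible"; one older worker only knows "eager".
group = {
    "workerA": ["compatible", "eager"],
    "workerB": ["compatible", "eager"],
    "workerC": ["eager"],
}
chosen = select_protocol(group)  # the majority preference cannot win here
```

Because the majority preference is ignored whenever one member lacks the
protocol, a cooperative-only mode could not be enforced this way.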

The changes have been reflected in the KIP doc and will be reflected in the
PR in a subsequent commit.

Thanks,
Konstantine


On Thu, Mar 7, 2019 at 1:17 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Konstantine,
>
> Thanks for the updated KIP and the PR as well (which is huge :)). I briefly
> looked through it as well as the KIP, and I have one minor comment to add
> (otherwise I'm binding +1 on it as well) about backward compatibility.
> I'll use one example to illustrate the issue:
>
> 1) Suppose workerA and workerB are on the newer version with
> connect.protocol configured as "compatible". They will send both V0/V1 to
> the leader (say it's workerA), who will choose V1 as the current protocol.
> This will be sent back to A and B, who will remember that the current
> protocol version is V1. So after this rebalance everyone remembers that V1
> can be used, which means that upon prepareJoin they will not revoke all
> their assigned tasks.
>
> 2) Now let's say a new worker joins but with the old version V0. (In
> practice this is rare, but some common scenarios may fall into this case,
> e.g. an existing worker being downgraded, which is essentially the same as
> being kicked out of the group and then rejoining as a new member on the
> older version.) The leader realizes that at least one of the members does
> not know V1 and hence falls back to using V0 to perform the assignment.
> The V0 algorithm does an eager rebalance, which may immediately move some
> tasks from the existing members to the newcomer, because it assumes that
> everyone revoked everything before joining (a.k.a. the sync barrier). But
> this is actually not true, since everyone other than the old-versioned
> newcomer would still follow the behavior of V1 (not revoking anything)
> before sending the join group request.
>
> This could be solvable though. E.g., when the leader realizes that it needs
> to use V0 while the previous "currentProtocol" value is V1, then instead of
> blindly following the V0 algorithm it could just reassign the existing
> connectors and tasks to their current owners without migrating anything,
> while at the same time telling everyone that the currentProtocol version
> is downgraded to V0. They can then trigger another rebalance based on V0,
> in which everyone will revoke their tasks before sending join group
> requests.
>
>
> Guozhang

Re: [VOTE] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Konstantine,

Thanks for the updated KIP and the PR as well (which is huge :)). I briefly
looked through it as well as the KIP, and I have one minor comment to add
(otherwise I'm binding +1 on it as well) about backward compatibility.
I'll use one example to illustrate the issue:

1) Suppose workerA and workerB are on the newer version with
connect.protocol configured as "compatible". They will send both V0/V1 to
the leader (say it's workerA), who will choose V1 as the current protocol.
This will be sent back to A and B, who will remember that the current
protocol version is V1. So after this rebalance everyone remembers that V1
can be used, which means that upon prepareJoin they will not revoke all
their assigned tasks.

2) Now let's say a new worker joins but with the old version V0. (In
practice this is rare, but some common scenarios may fall into this case,
e.g. an existing worker being downgraded, which is essentially the same as
being kicked out of the group and then rejoining as a new member on the
older version.) The leader realizes that at least one of the members does
not know V1 and hence falls back to using V0 to perform the assignment.
The V0 algorithm does an eager rebalance, which may immediately move some
tasks from the existing members to the newcomer, because it assumes that
everyone revoked everything before joining (a.k.a. the sync barrier). But
this is actually not true, since everyone other than the old-versioned
newcomer would still follow the behavior of V1 (not revoking anything)
before sending the join group request.
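
The two steps above can be sketched in a few lines of Python. This is a toy
model under stated assumptions, not the Connect assignor: the helper names
(choose_version, eager_assign, doubly_owned), the round-robin policy and the
worker and task names are all hypothetical. It shows that an eager
assignment computed while the V1 workers have not revoked anything can hand
a task to a worker while another worker is still running it.

```python
def choose_version(max_version_by_worker):
    """The leader can only use a version that every worker understands."""
    return min(max_version_by_worker.values())


def eager_assign(tasks, workers):
    """Round-robin assignment from scratch, as an eager (V0) round might do."""
    assignment = {w: [] for w in workers}
    for i, task in enumerate(sorted(tasks)):
        assignment[workers[i % len(workers)]].append(task)
    return assignment


def doubly_owned(running, assignment):
    """Tasks still running on one worker but newly assigned to another."""
    conflicts = set()
    for worker, tasks in assignment.items():
        for task in tasks:
            for other, still_running in running.items():
                if other != worker and task in still_running:
                    conflicts.add(task)
    return conflicts


# workerA and workerB follow V1 and therefore did NOT revoke before joining.
running = {"workerA": ["t0", "t1"], "workerB": ["t2", "t3"]}
versions = {"workerA": 1, "workerB": 1, "workerC": 0}  # workerC is the newcomer

version = choose_version(versions)  # falls back to the oldest common version
assignment = eager_assign(["t0", "t1", "t2", "t3"], list(versions))
conflicts = doubly_owned(running, assignment)  # tasks at risk of running twice
```

In this model every task that moves is at risk, because the eager algorithm
takes for granted a revocation that never happened.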

This could be solvable though. E.g., when the leader realizes that it needs
to use V0 while the previous "currentProtocol" value is V1, then instead of
blindly following the V0 algorithm it could just reassign the existing
connectors and tasks to their current owners without migrating anything,
while at the same time telling everyone that the currentProtocol version
is downgraded to V0. They can then trigger another rebalance based on V0,
in which everyone will revoke their tasks before sending join group
requests.
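
A minimal sketch of this two-round downgrade, again as a toy Python model
rather than the actual PR code. The function leader_assign, its parameters
and the round-robin policy are hypothetical; the cooperative (V1) assignment
itself is deliberately left out.

```python
def leader_assign(previous_version, agreed_version, current, workers, tasks):
    """Return (version_to_announce, assignment, needs_followup_rebalance)."""
    if previous_version == 1 and agreed_version == 0:
        # Downgrade round: nobody revoked before joining, so keep every task
        # where it is, give newcomers nothing yet, and announce V0.
        kept = {w: list(current.get(w, [])) for w in workers}
        return 0, kept, True
    if agreed_version == 0:
        # Regular eager round: everyone revoked first, so a fresh
        # round-robin assignment is safe.
        fresh = {w: [] for w in workers}
        for i, task in enumerate(sorted(tasks)):
            fresh[workers[i % len(workers)]].append(task)
        return 0, fresh, False
    # The V1 (cooperative) assignment is out of scope for this sketch.
    raise NotImplementedError("cooperative assignment not modelled here")


workers = ["workerA", "workerB", "workerC"]
tasks = ["t0", "t1", "t2", "t3"]
current = {"workerA": ["t0", "t1"], "workerB": ["t2", "t3"]}

# Round 1: the group was on V1 and an old worker forces V0. Nothing moves.
round1 = leader_assign(1, 0, current, workers, tasks)
# Round 2: everyone now knows the protocol is V0 and revokes before joining.
round2 = leader_assign(0, 0, {}, workers, tasks)
```

The cost of this approach is one extra rebalance on downgrade, in exchange
for never moving a task that its current owner has not revoked.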


Guozhang

On Wed, Mar 6, 2019 at 2:28 PM Konstantine Karantasis <
konstantine@confluent.io> wrote:

> I'd like to open the vote on KIP-415: Incremental Cooperative Rebalancing
> in Kafka Connect
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
>
> a proposal that will allow Kafka Connect to scale significantly the number
> of connectors and tasks it can run in a cluster of Connect workers.
>
> Thanks,
> Konstantine
>


-- 
-- Guozhang