Posted to users@kafka.apache.org by Oleksiy Krivoshey <ol...@gmail.com> on 2015/12/23 18:44:51 UTC

Protocol version upgrades in 0.9

Hi,

I can't understand how protocol upgrades (to a newer version) are supposed
to work. When I send a JoinGroupRequest with a list of assignment protocols
(same protocol name, different versions), the first protocol/version always
gets picked as the member version. Even if all consumers in the group are
configured with both versions, the coordinator still always selects the
first specified version, not the one with the highest version number.

For example:

consumer1: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
consumer2: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]

Both are assigned version 0 in the response to the leader. If I order them
this way:

consumer1: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
consumer2: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]

Both are assigned version 1.

In this case:

consumer1: [ {name: strategyX, version: 10}, {name: strategyX, version: 1} ]
consumer2: [ {name: strategyX, version: 20}, {name: strategyX, version: 1} ]

Kafka endlessly tries to rebalance the group without success, because
consumer1 has version 10 and consumer2 has version 20 in the
JoinGroupResponse.

Can anyone please explain the process of the protocol version upgrade?

Re: Protocol version upgrades in 0.9

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Also note that this is only necessary if you want to mirror the Java client
implementation exactly. You can implement the consumer protocol however you
like in your library (although the Java implementation is obviously a good
reference). The only reason you'd need to match it exactly is if you want to
mix consumers using different libraries in the same consumer group
(consumers in different groups using different libraries should always be
fine).

-Ewen

On Mon, Dec 28, 2015 at 4:16 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Yes, version=0 should work. You might take a look at
> o.a.k.clients.consumer.internals.ConsumerProtocol if there are any other
> details that aren't clear from the protocol wiki page.
>
> -Ewen



-- 
Thanks,
Ewen

Re: Protocol version upgrades in 0.9

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Yes, version=0 should work. You might take a look at
o.a.k.clients.consumer.internals.ConsumerProtocol if there are any other
details that aren't clear from the protocol wiki page.

-Ewen

On Thu, Dec 24, 2015 at 3:32 AM, Oleksiy Krivoshey <ol...@gmail.com>
wrote:

> Hi Ewen,
>
> Thanks for the detailed explanation. So it's basically the version of the
> MemberAssignment structure, not the version of the assignment strategy as I
> thought. Should I use version=0 in protocol exchanges for now? (I'm
> building a client in Node.js for 0.9:
> https://github.com/oleksiyk/kafka/blob/master/lib/group_consumer.js )




Re: Protocol version upgrades in 0.9

Posted by Oleksiy Krivoshey <ol...@gmail.com>.
Hi Ewen,

Thanks for the detailed explanation. So it's basically the version of the
MemberAssignment structure, not the version of the assignment strategy as I
thought. Should I use version=0 in protocol exchanges for now? (I'm
building a client in Node.js for 0.9:
https://github.com/oleksiyk/kafka/blob/master/lib/group_consumer.js )
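Since the version number lives in the embedded consumer structures, a client only has to write 0 there and read it back. Below is a minimal Python sketch of encoding and decoding a version-0 MemberAssignment, assuming the layout from the protocol wiki page linked in this thread: int16 Version, an int32-counted array of topics (each an int16-length string plus an int32-counted array of int32 partitions), then UserData as int32-length bytes with -1 meaning null. The function names are hypothetical; this is not the Java ConsumerProtocol code, and whether a real client sends null or empty UserData is not something this sketch settles.

```python
import struct
from typing import Dict, List, Optional, Tuple


def encode_member_assignment(assignment: Dict[str, List[int]],
                             user_data: Optional[bytes] = None,
                             version: int = 0) -> bytes:
    """Encode Version, [Topic [Partition]], UserData (big-endian)."""
    out = [struct.pack(">h", version), struct.pack(">i", len(assignment))]
    for topic, partitions in assignment.items():
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)       # string: int16 len + bytes
        out.append(struct.pack(">i", len(partitions)))        # array: int32 count
        out.extend(struct.pack(">i", p) for p in partitions)  # Partition => int32
    if user_data is None:
        out.append(struct.pack(">i", -1))                     # null UserData
    else:
        out.append(struct.pack(">i", len(user_data)) + user_data)
    return b"".join(out)


def decode_member_assignment(buf: bytes) -> Tuple[int, Dict[str, List[int]], Optional[bytes]]:
    offset = 0

    def read(fmt: str) -> int:
        # Read one big-endian integer and advance the shared offset.
        nonlocal offset
        (value,) = struct.unpack_from(fmt, buf, offset)
        offset += struct.calcsize(fmt)
        return value

    version = read(">h")
    assignment: Dict[str, List[int]] = {}
    for _ in range(read(">i")):
        name_len = read(">h")
        topic = buf[offset:offset + name_len].decode("utf-8")
        offset += name_len
        assignment[topic] = [read(">i") for _ in range(read(">i"))]
    user_data_len = read(">i")
    user_data = None if user_data_len < 0 else buf[offset:offset + user_data_len]
    return version, assignment, user_data
```

Decoding `encode_member_assignment({"t": [0, 1]})` gives back `(0, {"t": [0, 1]}, None)`.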

On Thu, 24 Dec 2015 at 10:15 Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> The version is included specifically in the consumer protocol to allow us
> to extend the format in the future. For example, if we needed to add or
> change the way subscriptions are expressed, we could increase that version
> number and update the message format. In other words, it is the mechanism
> we have chosen *only for the consumer embedded protocol* to allow metadata
> format changes. (Note that for the consumer embedded protocol there is also
> *yet another* layer of data, called "UserData" in that protocol
> documentation; this is custom data the partition assignment strategy in the
> consumer, which is pluggable, might want include, e.g. if you were doing
> resource-based assignment you might need to include info like # of cpus,
> which is specific to that assignment strategy).

Re: Protocol version upgrades in 0.9

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Oleksiy,

The join group protocol is general enough to handle multiple types of group
membership, not just consumers. This is used in Kafka Connect to form a
group of workers (which, instead of splitting topic partitions between
members splits connector tasks).

In order to make this work and allow flexibility in how assignment is
handled, the protocol is divided into two layers. The primary join group
protocol only a) keeps track of group membership and b) selects a group
protocol that all members agree they can work with. At this level, there's
no version information, no info about consumer subscriptions, and no
knowledge of partition assignment strategies other than the names and
opaque metadata submitted by clients.

The "embedded" layer is where the version info you're setting is specified.
The brokers never even parse it -- the information is collected and sent to
one of the group members (the group leader), which decodes it and determines
the assignment. That result is then returned to the broker, which
disseminates it (again without decoding it; the broker just forwards the
appropriate info to each member).
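To make the division of labour concrete, here is a rough Python sketch of that round trip. The broker-side functions pass bytes through without parsing them; only the leader decodes subscriptions, computes an assignment and re-encodes one result per member. All names are hypothetical, the per-topic round-robin assigner is chosen only for illustration, and JSON stands in for the real binary formats to keep the example short.

```python
import json
from typing import Callable, Dict, List


def broker_collect(member_metadata: Dict[str, bytes]) -> Dict[str, bytes]:
    # Coordinator: pass every member's metadata to the leader, unparsed.
    return dict(member_metadata)


def broker_forward(leader_result: Dict[str, bytes], member_id: str) -> bytes:
    # Coordinator: hand each member only its own (still opaque) assignment.
    return leader_result[member_id]


def leader_assign(all_metadata: Dict[str, bytes],
                  partition_counts: Dict[str, int],
                  decode: Callable[[bytes], List[str]],
                  encode: Callable[[Dict[str, List[int]]], bytes]) -> Dict[str, bytes]:
    # Leader: the only place the embedded layer is decoded.
    subscriptions = {m: decode(md) for m, md in all_metadata.items()}
    plan: Dict[str, Dict[str, List[int]]] = {m: {} for m in subscriptions}
    for topic in sorted(partition_counts):
        subscribers = sorted(m for m, topics in subscriptions.items() if topic in topics)
        if not subscribers:
            continue
        for p in range(partition_counts[topic]):
            owner = subscribers[p % len(subscribers)]  # per-topic round-robin
            plan[owner].setdefault(topic, []).append(p)
    return {m: encode(a) for m, a in plan.items()}


def json_encode(obj) -> bytes:
    # Stand-in for the real binary Subscription/MemberAssignment formats.
    return json.dumps(obj, sort_keys=True).encode("utf-8")


def json_decode(raw: bytes):
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    metadata = {"c1": json_encode(["t"]), "c2": json_encode(["t"])}
    result = leader_assign(broker_collect(metadata), {"t": 4}, json_decode, json_encode)
    print(json_decode(broker_forward(result, "c1")))  # {'t': [0, 2]}
```

Because the broker functions never call `decode`, the embedded format (and its version field) can change without any broker involvement, which is the point of the two-layer split.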

The version is included specifically in the consumer protocol to allow us
to extend the format in the future. For example, if we needed to add or
change the way subscriptions are expressed, we could increase that version
number and update the message format. In other words, it is the mechanism
we have chosen *only for the consumer embedded protocol* to allow metadata
format changes. (Note that for the consumer embedded protocol there is also
*yet another* layer of data, called "UserData" in that protocol
documentation; this is custom data that the pluggable partition assignment
strategy in the consumer might want to include. For example, if you were
doing resource-based assignment you might need to include info like the
number of CPUs, which is specific to that assignment strategy.)
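One way a decoder can use that version field is sketched below in Python: it rejects negative versions, reads the version-0 fields of the Subscription (topics, then UserData) and ignores any trailing bytes. That last step rests on an assumption made for this sketch, namely that later versions only append fields; it is not a description of what the Java client actually does, and `MIN_SUPPORTED_VERSION` and `decode_subscription` are hypothetical names.

```python
import struct
from typing import List, Optional, Tuple

MIN_SUPPORTED_VERSION = 0  # hypothetical constant for this sketch


def decode_subscription(buf: bytes) -> Tuple[int, List[str], Optional[bytes]]:
    offset = 0

    def read(fmt: str) -> int:
        # Read one big-endian integer and advance the shared offset.
        nonlocal offset
        (value,) = struct.unpack_from(fmt, buf, offset)
        offset += struct.calcsize(fmt)
        return value

    version = read(">h")
    if version < MIN_SUPPORTED_VERSION:
        raise ValueError(f"unsupported subscription version {version}")
    topics: List[str] = []
    for _ in range(read(">i")):
        n = read(">h")
        topics.append(buf[offset:offset + n].decode("utf-8"))
        offset += n
    n = read(">i")
    user_data = None if n < 0 else buf[offset:offset + n]
    # Assumption: newer versions only append fields after UserData, so a
    # version-0 reader can stop here and ignore whatever follows.
    return version, topics, user_data
```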

The broker only looks at the ProtocolName (which is equivalent to
AssignmentStrategy for consumers) when choosing which protocol to use for
consumers. If you want to version strategies in an incompatible way (i.e.
you can't handle the change just by updating the format of your metadata),
you should include version info in the ProtocolName itself so the group
coordinator broker can tell them apart, e.g. round-robin vs round-robin-2.
But you should also think carefully about whether that change is necessary:
in many cases, if you're not adding any metadata, you'll be fine keeping the
same name, since one member is selected to perform the assignment and every
other member just needs to respect whatever assignment it makes. And if
you're just trying to switch to a completely different assignment strategy
(e.g. from range -> round-robin), the name itself is enough. Just bounce
(restart) all consumers with round-robin added as an option, then bounce
them all again with range removed.
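The coordinator's choice can be modelled in a few lines of Python, following the rule Ewen gives in his first reply in this thread: take the protocol names every member supports, let each member vote for the first of those in its own list, and pick the name with the most votes. The linked GroupMetadata.scala remains the authoritative version; the names and tie-breaking here are this sketch's own. `metadata_for` encodes an assumption consistent with what Oleksiy observed: when a member lists the same name twice, the first entry's metadata is what reaches the leader.

```python
from collections import Counter
from typing import Dict, List, Set, Tuple

# Each member's JoinGroup protocols in its order of preference:
# (ProtocolName, opaque metadata bytes).
Protocols = List[Tuple[str, bytes]]


def candidate_protocols(members: Dict[str, Protocols]) -> Set[str]:
    # Names that every member supports; the metadata is never inspected.
    name_sets = [{name for name, _ in protos} for protos in members.values()]
    return set.intersection(*name_sets)


def vote(protos: Protocols, candidates: Set[str]) -> str:
    # A member votes for the first candidate that appears in its own list.
    return next(name for name, _ in protos if name in candidates)


def select_protocol(members: Dict[str, Protocols]) -> str:
    if not members:
        raise ValueError("cannot select a protocol for an empty group")
    candidates = candidate_protocols(members)
    if not candidates:
        raise ValueError("no protocol is supported by all members")
    votes = Counter(vote(protos, candidates) for protos in members.values())
    # Ties are broken arbitrarily here; not claimed to match the broker.
    return votes.most_common(1)[0][0]


def metadata_for(protos: Protocols, name: str) -> bytes:
    # Assumption: with duplicate names, the first matching entry is used.
    return next(md for n, md in protos if n == name)
```

During the rolling switch described above, a group in which only some consumers list round-robin still selects range, because range is the only name every member supports. In this example, once every consumer lists round-robin first, it wins the vote; a renamed strategy such as round-robin-2 is likewise only chosen after every member supports it.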

We considered other options when designing this protocol, but decided this
was the best tradeoff. The current protocol is already pretty complex and
multi-layered, and the alternatives that tried to build in versioning at
this level too were even more complex and confusing.

-Ewen



On Wed, Dec 23, 2015 at 10:45 PM, Oleksiy Krivoshey <ol...@gmail.com>
wrote:

> Hi Ewen,
>
> I specify the version in the ProtocolMetadata structure, as per this document:
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse
>
> ---------------
> ProtocolType => "consumer"
>
> ProtocolName => AssignmentStrategy
>   AssignmentStrategy => string
>
> ProtocolMetadata => Version Subscription UserData
>   Version => int16
>   Subscription => [Topic]
>     Topic => string
>   UserData => bytes
> -----------------
>
> Maybe I misunderstood the purpose of this version field?



-- 
Thanks,
Ewen

Re: Protocol version upgrades in 0.9

Posted by Oleksiy Krivoshey <ol...@gmail.com>.
Hi Ewen,

I specify the version in the ProtocolMetadata structure, as described in this document:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse

---------------
ProtocolType => "consumer"

ProtocolName => AssignmentStrategy
  AssignmentStrategy => string

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => bytes
-----------------
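
Below is a minimal Python sketch of how a client library might serialize the ProtocolMetadata structure above. It assumes the standard Kafka primitive encodings: big-endian int16/int32, strings prefixed by an int16 byte length, and arrays and byte fields prefixed by an int32 length. The function names and the example topic are hypothetical. From the broker's point of view the result is an opaque byte string attached to a protocol name; the Version field only means something to the client code that decodes it.

```python
import struct
from typing import List


def encode_string(s: str) -> bytes:
    # Kafka "string": int16 byte length, then the UTF-8 bytes.
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_bytes(b: bytes) -> bytes:
    # Kafka "bytes": int32 length, then the raw bytes.
    return struct.pack(">i", len(b)) + b


def encode_protocol_metadata(version: int, topics: List[str], user_data: bytes = b"") -> bytes:
    # ProtocolMetadata => Version Subscription UserData
    out = struct.pack(">h", version)          # Version => int16
    out += struct.pack(">i", len(topics))      # Subscription => [Topic]: int32 element count
    for topic in topics:
        out += encode_string(topic)            # Topic => string
    out += encode_bytes(user_data)             # UserData => bytes
    return out


def decode_version(metadata: bytes) -> int:
    # Version is the leading int16 of the encoded metadata.
    return struct.unpack_from(">h", metadata, 0)[0]


if __name__ == "__main__":
    meta = encode_protocol_metadata(1, ["orders"])  # "orders" is a made-up topic
    print(len(meta), decode_version(meta))          # 18 1
```

The 18 bytes are 2 (Version) + 4 (topic count) + 8 (int16 length plus "orders") + 4 (empty UserData length).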

Maybe I misunderstood the purpose of this version field?

On Thu, 24 Dec 2015 at 00:27 Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Oleksiy,
>
> Where are you specifying the version? Unless I'm missing something, the
> JoinGroup protocol doesn't include versions so I'm not sure I understand
> the examples you are giving. Are the version numbers included in the
> per-protocol metadata?
>
> You can see exactly how the consumer coordinator on the broker selects the
> protocol here:
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179
> It is just taking the candidate protocols (ones that are available for all
> consumers), then has each consumer "vote" by selecting whichever candidate
> appears in its list of strategies first, then uses the one with the most
> votes.
>
> Is it possible your example is behaving the way it is because it actually
> has duplicates for "strategyX", and in the last case it chooses the first
> strategyX despite the conflicting versions?
>
> -Ewen

Re: Protocol version upgrades in 0.9

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Oleksiy,

Where are you specifying the version? Unless I'm missing something, the
JoinGroup protocol doesn't include versions so I'm not sure I understand
the examples you are giving. Are the version numbers included in the
per-protocol metadata?

You can see exactly how the consumer coordinator on the broker selects the
protocol here:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179
It is just taking the candidate protocols (ones that are available for all
consumers), then has each consumer "vote" by selecting whichever candidate
appears in its list of strategies first, then uses the one with the most
votes.
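
The voting rule described above can be sketched in a few lines of Python. This is a simplified model, not the broker's Scala code: each member is represented only by its ordered list of protocol names, and ties go to whichever winner was counted first, which is an arbitrary choice of this sketch. Only names take part in the vote; anything inside the per-protocol metadata, including a version field, is ignored. The protocol names in the example are just sample strings.

```python
from collections import Counter
from typing import Dict, List


def select_protocol(members: Dict[str, List[str]]) -> str:
    """Pick a group protocol; members maps member id -> protocol names in preference order."""
    if not members:
        raise ValueError("empty group")
    prefs = list(members.values())
    # Candidates: protocol names that every member supports.
    candidates = set(prefs[0]).intersection(*prefs[1:])
    if not candidates:
        raise ValueError("no protocol supported by all members")
    # Each member votes for the first candidate that appears in its own list.
    votes = Counter(next(p for p in member_prefs if p in candidates) for member_prefs in prefs)
    # The candidate with the most votes wins (ties: first counted, a choice of this sketch).
    return votes.most_common(1)[0][0]


if __name__ == "__main__":
    group = {
        "c1": ["roundrobin", "range"],
        "c2": ["range", "roundrobin"],
        "c3": ["roundrobin", "range"],
    }
    print(select_protocol(group))  # roundrobin (2 votes to 1)
```

If a member lists the same name twice, the set of candidates collapses the duplicates, so the vote cannot tell two versions of "strategyX" apart.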

Is it possible your example is behaving the way it is because it actually
has duplicates for "strategyX", and in the last case it chooses the first
strategyX despite the conflicting versions?
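
That hypothesis can be checked against the three examples in the original post with a small model. It assumes (this is the assumption being tested, not something confirmed in this thread) that once a name is selected, the coordinator forwards each member's metadata from the first entry carrying that name, so a later duplicate is never seen. The integer stands in for the Version field inside each entry's ProtocolMetadata; the function name is hypothetical.

```python
from typing import Dict, List, Tuple

# (protocol name, Version field encoded in that entry's ProtocolMetadata)
Entry = Tuple[str, int]


def forwarded_versions(members: Dict[str, List[Entry]], selected: str) -> Dict[str, int]:
    """Version each member reports to the leader, assuming first-match lookup by name."""
    return {
        member_id: next(version for name, version in entries if name == selected)
        for member_id, entries in members.items()
    }


if __name__ == "__main__":
    X = "strategyX"
    examples = [
        {"consumer1": [(X, 0), (X, 1)], "consumer2": [(X, 0), (X, 1)]},
        {"consumer1": [(X, 1), (X, 0)], "consumer2": [(X, 1), (X, 0)]},
        {"consumer1": [(X, 10), (X, 1)], "consumer2": [(X, 20), (X, 1)]},
    ]
    for group in examples:
        print(forwarded_versions(group, X))
```

Under this assumption the model reproduces the reported outcomes (0/0, 1/1, then 10/20): the position of each entry, not its version number, decides what the leader receives, and in the last case the leader sees two different versions of the same strategy.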

-Ewen

On Wed, Dec 23, 2015 at 9:44 AM, Oleksiy Krivoshey <ol...@gmail.com>
wrote:

> Hi,
>
> I can't understand how protocol upgrades (to a newer version) are supposed
> to work. When I send a JoinGroupRequest with a list of assignment protocols
> (same protocol name, different versions), the first protocol/version in the
> list is always picked as the member's version. Even if every consumer in the
> group is configured with both versions, the coordinator still selects the
> first version listed, not the one with the highest version number.
>
> So for example:
> consumer1: [ {name:strategyX, version: 0}, {name: strategyX, version: 1} ]
> consumer2: [ {name:strategyX, version: 0}, {name: strategyX, version: 1} ]
>
> Both will be assigned version 0 in the response to the leader. If I make it
> this way:
>
> consumer1: [ {name:strategyX, version: 1}, {name: strategyX, version: 0} ]
> consumer2: [ {name:strategyX, version: 1}, {name: strategyX, version: 0} ]
>
> Both will be assigned version 1.
>
> In this case:
>
> consumer1: [ {name:strategyX, version: 10}, {name: strategyX, version: 1} ]
> consumer2: [ {name:strategyX, version: 20}, {name: strategyX, version: 1} ]
>
> Kafka will endlessly try to rebalance the group without success, because
> consumer1 will have version 10 and consumer2 version 20 in the
> JoinGroupResponse.
>
> Can anyone please explain the process of the protocol version upgrade?
>



-- 
Thanks,
Ewen