Posted to dev@kafka.apache.org by Jiangjie Qin <jq...@linkedin.com.INVALID> on 2015/09/01 22:58:43 UTC

Re: [DISCUSS] Client-side Assignment for New Consumer

Sorry about the format... It seems the Apache mailing list does not support tables.
Please refer to the following sheet.

https://docs.google.com/spreadsheets/d/1eKlQC1Ty74qVrIDH9vk7S6hEs_eJAyOufFUZaIU-k28/edit?usp=sharing

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 1, 2015 at 1:01 PM, Aditya Auradkar <aa...@linkedin.com>
wrote:

> Hey -
>
> So what is remaining here? Becket, did you send a follow-up email regarding
> the data volume? For some reason, I don't receive emails sent by you.
>
> Aditya
>
> On Mon, Aug 31, 2015 at 9:52 AM, Joel Koshy <jj...@gmail.com> wrote:
>
>> Thanks Becket - I think your follow-up email to the thread states our
>> concerns and positions clearly. I would really like to make sure we
>> are very clear and firm on the practical aspects of the changes they
>> are pushing for. We should continue to stay on top of this and make
>> sure they don't push in changes to the consumer in a hurry for 0.8.3
>> without due diligence.
>>
>> Joel
>>
>> On Sun, Aug 30, 2015 at 12:00 AM, Jiangjie Qin <jq...@linkedin.com> wrote:
>> > Hi Joel,
>> >
>> > I was trying to calculate the number but found it might be better to run
>> > some actual tests, for the following reasons:
>> > 1. Cross-colo network bandwidth usage is not clear to me yet.
>> > 2. The coordinators of the four mirror maker consumer groups may or may
>> > not reside on the same brokers, so the impact would be different.
>> > 3. I am not sure about the network bandwidth usage of normal traffic on
>> > a broker, i.e., how much bandwidth is actually available for consumer
>> > rebalance. Currently the outbound traffic on brokers is not balanced.
>> >
>> > The above factors can impact the actual rebalance time significantly. So I
>> > just put a quick reply on the mail thread with some actual numbers we have
>> > and wanted to update the test results later. But I agree we should do it
>> > soon.
>> >
>> > Thanks,
>> >
>> >  Jiangjie (Becket) Qin
>> >
>> > On Sat, Aug 29, 2015 at 9:40 PM, Joel Koshy <jj...@gmail.com>
>> wrote:
>> >>
>> >> No - I thought we agreed we would calculate the exact bandwidth (bytes)
>> >> required for a rebalance, its duration, and thus whether it makes sense or
>> >> not, i.e., that we would come up with exact numbers for the scenarios of
>> >> interest in the email that Becket sent.
>> >>
>> >>
>> >> On Saturday, August 29, 2015, Kartik Paramasivam
>> >> <kp...@linkedin.com> wrote:
>> >>>
>> >>> Isn't that what Becket is also saying?
>> >>>
>> >>> On Aug 28, 2015, at 10:12 PM, Joel Koshy <jj...@gmail.com> wrote:
>> >>>
>> >>> I thought we were going to run the numbers ourselves and tell them if we
>> >>> are okay with it or not?
>> >>>
>> >>> ---------- Forwarded message ----------
>> >>> From: Jiangjie Qin <jq...@linkedin.com.invalid>
>> >>> Date: Friday, August 28, 2015
>> >>> Subject: [DISCUSS] Client-side Assignment for New Consumer
>> >>> To: dev@kafka.apache.org
>> >>>
>> >>>
>> >>> Hi Neha,
>> >>>
>> >>> Following are some numbers we have in the pipeline. It would be very
>> >>> helpful to see how it goes with the proposed protocol. We will try to do
>> >>> some tests with the current patch as well. Please also let us know if you
>> >>> want further information.
>> >>>
>> >>> 32 brokers, 1Gbps NIC
>> >>> 547 topics
>> >>> 27 chars average topic name length
>> >>> 2-3 consumers for each topic
>> >>>
>> >>> Four 26-node mirror maker instances (four different consumer groups). Each
>> >>> node has 4 consumers. (Each mirror maker instance has 104 consumers.)
>> >>> We are actually using selective copy, so we have a big whitelist for each
>> >>> mirror maker, copying about 100 topics (we expect it to grow to a couple
>> >>> of hundreds).
>> >>> The mirror makers are co-located with the target cluster, so the consumer
>> >>> traffic goes through the WAN.
>> >>>
>> >>> We have 5 to 6 wildcard consumers consuming from all the topics.
>> >>>
>> >>> The topic creation frequency is not high now, roughly about 1 / day.
>> >>>
>> >>> The scenarios we are interested in are:
>> >>> 1. The time for one round of rebalance.
>> >>> 2. The time for a rolling bounce of a mirror maker.
>> >>> 3. For wildcard topics, does metadata sync-up cause problems?
>> >>>
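>> >>> For a rough sense of scale, here is a back-of-envelope sketch of the
>> >>> subscription propagation volume under the client-side proposal. The
>> >>> constants come from the numbers above; per-entry protocol overhead is
>> >>> ignored and the class itself is purely illustrative:
>> >>>
>> >>> // Rough estimate: the coordinator echoes every member's subscription
>> >>> // to every member, i.e. roughly n^2 * m bytes of topic names per group.
>> >>> public class RebalanceTrafficEstimate {
>> >>>   public static void main(String[] args) {
>> >>>     int consumers = 104;          // 26 nodes * 4 consumers per group
>> >>>     int topicsPerWhitelist = 100; // explicit whitelist size
>> >>>     int avgTopicNameBytes = 27;   // average topic name length
>> >>>
>> >>>     long perMember = (long) topicsPerWhitelist * avgTopicNameBytes;
>> >>>     long perResponse = consumers * perMember;   // one member's response
>> >>>     long perRebalance = consumers * perResponse; // all members combined
>> >>>
>> >>>     System.out.printf("per-member subscription: ~%d KB%n", perMember / 1024);
>> >>>     System.out.printf("per join-group response: ~%d KB%n", perResponse / 1024);
>> >>>     System.out.printf("per rebalance (group):   ~%d MB%n",
>> >>>         perRebalance / (1024 * 1024));
>> >>>   }
>> >>> }
>> >>>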
>> >>> Thanks,
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>>
>> >>> On Fri, Aug 28, 2015 at 1:24 PM, Joel Koshy <jj...@gmail.com>
>> wrote:
>> >>>
>> >>> > Another use case I was thinking of was something like rack-aware
>> >>> > assignment of partitions to clients. This would require some
>> >>> > additional topic metadata to be propagated to and from the coordinator,
>> >>> > and you would need some way to resolve conflicts for such strategies.
>> >>> > I think that could be addressed by attaching a generation id to the
>> >>> > metadata and using that (i.e., picking the highest) in order to resolve
>> >>> > conflicts without another round of join-group requests.
>> >>> >
>> >>> > Likewise, without delete/recreate, partition counts are a sort of
>> >>> > generation id since they are non-decreasing. If we need to account for
>> >>> > delete/recreate, that could perhaps be addressed by an explicit
>> >>> > (per-topic) generation id attached to each topic in the metadata blob.
>> >>> > Does that make sense? I think that covers my concerns wrt the
>> >>> > split-brain issues.
>> >>> >
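>> >>> > As a rough illustration of the resolution rule above (a hypothetical
>> >>> > sketch - neither of these types exists in any patch), conflicting
>> >>> > per-topic metadata reported by different members could be merged
>> >>> > like this:
>> >>> >
>> >>> > import java.util.HashMap;
>> >>> > import java.util.List;
>> >>> > import java.util.Map;
>> >>> >
>> >>> > class TopicMeta {
>> >>> >   final int generationId;   // bumped on delete/recreate
>> >>> >   final int partitionCount; // non-decreasing within a generation
>> >>> >   TopicMeta(int generationId, int partitionCount) {
>> >>> >     this.generationId = generationId;
>> >>> >     this.partitionCount = partitionCount;
>> >>> >   }
>> >>> > }
>> >>> >
>> >>> > class MetadataResolver {
>> >>> >   // Keep the highest generation; within a generation, partition
>> >>> >   // counts only grow, so the larger count wins.
>> >>> >   static Map<String, TopicMeta> resolve(List<Map<String, TopicMeta>> views) {
>> >>> >     Map<String, TopicMeta> merged = new HashMap<>();
>> >>> >     for (Map<String, TopicMeta> view : views)
>> >>> >       for (Map.Entry<String, TopicMeta> e : view.entrySet())
>> >>> >         merged.merge(e.getKey(), e.getValue(), (a, b) ->
>> >>> >             a.generationId != b.generationId
>> >>> >                 ? (a.generationId > b.generationId ? a : b)
>> >>> >                 : (a.partitionCount >= b.partitionCount ? a : b));
>> >>> >     return merged;
>> >>> >   }
>> >>> > }
>> >>> >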
>> >>> > I'm still a bit wary of the n^2*m sized rebroadcast of all the
>> >>> > metadata - mainly because for various reasons at LinkedIn, we are
>> >>> > actually using large explicit whitelists (and not wildcards) in
>> >>> > several of our mirroring pipelines. At this point I feel that is a
>> >>> > reasonable cost to pay for having all the logic in one place (i.e.,
>> >>> > client side) but I would like to think a bit more on that.
>> >>> >
>> >>> > Joel
>> >>> >
>> >>> >
>> >>> > On Fri, Aug 28, 2015 at 1:02 PM, Onur Karaman
>> >>> > <ok...@linkedin.com.invalid> wrote:
>> >>> > > From what I understand, the "largest number of partitions" trick is
>> >>> > > based on the assumption that topics can only expand their partitions.
>> >>> > > What happens when a topic gets deleted and recreated? This breaks that
>> >>> > > assumption.
>> >>> > >
>> >>> > > On Fri, Aug 28, 2015 at 6:33 AM, Neha Narkhede <neha@confluent.io>
>> >>> > > wrote:
>> >>> > >
>> >>> > >> Thanks for re-reviewing, Joel.
>> >>> > >>
>> >>> > >> On Fri, Aug 28, 2015 at 2:51 AM -0700, "Joel Koshy"
>> >>> > >> <jjkoshy.w@gmail.com> wrote:
>> >>> > >>
>> >>> > >> > I think we think this proposal addresses 100% of the split brain
>> >>> > >> > issues ever seen in the ZK-based protocol, but I think you think
>> >>> > >> > there are still issues. Can you explain what you're thinking of and
>> >>> > >> > when you think it would happen? I want to make sure you aren't
>> >>> > >> > assuming client-side => split-brain, since I think that is totally
>> >>> > >> > not the case.
>> >>> > >>
>> >>> > >> Yes, I had concluded that client-side assignment would still result
>> >>> > >> in split-brain wrt partition counts, but I overlooked a key sentence
>> >>> > >> in the wiki - i.e., that the assignment algorithm for consumers can
>> >>> > >> just use the largest number of partitions for each topic reported by
>> >>> > >> any of the consumers. I.e., I assumed that consumers would just fail
>> >>> > >> the rebalance if the partition counts were inconsistent, but that is
>> >>> > >> not the case, since this conflict can be easily resolved as described,
>> >>> > >> without further join-group requests. Sorry about that. There is still
>> >>> > >> the issue of the coordinator having to send back n*m worth of
>> >>> > >> metadata, but that was not my biggest concern. I'll look over it
>> >>> > >> again and reply back tomorrow.
>> >>> > >>
>> >>> > >> Joel
>> >>> > >>
>> >>> > >> On Thu, Aug 27, 2015 at 2:55 PM, Jay Kreps  wrote:
>> >>> > >> > Hey Joel,
>> >>> > >> >
>> >>> > >> > I really don't think we should do both. There are pros and cons,
>> >>> > >> > but we should make a decision and work on operationalizing one
>> >>> > >> > approach. Much of really making something like this work is getting
>> >>> > >> > all the bugs out, getting monitoring in place, and getting rigorous
>> >>> > >> > system tests in place. Trying to do those things twice with the
>> >>> > >> > same resources will just mean we do them half as well. I also think
>> >>> > >> > this buys nothing from the user's point of view--they want
>> >>> > >> > co-ordination that works correctly; the debate we are having is
>> >>> > >> > purely a "how should we build that" debate. So this is really not
>> >>> > >> > the kind of thing we'd want to make pluggable, and if we did, that
>> >>> > >> > would just complicate life for the user.
>> >>> > >> >
>> >>> > >> > I think we think this proposal addresses 100% of the split brain
>> >>> > >> > issues ever seen in the ZK-based protocol, but I think you think
>> >>> > >> > there are still issues. Can you explain what you're thinking of and
>> >>> > >> > when you think it would happen? I want to make sure you aren't
>> >>> > >> > assuming client-side => split-brain, since I think that is totally
>> >>> > >> > not the case.
>> >>> > >> >
>> >>> > >> > With respect to "herd issues", I actually think all the proposals
>> >>> > >> > address this by scaling the co-ordinator out to all nodes and
>> >>> > >> > making the co-ordination vastly cheaper. No proposal, of course,
>> >>> > >> > gets rid of the fact that all clients rejoin at once when there is
>> >>> > >> > a membership change, but that is kind of fundamental to the problem.
>> >>> > >> >
>> >>> > >> > -Jay
>> >>> > >> >
>> >>> > >> > On Thu, Aug 27, 2015 at 2:02 PM, Joel Koshy  wrote:
>> >>> > >> >
>> >>> > >> >> I actually feel this set of tests (whatever they may be) is
>> >>> > >> >> somewhat irrelevant here. My main concern with the current
>> >>> > >> >> client-side proposal (i.e., without Becket's follow-up
>> >>> > >> >> suggestions) is that it makes a significant compromise to the
>> >>> > >> >> original charter of the new consumer - i.e., reduce/eliminate herd
>> >>> > >> >> and split brain problems in both group management and partition
>> >>> > >> >> assignment. I understand the need for client-side partition
>> >>> > >> >> assignment in some use cases (which we are also interested in),
>> >>> > >> >> but I also think we should make every effort to keep full
>> >>> > >> >> server-side coordination for the remaining (majority of) use
>> >>> > >> >> cases, especially if it does not complicate the protocol. The
>> >>> > >> >> proposed changes do not complicate the protocol IMO - i.e., there
>> >>> > >> >> is no further modification to the request/response formats beyond
>> >>> > >> >> the current client-side proposal. It only involves a trivial
>> >>> > >> >> reinterpretation of the content of the protocol metadata field.
>> >>> > >> >>
>> >>> > >> >> Joel
>> >>> > >> >>
>> >>> > >> >> On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede  wrote:
>> >>> > >> >> > Hey Becket,
>> >>> > >> >> >
>> >>> > >> >> >> In that case, the broker side partition assignment would be
>> >>> > >> >> >> ideal because it avoids issues like metadata inconsistency /
>> >>> > >> >> >> split brain / exploding subscription set propagation.
>> >>> > >> >> >
>> >>> > >> >> > As per our previous discussions regarding each of those
>> >>> > >> >> > concerns (referring to this email thread, KIP calls and JIRA
>> >>> > >> >> > comments), we are going to run a set of tests using the LinkedIn
>> >>> > >> >> > deployment numbers that we will wait for you to share. The
>> >>> > >> >> > purpose is to see if those concerns are really valid or not. I'd
>> >>> > >> >> > prefer to see that before making any more changes that will
>> >>> > >> >> > complicate the protocol.
>> >>> > >> >> >
>> >>> > >> >> > On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin
>> >>> > >> >> > wrote:
>> >>> > >> >> >
>> >>> > >> >> >> Hi folks,
>> >>> > >> >> >>
>> >>> > >> >> >> After further discussion in LinkedIn, we found that while
>> >>> > >> >> >> having a more general group management protocol is very
>> >>> > >> >> >> useful, the vast majority of the clients will not use a
>> >>> > >> >> >> customized partition assignment strategy. In that case, the
>> >>> > >> >> >> broker side partition assignment would be ideal because it
>> >>> > >> >> >> avoids issues like metadata inconsistency / split brain /
>> >>> > >> >> >> exploding subscription set propagation.
>> >>> > >> >> >>
>> >>> > >> >> >> So we have the following proposal that satisfies the majority
>> >>> > >> >> >> of the clients' needs without changing the currently proposed
>> >>> > >> >> >> binary protocol, i.e., continue to support broker-side
>> >>> > >> >> >> assignment if the assignment strategy is recognized by the
>> >>> > >> >> >> coordinator.
>> >>> > >> >> >>
>> >>> > >> >> >> 1. Keep the binary protocol as currently proposed.
>> >>> > >> >> >>
>> >>> > >> >> >> 2. Change the way we interpret ProtocolMetadata:
>> >>> > >> >> >> 2.1 On the consumer side, change partition.assignment.strategy
>> >>> > >> >> >> to partition.assignor.class. Implement something like the
>> >>> > >> >> >> following PartitionAssignor interface:
>> >>> > >> >> >>
>> >>> > >> >> >> public interface PartitionAssignor {
>> >>> > >> >> >>   List<String> protocolTypes();
>> >>> > >> >> >>   byte[] protocolMetadata();
>> >>> > >> >> >>   // Return the topic -> partition list map assigned to this
>> >>> > >> >> >>   // consumer.
>> >>> > >> >> >>   Map<String, List<Integer>> assignPartitions(String protocolType,
>> >>> > >> >> >>       byte[] responseProtocolMetadata);
>> >>> > >> >> >> }
>> >>> > >> >> >>
>> >>> > >> >> >> public abstract class AbstractPartitionAssignor implements
>> >>> > >> >> >>     PartitionAssignor {
>> >>> > >> >> >>   protected final KafkaConsumer consumer;
>> >>> > >> >> >>   AbstractPartitionAssignor(KafkaConsumer consumer) {
>> >>> > >> >> >>     this.consumer = consumer;
>> >>> > >> >> >>   }
>> >>> > >> >> >> }
>> >>> > >> >> >>
>> >>> > >> >> >> 2.2 The ProtocolMetadata in the JoinGroupRequest will be
>> >>> > >> >> >> partitionAssignor.protocolMetadata(). When
>> >>> > >> >> >> partition.assignor.class is "range" or "roundrobin", the
>> >>> > >> >> >> ProtocolMetadata in the JoinGroupRequest will be a JSON
>> >>> > >> >> >> subscription set. ("range" and "roundrobin" will be reserved
>> >>> > >> >> >> words; we can also consider reserving some prefix such as
>> >>> > >> >> >> "broker-" to be more clear.)
>> >>> > >> >> >> 2.3 On the broker side, when ProtocolType is "range" or
>> >>> > >> >> >> "roundrobin", the coordinator will parse the ProtocolMetadata
>> >>> > >> >> >> in the JoinGroupRequest and assign the partitions for
>> >>> > >> >> >> consumers. In the JoinGroupResponse, the ProtocolMetadata will
>> >>> > >> >> >> be the global assignment of partitions.
>> >>> > >> >> >> 2.4 On the client side, after receiving the JoinGroupResponse,
>> >>> > >> >> >> partitionAssignor.assignPartitions() will be invoked to return
>> >>> > >> >> >> the actual assignment. If the assignor is RangeAssignor or
>> >>> > >> >> >> RoundRobinAssignor, it will parse the assignment from the
>> >>> > >> >> >> ProtocolMetadata returned by the coordinator.
>> >>> > >> >> >>
>> >>> > >> >> >> This approach has a few merits:
>> >>> > >> >> >> 1. It does not change the proposed binary protocol, which is
>> >>> > >> >> >> still general.
>> >>> > >> >> >> 2. The majority of the consumers will not suffer from
>> >>> > >> >> >> inconsistent metadata / split brain / exploding subscription
>> >>> > >> >> >> set propagation. This specifically deals with the issue that
>> >>> > >> >> >> the current proposal caters to a 20% use case while adversely
>> >>> > >> >> >> impacting the more common 80% of use cases.
>> >>> > >> >> >> 3. It is easy to implement. The only thing needed is to
>> >>> > >> >> >> implement a partitioner class (a rough sketch follows below).
>> >>> > >> >> >> For most users, the default range and roundrobin partitioners
>> >>> > >> >> >> are good enough.
>> >>> > >> >> >>
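>> >>> > >> >> >> To make 3 concrete, here is a rough sketch of what the default
>> >>> > >> >> >> range logic could look like (names and shapes here are
>> >>> > >> >> >> assumptions for illustration, not the actual patch). Consumers
>> >>> > >> >> >> are sorted and each topic's partitions are split into
>> >>> > >> >> >> contiguous ranges:
>> >>> > >> >> >>
>> >>> > >> >> >> import java.util.*;
>> >>> > >> >> >>
>> >>> > >> >> >> class RangeAssignment {
>> >>> > >> >> >>   static Map<String, List<Integer>> assign(List<String> consumers,
>> >>> > >> >> >>       Map<String, Integer> partitionsPerTopic, String me) {
>> >>> > >> >> >>     List<String> sorted = new ArrayList<>(consumers);
>> >>> > >> >> >>     Collections.sort(sorted);  // same order on every consumer
>> >>> > >> >> >>     int idx = sorted.indexOf(me), n = sorted.size();
>> >>> > >> >> >>     Map<String, List<Integer>> mine = new HashMap<>();
>> >>> > >> >> >>     for (Map.Entry<String, Integer> t : partitionsPerTopic.entrySet()) {
>> >>> > >> >> >>       int quota = t.getValue() / n, extra = t.getValue() % n;
>> >>> > >> >> >>       int start = idx * quota + Math.min(idx, extra);
>> >>> > >> >> >>       int len = quota + (idx < extra ? 1 : 0);
>> >>> > >> >> >>       List<Integer> parts = new ArrayList<>();
>> >>> > >> >> >>       for (int p = start; p < start + len; p++)
>> >>> > >> >> >>         parts.add(p);
>> >>> > >> >> >>       if (!parts.isEmpty())
>> >>> > >> >> >>         mine.put(t.getKey(), parts);
>> >>> > >> >> >>     }
>> >>> > >> >> >>     return mine;
>> >>> > >> >> >>   }
>> >>> > >> >> >> }
>> >>> > >> >> >>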
>> >>> > >> >> >> Thoughts?
>> >>> > >> >> >>
>> >>> > >> >> >> Thanks,
>> >>> > >> >> >>
>> >>> > >> >> >> Jiangjie (Becket) Qin
>> >>> > >> >> >>
>> >>> > >> >> >> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson
>> >>> > >> >> >> wrote:
>> >>> > >> >> >>
>> >>> > >> >> >> > Follow-up from the kip call:
>> >>> > >> >> >> >
>> >>> > >> >> >> > 1. Onur brought up the question of whether this protocol
>> >>> > >> >> >> > provides enough coordination capabilities to be generally
>> >>> > >> >> >> > useful in practice (is that accurate, Onur?). If it doesn't,
>> >>> > >> >> >> > then each use case would probably need a dependence on
>> >>> > >> >> >> > zookeeper anyway, and we haven't really gained anything. The
>> >>> > >> >> >> > group membership provided by this protocol is a useful
>> >>> > >> >> >> > primitive for coordination, but it's limited in the sense
>> >>> > >> >> >> > that everything shared among the group has to be communicated
>> >>> > >> >> >> > at the time the group is created. If any shared data changes,
>> >>> > >> >> >> > then the only way the group can ensure agreement is to force
>> >>> > >> >> >> > a rebalance. This is expensive since all members must stall
>> >>> > >> >> >> > while the rebalancing takes place. As we have also seen,
>> >>> > >> >> >> > there is a practical limit on the amount of metadata that can
>> >>> > >> >> >> > be sent through this protocol when groups get a little
>> >>> > >> >> >> > larger. This protocol is therefore not suitable for cases
>> >>> > >> >> >> > which require frequent communication or which require a large
>> >>> > >> >> >> > amount of data to be communicated. For the use cases listed
>> >>> > >> >> >> > on the wiki, neither of these appears to be an issue, but
>> >>> > >> >> >> > there may be other limitations which would limit reuse of the
>> >>> > >> >> >> > protocol. Perhaps it would be sufficient to sketch how these
>> >>> > >> >> >> > cases might work?
>> >>> > >> >> >> >
>> >>> > >> >> >> > 2. We talked a little bit about the issue of metadata churn.
>> >>> > >> >> >> > Becket brought up the interesting point that not only do we
>> >>> > >> >> >> > depend on topic metadata changing relatively infrequently,
>> >>> > >> >> >> > but we also expect timely agreement among the brokers on what
>> >>> > >> >> >> > that metadata is. To resolve this, we can have the consumers
>> >>> > >> >> >> > fetch metadata from the coordinator. We still depend on topic
>> >>> > >> >> >> > metadata not changing frequently, but this should resolve any
>> >>> > >> >> >> > disagreement among the brokers themselves. In fact, since we
>> >>> > >> >> >> > expect that disagreement is relatively rare, we can have the
>> >>> > >> >> >> > consumers fetch from the coordinator only when a disagreement
>> >>> > >> >> >> > occurs. The nice thing about this proposal is that it doesn't
>> >>> > >> >> >> > affect the join group semantics, so the coordinator would
>> >>> > >> >> >> > remain oblivious to the metadata used by the group for
>> >>> > >> >> >> > agreement. Also, if metadata churn becomes an issue, it might
>> >>> > >> >> >> > be possible to have the coordinator provide a snapshot for
>> >>> > >> >> >> > the group to ensure that a generation would be able to reach
>> >>> > >> >> >> > agreement (this would probably require adding
>> >>> > >> >> >> > groupId/generation to the metadata request).
>> >>> > >> >> >> >
>> >>> > >> >> >> > 3. We talked briefly about support for multiple protocols in
>> >>> > >> >> >> > the join group request in order to allow changing the
>> >>> > >> >> >> > assignment strategy without downtime. I think it's a little
>> >>> > >> >> >> > doubtful that this would get much use in practice, but I
>> >>> > >> >> >> > agree it's a nice option to have on the table. An
>> >>> > >> >> >> > alternative, for the sake of argument, is to have each member
>> >>> > >> >> >> > provide only one version of the protocol, and to let the
>> >>> > >> >> >> > coordinator choose the protocol with the largest number of
>> >>> > >> >> >> > supporters. All members which can't support the selected
>> >>> > >> >> >> > protocol would be kicked out of the group. The drawback in a
>> >>> > >> >> >> > rolling upgrade is that the total capacity of the group would
>> >>> > >> >> >> > be momentarily halved. It would also be a little tricky to
>> >>> > >> >> >> > handle the case of retrying when a consumer is kicked out of
>> >>> > >> >> >> > the group. We wouldn't want it to be able to effect a
>> >>> > >> >> >> > rebalance, for example, if it would just be kicked out again.
>> >>> > >> >> >> > That would probably complicate the group management logic on
>> >>> > >> >> >> > the coordinator.
>> >>> > >> >> >> >
>> >>> > >> >> >> >
>> >>> > >> >> >> > Thanks,
>> >>> > >> >> >> > Jason
>> >>> > >> >> >> >
>> >>> > >> >> >> >
>> >>> > >> >> >> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin
>> >>> > >> >> >> > wrote:
>> >>> > >> >> >> >
>> >>> > >> >> >> > > Jun,
>> >>> > >> >> >> > >
>> >>> > >> >> >> > > Yes, I agree. If the metadata can be synced quickly, there
>> >>> > >> >> >> > > should not be an issue. It just occurred to me that there
>> >>> > >> >> >> > > is a proposal to allow consuming from followers in the ISR;
>> >>> > >> >> >> > > that could potentially cause more frequent metadata changes
>> >>> > >> >> >> > > for consumers. Would that be an issue?
>> >>> > >> >> >> > >
>> >>> > >> >> >> > > Thanks,
>> >>> > >> >> >> > >
>> >>> > >> >> >> > > Jiangjie (Becket) Qin
>> >>> > >> >> >> > >
>> >>> > >> >> >> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson
>> >>> > >> >> >> > > <jason@confluent.io> wrote:
>> >>> > >> >> >> > >
>> >>> > >> >> >> > > > Hi Jun,
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > Answers below:
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > 1. When there are multiple common protocols in the
>> >>> > >> >> >> > > > JoinGroupRequest, which one would the coordinator pick?
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > I was intending to use the list to indicate preference.
>> >>> > >> >> >> > > > If all group members support protocols ["A", "B"] in that
>> >>> > >> >> >> > > > order, then we will choose "A." If some support ["B",
>> >>> > >> >> >> > > > "A"], then we would either choose based on respective
>> >>> > >> >> >> > > > counts or just randomly. The main use case of supporting
>> >>> > >> >> >> > > > the list is for rolling upgrades when a change is made to
>> >>> > >> >> >> > > > the assignment strategy. In that case, the new assignment
>> >>> > >> >> >> > > > strategy would be listed first in the upgraded client. I
>> >>> > >> >> >> > > > think it's debatable whether this feature would get much
>> >>> > >> >> >> > > > use in practice, so we might consider dropping it.
>> >>> > >> >> >> > > >
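>> >>> > >> >> >> > > > As a rough sketch of that selection (hypothetical code,
>> >>> > >> >> >> > > > not the actual broker implementation): keep only the
>> >>> > >> >> >> > > > protocols every member supports, then prefer the one
>> >>> > >> >> >> > > > members list earliest on average:
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > import java.util.*;
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > class ProtocolSelector {
>> >>> > >> >> >> > > >   // prefs: one ordered preference list per member; must
>> >>> > >> >> >> > > >   // be non-empty. An empty result means the group fails.
>> >>> > >> >> >> > > >   static Optional<String> select(List<List<String>> prefs) {
>> >>> > >> >> >> > > >     Set<String> common = new HashSet<>(prefs.get(0));
>> >>> > >> >> >> > > >     for (List<String> p : prefs)
>> >>> > >> >> >> > > >       common.retainAll(p);
>> >>> > >> >> >> > > >     // lower total rank = listed earlier = more preferred
>> >>> > >> >> >> > > >     return common.stream().min(Comparator.comparingInt(c -> {
>> >>> > >> >> >> > > >       int rank = 0;
>> >>> > >> >> >> > > >       for (List<String> p : prefs)
>> >>> > >> >> >> > > >         rank += p.indexOf(c);
>> >>> > >> >> >> > > >       return rank;
>> >>> > >> >> >> > > >     }));
>> >>> > >> >> >> > > >   }
>> >>> > >> >> >> > > > }
>> >>> > >> >> >> > > >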
>> >>> > >> >> >> > > > 2. If the protocols don't agree, the group construction
>> >>> > >> >> >> > > > fails. What exactly does it mean? Do we send an error in
>> >>> > >> >> >> > > > every JoinGroupResponse and remove all members of the
>> >>> > >> >> >> > > > group in the coordinator?
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > Yes, that is right. It would be handled similarly to
>> >>> > >> >> >> > > > inconsistent assignment strategies in the current
>> >>> > >> >> >> > > > protocol. The coordinator returns an error in each join
>> >>> > >> >> >> > > > group response, and the client propagates the error to
>> >>> > >> >> >> > > > the user.
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > 3. Consumer embedded protocol: The proposal has two
>> >>> > >> >> >> > > > different formats of subscription depending on whether
>> >>> > >> >> >> > > > wildcards are used or not. This seems a bit complicated.
>> >>> > >> >> >> > > > Would it be better to always use the metadata hash? The
>> >>> > >> >> >> > > > clients know the subscribed topics already. This way, the
>> >>> > >> >> >> > > > client code behaves the same whether wildcards are used
>> >>> > >> >> >> > > > or not.
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > Yeah, I think this is possible (Neha also suggested it).
>> >>> > >> >> >> > > > I haven't updated the wiki yet, but the patch I started
>> >>> > >> >> >> > > > working on uses only the metadata hash. In the case that
>> >>> > >> >> >> > > > an explicit topic list is provided, the hash just covers
>> >>> > >> >> >> > > > the metadata for those topics.
>> >>> > >> >> >> > > >
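>> >>> > >> >> >> > > > For illustration, the hash could be computed along these
>> >>> > >> >> >> > > > lines (a sketch under assumed shapes, not the actual
>> >>> > >> >> >> > > > patch): digest the sorted (topic, partition count) pairs
>> >>> > >> >> >> > > > so members with identical metadata get identical hashes:
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > import java.nio.charset.StandardCharsets;
>> >>> > >> >> >> > > > import java.security.MessageDigest;
>> >>> > >> >> >> > > > import java.util.Map;
>> >>> > >> >> >> > > > import java.util.TreeMap;
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > class MetadataHash {
>> >>> > >> >> >> > > >   // topicPartitions: topic -> partition count; only the
>> >>> > >> >> >> > > >   // subscribed topics when an explicit list is given.
>> >>> > >> >> >> > > >   static byte[] hash(TreeMap<String, Integer> topicPartitions)
>> >>> > >> >> >> > > >       throws Exception {
>> >>> > >> >> >> > > >     MessageDigest md = MessageDigest.getInstance("MD5");
>> >>> > >> >> >> > > >     // TreeMap iteration is sorted, so the digest is stable
>> >>> > >> >> >> > > >     for (Map.Entry<String, Integer> e : topicPartitions.entrySet()) {
>> >>> > >> >> >> > > >       md.update(e.getKey().getBytes(StandardCharsets.UTF_8));
>> >>> > >> >> >> > > >       md.update((byte) ':');
>> >>> > >> >> >> > > >       md.update(e.getValue().toString()
>> >>> > >> >> >> > > >           .getBytes(StandardCharsets.UTF_8));
>> >>> > >> >> >> > > >     }
>> >>> > >> >> >> > > >     return md.digest();
>> >>> > >> >> >> > > >   }
>> >>> > >> >> >> > > > }
>> >>> > >> >> >> > > >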
>> >>> > >> >> >> > > > Thanks,
>> >>> > >> >> >> > > > Jason
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao wrote:
>> >>> > >> >> >> > > >
>> >>> > >> >> >> > > > > Jason,
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > Thanks for the writeup. A few comments below.
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > 1. When there are multiple common protocols in the
>> >>> > >> >> >> > > > > JoinGroupRequest, which one would the coordinator pick?
>> >>> > >> >> >> > > > > 2. If the protocols don't agree, the group construction
>> >>> > >> >> >> > > > > fails. What exactly does it mean? Do we send an error
>> >>> > >> >> >> > > > > in every JoinGroupResponse and remove all members of
>> >>> > >> >> >> > > > > the group in the coordinator?
>> >>> > >> >> >> > > > > 3. Consumer embedded protocol: The proposal has two
>> >>> > >> >> >> > > > > different formats of subscription depending on whether
>> >>> > >> >> >> > > > > wildcards are used or not. This seems a bit
>> >>> > >> >> >> > > > > complicated. Would it be better to always use the
>> >>> > >> >> >> > > > > metadata hash? The clients know the subscribed topics
>> >>> > >> >> >> > > > > already. This way, the client code behaves the same
>> >>> > >> >> >> > > > > whether wildcards are used or not.
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > Jiangjie,
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > With respect to rebalance churn due to topics being
>> >>> > >> >> >> > > > > created/deleted: with the new consumer, the rebalance
>> >>> > >> >> >> > > > > can probably settle within 200ms when there is a topic
>> >>> > >> >> >> > > > > change. So, as long as we are not changing topics more
>> >>> > >> >> >> > > > > than 5 times per sec, there shouldn't be constant
>> >>> > >> >> >> > > > > churn, right?
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > Thanks,
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > Jun
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson
>> >>> > >> >> >> > > > > <jason@confluent.io> wrote:
>> >>> > >> >> >> > > > >
>> >>> > >> >> >> > > > > > Hi Kafka Devs,
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > One of the nagging issues in the current design of
>> >>> > >> >> >> > > > > > the new consumer has been the need to support a
>> >>> > >> >> >> > > > > > variety of assignment strategies. We've encountered
>> >>> > >> >> >> > > > > > this in particular in the design of copycat and the
>> >>> > >> >> >> > > > > > processing framework (KIP-28). From what I
>> >>> > >> >> >> > > > > > understand, Samza also has a number of use cases with
>> >>> > >> >> >> > > > > > custom assignment needs. The new consumer protocol
>> >>> > >> >> >> > > > > > supports new assignment strategies by hooking them
>> >>> > >> >> >> > > > > > into the broker. For many environments, this is a
>> >>> > >> >> >> > > > > > major pain and in some cases, a non-starter. It also
>> >>> > >> >> >> > > > > > challenges the validation that the coordinator can
>> >>> > >> >> >> > > > > > provide. For example, some assignment strategies call
>> >>> > >> >> >> > > > > > for partitions to be assigned multiple times, which
>> >>> > >> >> >> > > > > > means that the coordinator can only check that
>> >>> > >> >> >> > > > > > partitions have been assigned at least once.
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > To solve these issues, we'd like to propose moving
>> >>> > >> >> >> > > > > > assignment to the client. I've written a wiki which
>> >>> > >> >> >> > > > > > outlines some protocol changes to achieve this:
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
>> >>> > >> >> >> > > > > > To summarize briefly, instead of the coordinator
>> >>> > >> >> >> > > > > > assigning the partitions itself, all subscriptions
>> >>> > >> >> >> > > > > > are forwarded to each member of the group, which then
>> >>> > >> >> >> > > > > > decides independently which partitions it should
>> >>> > >> >> >> > > > > > consume. The protocol provides a mechanism for the
>> >>> > >> >> >> > > > > > coordinator to validate that all consumers use the
>> >>> > >> >> >> > > > > > same assignment strategy, but it does not ensure that
>> >>> > >> >> >> > > > > > the resulting assignment is "correct." This provides
>> >>> > >> >> >> > > > > > a powerful capability for users to control the full
>> >>> > >> >> >> > > > > > data flow on the client side. They control how data
>> >>> > >> >> >> > > > > > is written to partitions through the Partitioner
>> >>> > >> >> >> > > > > > interface and they control how data is consumed
>> >>> > >> >> >> > > > > > through the assignment strategy, all without touching
>> >>> > >> >> >> > > > > > the server.
>> >>> > >> >> >> > > > > >
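>> >>> > >> >> >> > > > > > The key property is that the assignment function is
>> >>> > >> >> >> > > > > > deterministic, so every member independently computes
>> >>> > >> >> >> > > > > > the same answer. A minimal round-robin sketch
>> >>> > >> >> >> > > > > > (illustrative names only; not the wiki's code):
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > import java.util.ArrayList;
>> >>> > >> >> >> > > > > > import java.util.Collections;
>> >>> > >> >> >> > > > > > import java.util.List;
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > class ClientSideRoundRobin {
>> >>> > >> >> >> > > > > >   // memberIds and partitions come from the join-group
>> >>> > >> >> >> > > > > >   // response; myId must be in memberIds. Sorting both
>> >>> > >> >> >> > > > > >   // lists makes every client reach the same answer.
>> >>> > >> >> >> > > > > >   static List<String> assign(String myId,
>> >>> > >> >> >> > > > > >       List<String> memberIds, List<String> partitions) {
>> >>> > >> >> >> > > > > >     List<String> members = new ArrayList<>(memberIds);
>> >>> > >> >> >> > > > > >     Collections.sort(members);
>> >>> > >> >> >> > > > > >     List<String> parts = new ArrayList<>(partitions);
>> >>> > >> >> >> > > > > >     Collections.sort(parts);
>> >>> > >> >> >> > > > > >     List<String> mine = new ArrayList<>();
>> >>> > >> >> >> > > > > >     for (int i = members.indexOf(myId); i < parts.size();
>> >>> > >> >> >> > > > > >          i += members.size())
>> >>> > >> >> >> > > > > >       mine.add(parts.get(i));
>> >>> > >> >> >> > > > > >     return mine;
>> >>> > >> >> >> > > > > >   }
>> >>> > >> >> >> > > > > > }
>> >>> > >> >> >> > > > > >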
>> >>> > >> >> >> > > > > > Of course nothing comes for free. In particular,
>> >>> > >> >> >> > > > > > this change removes the ability of the coordinator
>> >>> > >> >> >> > > > > > to validate that commits are made by consumers who
>> >>> > >> >> >> > > > > > were assigned the respective partition. This might
>> >>> > >> >> >> > > > > > not be too bad since we retain the ability to
>> >>> > >> >> >> > > > > > validate the generation id, but it is a potential
>> >>> > >> >> >> > > > > > concern. We have considered alternative protocols
>> >>> > >> >> >> > > > > > which add a second round-trip to the protocol in
>> >>> > >> >> >> > > > > > order to give the coordinator the ability to confirm
>> >>> > >> >> >> > > > > > the assignment. As mentioned above, the coordinator
>> >>> > >> >> >> > > > > > is somewhat limited in what it can actually validate,
>> >>> > >> >> >> > > > > > but this would return its ability to validate
>> >>> > >> >> >> > > > > > commits. The tradeoff is that it increases the
>> >>> > >> >> >> > > > > > protocol's complexity, which means more ways for the
>> >>> > >> >> >> > > > > > protocol to fail and consequently more edge cases in
>> >>> > >> >> >> > > > > > the code.
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > It also misses an opportunity to generalize the
>> >>> > >> >> >> > > > > > group membership protocol for additional use cases.
>> >>> > >> >> >> > > > > > In fact, after you've gone to the trouble of moving
>> >>> > >> >> >> > > > > > assignment to the client, the main thing that is left
>> >>> > >> >> >> > > > > > in this protocol is basically a general group
>> >>> > >> >> >> > > > > > management capability. This is exactly what is needed
>> >>> > >> >> >> > > > > > for a few cases that are currently under discussion
>> >>> > >> >> >> > > > > > (e.g. copycat or single-writer producer). We've taken
>> >>> > >> >> >> > > > > > this further step in the proposal and attempted to
>> >>> > >> >> >> > > > > > envision what that general protocol might look like
>> >>> > >> >> >> > > > > > and how it could be used both by the consumer and for
>> >>> > >> >> >> > > > > > some of these other cases.
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > Anyway, since time is running out on the new
>> >>> > >> >> >> > > > > > consumer, we have perhaps one last chance to consider
>> >>> > >> >> >> > > > > > a significant change in the protocol like this, so
>> >>> > >> >> >> > > > > > have a look at the wiki and share your thoughts. I've
>> >>> > >> >> >> > > > > > no doubt that some ideas seem clearer in my mind than
>> >>> > >> >> >> > > > > > they do on paper, so ask questions if there is any
>> >>> > >> >> >> > > > > > confusion.
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >> > > > > > Thanks!
>> >>> > >> >> >> > > > > > Jason
>> >>> > >> >> >> > > > > >
>> >>> > >> >> >
>> >>> > >> >> > --
>> >>> > >> >> > Thanks,
>> >>> > >> >> > Neha