Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2018/03/05 06:38:57 UTC

Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment

Hi Litao,

When acks=all is used, we only require all ISR replicas to ack before
returning; your understanding of this part is right.

What I meant is that with acks=all, for the above example with {A, B, C},
if the ISR list still contains all three replicas, we will still require
ALL 3 of them to ack even if min.isr is only set to 2. So from A's point of
view, before she shrinks the ISR from {A, B, C} to {A, B} she cannot
respond to a produce request with acks=all; and when she tries to shrink it,
she will get an error from ZK telling her that the leader epoch is stale,
hence learning that she is no longer the leader. So with acks=all we are
still safe even when both A and B are partitioned off.

But with your proposal of acks=quorum, in the same case, A can actually
respond to a produce request with ISR = {A, B, C} while the produced message
is acked only by A herself and B, because min.isr = 2 and hence that is
enough for a quorum. This will lead to the error case I described above
when the partition heals.
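
To make the difference concrete, here is a minimal sketch of the two ack
rules as discussed in this thread. It is not broker code: the function
names (required_acks, stale_leader_can_ack) and the set-based model of
"which replicas the stale leader can still reach" are assumptions made
purely for illustration.

```python
def required_acks(mode, isr_size, num_replicas, min_isr):
    """Return how many ISR replicas must ack, or None if no ack is possible."""
    if mode == "all":
        # acks=all: every replica currently in the ISR must ack, and the
        # ISR must not be smaller than min.isr.
        return isr_size if isr_size >= min_isr else None
    if mode == "quorum":
        # acks=quorum as proposed in this thread: at least a majority of
        # all replicas AND at least min.isr.
        majority = num_replicas // 2 + 1
        needed = max(majority, min_isr)
        return needed if isr_size >= needed else None
    raise ValueError("unknown acks mode: " + mode)


def stale_leader_can_ack(mode, believed_isr, reachable, num_replicas, min_isr):
    """Can a leader ack a write, given the ISR it believes in and who it can reach?"""
    needed = required_acks(mode, len(believed_isr), num_replicas, min_isr)
    if needed is None:
        return False
    # Only replicas that are both in the believed ISR and reachable can ack.
    return len(believed_isr & reachable) >= needed


# A still believes ISR = {A, B, C} but can only reach herself and B.
isr, reachable = {"A", "B", "C"}, {"A", "B"}
all_ok = stale_leader_can_ack("all", isr, reachable, 3, 2)
quorum_ok = stale_leader_can_ack("quorum", isr, reachable, 3, 2)
```

Under these assumptions all_ok is False (A must first shrink the ISR, and
that ZK write fails on the stale epoch) while quorum_ok is True, which is
the unsafe case described above.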

-------------

Besides this question, another issue is how the LEO-based leader election
will be implemented, which is still not covered in detail in your KIP wiki.
As discussed with Becket offline, one issue is that today the leader
election on the controller side is based on the synchronization barrier
provided by ZK, in the sense that the controller will deterministically
choose the leader from ZK's registered metadata, which is guaranteed not to
change concurrently. However, if we now implement the new LEO-based
election based on, say, round-trip communication between the controller
and the brokers, there is no such synchronization barrier. For example,
right after the controller gets the LEOs from A and B and determines that A
has the larger value, B may get a new batch of messages concurrently, so
that its LEO becomes larger than A's. In this case the controller will
choose a leader that does not actually have the largest LEO, and hence we
have the same risk that some acked messages will be truncated later.
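
As a toy illustration of that race (not a model of the real controller;
elect_by_leo and the LEO numbers are made up for the example), consider a
controller that elects from a snapshot of LEOs collected over RPC:

```python
def elect_by_leo(leos):
    """Pick the replica with the largest LEO; ties go to the smallest name."""
    return max(sorted(leos), key=lambda replica: leos[replica])


# Actual log end offsets on the brokers at the time the controller asks.
leo = {"A": 10, "B": 9}

# The controller's view after its round trips: a point-in-time copy.
snapshot = dict(leo)

# With no synchronization barrier, B appends a new batch before the
# election decision is applied.
leo["B"] += 3

chosen = elect_by_leo(snapshot)    # decided from the stale snapshot
longest = elect_by_leo(leo)        # who really has the longest log now
at_risk = leo[longest] - leo[chosen]
```

Here the controller picks A from its snapshot while B now holds the longer
log, so the at_risk messages on B could be truncated once A becomes leader,
even if some of them were already acked.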

I'm wondering if you have already thought of this case, and if yes could
you complete the KIP wiki with a detailed description of the new leader
election implementation design?


Guozhang


On Mon, Feb 12, 2018 at 3:39 PM, Litao Deng <li...@airbnb.com.invalid>
wrote:

> Hey Guozhang. Not fully understand your following comments.
>
> The goal of my approach is to maintain the behavior of ack="all", which
> > happen to do better than what Kafka is actually guaranteed: when both A
> and
> > B are partitioned off, produced records will not be acked since "all"
> > requires all replicas (not only ISRs, my previous email has an incorrect
> > term) are required.
>
>
> My understanding is once the leader's HW exceeds the required offset, it
> will check whether the current ISR is equal or larger than the min.isr. In
> terms of the HW, it will increase once all of the ISR replicated the
> message. So I am a little bit confused about "requires all replicas (not
> only ISRs, my previous email has an incorrect term)".
>
> Do you mean all of the replicas (ISR and the out of sync replicas) should
> commit the message once acks='all'?
>
> Thanks!
>
> On Mon, Feb 12, 2018 at 3:02 PM, Litao Deng <li...@airbnb.com> wrote:
>
> > Folks. Thanks for all of the good discussions.
> >
> > Here are a few of my thoughts:
> >
> >    1. we won't change any existing semantics. That means, besides acks
> >    '-1 (all)', '0', '1', we will introduce a separate 'quorum' and
> document
> >    the semantic. 'quorum' is a totally different view of our replication
> >    protocol for the sake of better tail (P999) latency. I will advertise
> don't
> >    compare 'quorum' with '-1 (all)' and any other existing values.
> >    2. in terms of the controller functionality, I admit there are many
> >    awesome consensus protocols; however, for this specific KIP, I choose
> to
> >    minimize the impact/change on the controller code path.
> >       - we will keep the current controller's overall design and
> >       implementation by NOT introducing any consensus protocol.
> >       - we will introduce a topic level config "enable.quorum.acks"
> >       (default to false), and only accept acks='quorum' produce requests
> while
> >       the corresponding topic enabled this config. In this case, during
> the new
> >       leader election, we will only use the new LEO-based new leader
> election for
> >       the topics turned the "enable.quorum.acks" on. In this case, we
> only do
> >       LEO-based new leader election for the topics needed, and other
> topics won't
> >       pay the penalty.
> >
> > One requirement for this KIP is fully semantic backward compatible and
> > pay-as-you-go for the complexity of controller (longer new leader
> election
> > latency).
> >
> > Thoughts?
> >
> > On Mon, Feb 12, 2018 at 10:19 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> >> Hello Tao,
> >>
> >> No I was not proposing to change the mechanism of acks=all, and only
> >> mentioning that today even with acks=all the tolerance of failures is
> >> theoretically still bounded by min.isr settings though we do require all
> >> replicas in ISR (which may be larger than min.isr) to replicate before
> >> responding; this is what Jun mentioned may surprise many users today. I
> >> think with an additional "acks=quorum" can help resolve this, by
> requiring
> >> the num.acks >= majority (to make sure consistency is guaranteed with at
> >> most (X-1) / 2 failures with X number of replicas) AND num.acks >=
> min.isr
> >> (to specify if we want tolerate more failures than (X-1) / 2).
> >>
> >> The question then is, whether acks=all is still useful with introduced
> >> "quorum": if it is not, we can just replace the current semantics of
> "all"
> >> and document it. The example that we gave above, demonstrate that
> >> "acks=all" itself may still be useful even with the introduction of
> >> "quorum" since that scenario can be avoided by acks=all, but not
> >> acks=quorum as it requires ALL ISR replicas to replicate even if that
> >> number is larger than {min.isr} and also larger than the majority number
> >> (and if A is trying to shrink its ISR from {A,B,C} to {A,B} it will fail
> >> the ZK write since epoch has been incremented). Hence my proposal is to
> >> add
> >> a new config than replacing current semantics of "all".
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Sat, Feb 3, 2018 at 2:45 AM, tao xiao <xi...@gmail.com> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > Are you proposing changing semantic of ack=all to acknowledge message
> >> only
> >> > after all replicas (not all ISRs, which is what Kafka currently is
> >> doing)
> >> > have committed the message? This is equivalent to setting
> >> min.isr=number of
> >> > replicas, which makes ack=all much stricter than what Kafka has right
> >> now.
> >> > I think this may introduce surprise to users too as producer will not
> >> > succeed in producing a message to Kafka when one of the followers is
> >> down
> >> >
> >> > On Sat, 3 Feb 2018 at 15:26 Guozhang Wang <wa...@gmail.com> wrote:
> >> >
> >> > > Hi Dong,
> >> > >
> >> > > Could you elaborate a bit more how controller could affect leaders
> to
> >> > > switch between all and quorum?
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <li...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Guozhang,
> >> > > >
> >> > > > Got it. Thanks for the detailed explanation. I guess my point is
> >> that
> >> > we
> >> > > > can probably achieve the best of both worlds, i.e. maintain the
> >> > existing
> >> > > > behavior of ack="all" while improving the tail latency.
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangguoz@gmail.com
> >
> >> > > wrote:
> >> > > >
> >> > > >> Hi Dong,
> >> > > >>
> >> > > >> Yes, in terms of fault tolerance "quorum" does not do better than
> >> > "all",
> >> > > >> as I said, with {min.isr} to X+1 Kafka is able to tolerate X
> >> failures
> >> > > only.
> >> > > >> So if A and B are partitioned off at the same time, then there
> are
> >> two
> >> > > >> concurrent failures and we do not guarantee all acked messages
> >> will be
> >> > > >> retained.
> >> > > >>
> >> > > >> The goal of my approach is to maintain the behavior of ack="all",
> >> > which
> >> > > >> happen to do better than what Kafka is actually guaranteed: when
> >> both
> >> > A
> >> > > and
> >> > > >> B are partitioned off, produced records will not be acked since
> >> "all"
> >> > > >> requires all replicas (not only ISRs, my previous email has an
> >> > incorrect
> >> > > >> term) are required. This is doing better than tolerating X
> >> failures,
> >> > > which
> >> > > >> I was proposing to keep, so that we would not introduce any
> >> regression
> >> > > >> "surprises" to users who are already using "all". In other words,
> >> > > "quorum"
> >> > > >> is trading a bit of failure tolerance that is strictly defined on
> >> > > min.isr
> >> > > >> for better tail latency.
> >> > > >>
> >> > > >>
> >> > > >> Guozhang
> >> > > >>
> >> > > >>
> >> > > >> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <li...@gmail.com>
> >> wrote:
> >> > > >>
> >> > > >>> Hey Guozhang,
> >> > > >>>
> >> > > >>> According to the new proposal, with 3 replicas, min.isr=2 and
> >> > > >>> acks="quorum", it seems that acknowledged messages can still be
> >> > > truncated
> >> > > >>> in the network partition scenario you mentioned, right? So I
> guess
> >> > the
> >> > > goal
> >> > > >>> is for some user to achieve better tail latency at the cost of
> >> > > potential
> >> > > >>> message loss?
> >> > > >>>
> >> > > >>> If this is the case, then I think it may be better to adopt an
> >> > approach
> >> > > >>> where controller dynamically turn on/off this optimization. This
> >> > > provides
> >> > > >>> user with peace of mind (i.e. no message loss) while still
> >> reducing
> >> > > tail
> >> > > >>> latency. What do you think?
> >> > > >>>
> >> > > >>> Thanks,
> >> > > >>> Dong
> >> > > >>>
> >> > > >>>
> >> > > >>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <
> >> wangguoz@gmail.com>
> >> > > >>> wrote:
> >> > > >>>
> >> > > >>>> Hello Litao,
> >> > > >>>>
> >> > > >>>> Just double checking on the leader election details, do you
> have
> >> > time
> >> > > >>>> to complete the proposal on that part?
> >> > > >>>>
> >> > > >>>> Also Jun mentioned one caveat related to KIP-250 on the KIP-232
> >> > > >>>> discussion thread that Dong is working on, I figured it is
> worth
> >> > > pointing
> >> > > >>>> out here with a tentative solution:
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> ```
> >> > > >>>> Currently, if the producer uses acks=-1, a write will only
> >> succeed
> >> > if
> >> > > >>>> the write is received by all in-sync replicas (i.e.,
> committed).
> >> > This
> >> > > >>>> is true even when min.isr is set since we first wait for a
> >> message
> >> > to
> >> > > >>>> be committed and then check the min.isr requirement. KIP-250
> may
> >> > > >>>> change that, but we can discuss the implication there.
> >> > > >>>> ```
> >> > > >>>>
> >> > > >>>> The caveat is that, if we change the acking semantics in KIP-250 so
> >> > > >>>> that we only require {min.isr} replicas to acknowledge a produce
> >> > > >>>> request, then there is a problem with the above scenario: imagine
> >> > > >>>> you have {A, B, C} replicas of a partition with A as the leader,
> >> > > >>>> all in the isr list, and min.isr is 2.
> >> > > >>>>
> >> > > >>>> 1. Say there is a network partition and both A and B are fenced off.
> >> > > >>>> C is elected as the new leader and shrinks its isr list to only {C};
> >> > > >>>> A does not know that it has become a "ghost" and is no longer the
> >> > > >>>> leader, so all it does is shrink its isr list to {A, B}.
> >> > > >>>>
> >> > > >>>> 2. At this time, any new writes with ack=-1 to C will not be acked,
> >> > > >>>> since from C's pov there is only one replica. This is correct.
> >> > > >>>>
> >> > > >>>> 3. However, any writes that are sent to A can still be acked (NOTE
> >> > > >>>> this is totally possible, since producers only refresh metadata
> >> > > >>>> periodically, and if they happen to ask A or B they will get the
> >> > > >>>> stale metadata saying that A is still the leader): since A thinks
> >> > > >>>> that the isr list is {A, B}, as long as B has replicated the
> >> > > >>>> message, A can ack the produce request.
> >> > > >>>>
> >> > > >>>>     This is not correct behavior, since when the network heals, A
> >> > > >>>> would realize it is not the leader and will truncate its log. As a
> >> > > >>>> result the acked records are lost, violating Kafka's guarantees. And
> >> > > >>>> KIP-232 would not help prevent this scenario.
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> Although one can argue that, with 3 replicas and min.isr set to
> >> 2,
> >> > > >>>> Kafka is guaranteeing to tolerate only one failure, while the
> >> above
> >> > > >>>> scenario is actually two concurrent failures (both A and B are
> >> > > considered
> >> > > >>>> wedged), this is still a regression to the current version.
> >> > > >>>>
> >> > > >>>> So to resolve this issue, I'd propose we can change the
> >> semantics in
> >> > > >>>> the following way (this is only slightly different from your
> >> > > proposal):
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> 1. Add one more value to client-side acks config:
> >> > > >>>>
> >> > > >>>>    0: no acks needed at all.
> >> > > >>>>    1: ack from the leader.
> >> > > >>>>    all: ack from ALL the ISR replicas AND that current number
> of
> >> isr
> >> > > >>>> replicas has to be no smaller than {min.isr} (i.e. not changing
> >> this
> >> > > >>>> semantic).
> >> > > >>>>    quorum: this is the new value, it requires ack from enough
> >> number
> >> > > of
> >> > > >>>> ISR replicas no smaller than majority of the replicas AND no
> >> smaller
> >> > > than
> >> > > >>>> {min.isr}.
> >> > > >>>>
> >> > > >>>> 2. Clarify in the docs that if a user wants to tolerate X failures,
> >> > > >>>> she needs to set client acks=all or acks=quorum (better tail latency
> >> > > >>>> than "all") with broker {min.isr} set to X+1; however, "all" is not
> >> > > >>>> necessarily stronger than "quorum":
> >> > > >>>>
> >> > > >>>> For example, with 3 replicas and {min.isr} set to 2, here is a list
> >> > > >>>> of scenarios:
> >> > > >>>>
> >> > > >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> >> > > >>>> b. ISR list has 2: "all" and "quorum" both wait for both of them.
> >> > > >>>> c. ISR list has 1: neither "all" nor "quorum" would ack.
> >> > > >>>>
> >> > > >>>> If {min.isr} is set to 1, interestingly, here would be the list of
> >> > > >>>> scenarios:
> >> > > >>>>
> >> > > >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> >> > > >>>> b. ISR list has 2: "all" and "quorum" both wait for both of them.
> >> > > >>>> c. ISR list has 1: "all" waits for the leader to return, while
> >> > > >>>> "quorum" would not ack (because it requires that number >= {min.isr}
> >> > > >>>> AND >= {majority of num.replicas}, so it's actually stronger than
> >> > > >>>> "all").
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> WDYT?
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> Guozhang
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindong28@gmail.com
> >
> >> > > wrote:
> >> > > >>>>
> >> > > >>>>> Hey Litao,
> >> > > >>>>>
> >> > > >>>>> Not sure there will be an easy way to select the broker with
> >> > highest
> >> > > >>>>> LEO
> >> > > >>>>> without losing acknowledged message. In case it is useful,
> here
> >> is
> >> > > >>>>> another
> >> > > >>>>> idea. Maybe we can have a mechanism to turn switch between the
> >> > > min.isr
> >> > > >>>>> and
> >> > > >>>>> isr set for determining when to acknowledge a message.
> >> Controller
> >> > can
> >> > > >>>>> probably use RPC to request the current leader to use isr set
> >> > before
> >> > > it
> >> > > >>>>> sends LeaderAndIsrRequest for leadership change.
> >> > > >>>>>
> >> > > >>>>> Regards,
> >> > > >>>>> Dong
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
> >> > > >>>>> <li...@airbnb.com.invalid>
> >> > > >>>>> wrote:
> >> > > >>>>>
> >> > > >>>>> > Thanks Jun for the detailed feedback.
> >> > > >>>>> >
> >> > > >>>>> > Yes, for #1, I mean the live replicas from the ISR.
> >> > > >>>>> >
> >> > > >>>>> > Actually, I believe for all of the 4 new leader election
> >> > strategies
> >> > > >>>>> > (offline, reassign, preferred replica and controlled
> >> shutdown),
> >> > we
> >> > > >>>>> need to
> >> > > >>>>> > make corresponding changes. Will document the details in the
> >> KIP.
> >> > > >>>>> >
> >> > > >>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <ju...@confluent.io>
> >> > wrote:
> >> > > >>>>> >
> >> > > >>>>> > > Hi, Litao,
> >> > > >>>>> > >
> >> > > >>>>> > > Thanks for the KIP. Good proposal. A few comments below.
> >> > > >>>>> > >
> >> > > >>>>> > > 1. The KIP says "select the live replica with the largest
> >> LEO".
> >> > > I
> >> > > >>>>> guess
> >> > > >>>>> > > what you meant is selecting the live replicas in ISR with
> >> the
> >> > > >>>>> largest
> >> > > >>>>> > LEO?
> >> > > >>>>> > >
> >> > > >>>>> > > 2. I agree that we can probably just reuse the current
> >> min.isr
> >> > > >>>>> > > configuration, but with a slightly different semantics.
> >> > > Currently,
> >> > > >>>>> if
> >> > > >>>>> > > min.isr is set, a user expects the record to be in at
> least
> >> > > min.isr
> >> > > >>>>> > > replicas on successful ack. This KIP guarantees this too.
> >> Most
> >> > > >>>>> people are
> >> > > >>>>> > > probably surprised that currently the ack is only sent
> back
> >> > after
> >> > > >>>>> all
> >> > > >>>>> > > replicas in ISR receive the record. This KIP will change
> the
> >> > ack
> >> > > >>>>> to only
> >> > > >>>>> > > wait on min.isr replicas, which matches the user's
> >> expectation
> >> > > and
> >> > > >>>>> gives
> >> > > >>>>> > > better latency. Currently, we guarantee no data loss if
> >> there
> >> > are
> >> > > >>>>> fewer
> >> > > >>>>> > > than replication factor failures. The KIP changes that to
> >> fewer
> >> > > >>>>> than
> >> > > >>>>> > > min.isr failures. The latter probably matches the user
> >> > > expectation.
> >> > > >>>>> > >
> >> > > >>>>> > > 3. I agree that the new leader election process is a bit
> >> more
> >> > > >>>>> > complicated.
> >> > > >>>>> > > The controller now needs to contact all replicas in ISR to
> >> > > >>>>> determine who
> >> > > >>>>> > > has the longest log. However, this happens infrequently.
> So,
> >> > it's
> >> > > >>>>> > probably
> >> > > >>>>> > > worth doing for the better latency in #2.
> >> > > >>>>> > >
> >> > > >>>>> > > 4. We have to think through the preferred leader election
> >> > > process.
> >> > > >>>>> > > Currently, the first assigned replica is preferred for
> load
> >> > > >>>>> balancing.
> >> > > >>>>> > > There is a process to automatically move the leader to the
> >> > > >>>>> preferred
> >> > > >>>>> > > replica when it's in sync. The issue is that the preferred
> >> > > replica
> >> > > >>>>> may no
> >> > > >>>>> > > be the replica with the longest log. Naively switching to
> >> the
> >> > > >>>>> preferred
> >> > > >>>>> > > replica may cause data loss when there are actually fewer
> >> > > failures
> >> > > >>>>> than
> >> > > >>>>> > > configured min.isr. One way to address this issue is to do
> >> the
> >> > > >>>>> following
> >> > > >>>>> > > steps during preferred leader election: (a) controller
> >> sends an
> >> > > RPC
> >> > > >>>>> > request
> >> > > >>>>> > > to the current leader; (b) the current leader stops taking
> >> new
> >> > > >>>>> writes
> >> > > >>>>> > > (sending a new error code to the clients) and returns its
> >> LEO
> >> > > >>>>> (call it L)
> >> > > >>>>> > > to the controller; (c) the controller issues an RPC
> request
> >> to
> >> > > the
> >> > > >>>>> > > preferred replica and waits its LEO to reach L; (d) the
> >> > > controller
> >> > > >>>>> > changes
> >> > > >>>>> > > the leader to the preferred replica.
> >> > > >>>>> > >
> >> > > >>>>> > > Jun
> >> > > >>>>> > >
> >> > > >>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
> >> > > >>>>> > <litao.deng@airbnb.com.invalid
> >> > > >>>>> > > >
> >> > > >>>>> > > wrote:
> >> > > >>>>> > >
> >> > > >>>>> > > > Sorry folks, just realized I didn't use the correct
> thread
> >> > > >>>>> format for
> >> > > >>>>> > the
> >> > > >>>>> > > > discussion. I started this new one and copied all of the
> >> > > >>>>> responses from
> >> > > >>>>> > > the
> >> > > >>>>> > > > old one.
> >> > > >>>>> > > >
> >> > > >>>>> > > > @Dong
> >> > > >>>>> > > > It makes sense to just use the min.insync.replicas
> >> instead of
> >> > > >>>>> > > introducing a
> >> > > >>>>> > > > new config, and we must make this change together with
> the
> >> > > >>>>> LEO-based
> >> > > >>>>> > new
> >> > > >>>>> > > > leader election.
> >> > > >>>>> > > >
> >> > > >>>>> > > > @Xi
> >> > > >>>>> > > > I thought about embedding the LEO information to the
> >> > > >>>>> ControllerContext,
> >> > > >>>>> > > > didn't find a way. Using RPC will make the leader
> election
> >> > > period
> >> > > >>>>> > longer
> >> > > >>>>> > > > and this should happen in very rare cases (broker
> failure,
> >> > > >>>>> controlled
> >> > > >>>>> > > > shutdown, preferred leader election and partition
> >> > > reassignment).
> >> > > >>>>> > > >
> >> > > >>>>> > > > @Jeff
> >> > > >>>>> > > > The current leader election is to pick the first replica
> >> from
> >> > > AR
> >> > > >>>>> which
> >> > > >>>>> > > > exists both in the live brokers and ISR sets. I agree
> with
> >> > you
> >> > > >>>>> about
> >> > > >>>>> > > > changing the current/default behavior will cause many
> >> > > >>>>> confusions, and
> >> > > >>>>> > > > that's the reason the title is "Add Support ...". In
> this
> >> > case,
> >> > > >>>>> we
> >> > > >>>>> > > wouldn't
> >> > > >>>>> > > > break any current promises and provide a separate option
> >> for
> >> > > our
> >> > > >>>>> user.
> >> > > >>>>> > > > In terms of KIP-250, I feel it is more like the
> >> > > "Semisynchronous
> >> > > >>>>> > > > Replication" in the MySQL world, and yes it is something
> >> > > between
> >> > > >>>>> acks=1
> >> > > >>>>> > > and
> >> > > >>>>> > > > acks=insync.replicas. Additionally, I feel KIP-250 and
> >> > KIP-227
> >> > > >>>>> are
> >> > > >>>>> > > > two orthogonal improvements. KIP-227 is to improve the
> >> > > >>>>> replication
> >> > > >>>>> > > protocol
> >> > > >>>>> > > > (like the introduction of parallel replication in
> MySQL),
> >> and
> >> > > >>>>> KIP-250
> >> > > >>>>> > is
> >> > > >>>>> > > an
> >> > > >>>>> > > > enhancement for the replication architecture (sync,
> >> > semi-sync,
> >> > > >>>>> and
> >> > > >>>>> > > async).
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > Dong Lin
> >> > > >>>>> > > >
> >> > > >>>>> > > > > Thanks for the KIP. I have one quick comment before
> you
> >> > > >>>>> provide more
> >> > > >>>>> > > > detail
> >> > > >>>>> > > > > on how to select the leader with the largest LEO.
> >> > > >>>>> > > > > Do you think it would make sense to change the default
> >> > > >>>>> behavior of
> >> > > >>>>> > > > acks=-1,
> >> > > >>>>> > > > > such that broker will acknowledge the message once the
> >> > > message
> >> > > >>>>> has
> >> > > >>>>> > been
> >> > > >>>>> > > > > replicated to min.insync.replicas brokers? This would
> >> allow
> >> > > us
> >> > > >>>>> to
> >> > > >>>>> > keep
> >> > > >>>>> > > > the
> >> > > >>>>> > > > > same durability guarantee, improve produce request
> >> latency
> >> > > >>>>> without
> >> > > >>>>> > > > having a
> >> > > >>>>> > > > > new config.
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > Hu Xi
> >> > > >>>>> > > >
> >> > > >>>>> > > > > Currently,  with holding the assigned replicas(AR) for
> >> all
> >> > > >>>>> > partitions,
> >> > > >>>>> > > > > controller is now able to elect new leaders by
> selecting
> >> > the
> >> > > >>>>> first
> >> > > >>>>> > > > replica
> >> > > >>>>> > > > > of AR which occurs in both live replica set and ISR.
> If
> >> > > >>>>> switching to
> >> > > >>>>> > > the
> >> > > >>>>> > > > > LEO-based strategy, controller context might need to
> be
> >> > > >>>>> enriched or
> >> > > >>>>> > > > > augmented to store those values.  If retrieving those
> >> LEOs
> >> > > >>>>> real-time,
> >> > > >>>>> > > > > several rounds of RPCs are unavoidable which seems to
> >> > violate
> >> > > >>>>> the
> >> > > >>>>> > > > original
> >> > > >>>>> > > > > intention of this KIP.​
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > Jeff Widman
> >> > > >>>>> > > >
> >> > > >>>>> > > > > I agree with Dong, we should see if it's possible to
> >> change
> >> > > the
> >> > > >>>>> > default
> >> > > >>>>> > > > > behavior so that as soon as min.insync.replicas
> brokers
> >> > > >>>>> respond than
> >> > > >>>>> > > the
> >> > > >>>>> > > > > broker acknowledges the message back to the client
> >> without
> >> > > >>>>> waiting
> >> > > >>>>> > for
> >> > > >>>>> > > > > additional brokers who are in the in-sync replica list
> >> to
> >> > > >>>>> respond. (I
> >> > > >>>>> > > > > actually thought it already worked this way).
> >> > > >>>>> > > > > As you implied in the KIP though, changing this
> default
> >> > > >>>>> introduces a
> >> > > >>>>> > > > weird
> >> > > >>>>> > > > > state where an in-sync follower broker is not
> >> guaranteed to
> >> > > >>>>> have a
> >> > > >>>>> > > > > message...
> >> > > >>>>> > > > > So at a minimum, the leadership failover algorithm
> would
> >> > need
> >> > > >>>>> to be
> >> > > >>>>> > > sure
> >> > > >>>>> > > > to
> >> > > >>>>> > > > > pick the most up-to-date follower... I thought it
> >> already
> >> > did
> >> > > >>>>> this?
> >> > > >>>>> > > > > But if multiple brokers fail in quick succession,
> then a
> >> > > >>>>> broker that
> >> > > >>>>> > > was
> >> > > >>>>> > > > in
> >> > > >>>>> > > > > the ISR could become a leader without ever receiving
> the
> >> > > >>>>> message...
> >> > > >>>>> > > > > violating the current promises of
> >> unclean.leader.election.
> >> > > >>>>> > > > enable=False...
> >> > > >>>>> > > > > so changing the default might be not be a tenable
> >> solution.
> >> > > >>>>> > > > > What also jumped out at me in the KIP was the goal of
> >> > > reducing
> >> > > >>>>> p999
> >> > > >>>>> > > when
> >> > > >>>>> > > > > setting replica lag time at 10 seconds(!!)... I
> >> understand
> >> > > the
> >> > > >>>>> desire
> >> > > >>>>> > > to
> >> > > >>>>> > > > > minimize frequent ISR shrink/expansion, as I face this
> >> same
> >> > > >>>>> issue at
> >> > > >>>>> > my
> >> > > >>>>> > > > day
> >> > > >>>>> > > > > job. But what you're essentially trying to do here is
> >> > create
> >> > > an
> >> > > >>>>> > > > additional
> >> > > >>>>> > > > > replication state that is in-between acks=1 and acks =
> >> ISR
> >> > to
> >> > > >>>>> paper
> >> > > >>>>> > > over
> >> > > >>>>> > > > a
> >> > > >>>>> > > > > root problem of ISR shrink/expansion...
> >> > > >>>>> > > > > I'm just wary of shipping more features (and more
> >> > operational
> >> > > >>>>> > > confusion)
> >> > > >>>>> > > > if
> >> > > >>>>> > > > > it's only addressing the symptom rather than the root
> >> > cause.
> >> > > >>>>> For
> >> > > >>>>> > > example,
> >> > > >>>>> > > > > my day job's problem is we run a very high number of
> >> > > >>>>> low-traffic
> >> > > >>>>> > > > > partitions-per-broker, so the fetch requests hit many
> >> > > >>>>> partitions
> >> > > >>>>> > before
> >> > > >>>>> > > > > they fill. Solving that requires changing our
> >> architecture
> >> > +
> >> > > >>>>> making
> >> > > >>>>> > the
> >> > > >>>>> > > > > replication protocol more efficient (KIP-227).
> >> > > >>>>> > > >
> >> > > >>>>> > > >
> >> > > >>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <
> >> > > >>>>> litao.deng@airbnb.com>
> >> > > >>>>> > > > wrote:
> >> > > >>>>> > > >
> >> > > >>>>> > > > > Hey folks. I would like to add a feature to support
> the
> >> > > >>>>> quorum-based
> >> > > >>>>> > > > > acknowledgment for the producer request. We have been
> >> > > running a
> >> > > >>>>> > > modified
> >> > > >>>>> > > > > version of Kafka on our testing cluster for weeks, the
> >> > > >>>>> improvement of
> >> > > >>>>> > > > P999
> >> > > >>>>> > > > > is significant with very stable latency.
> Additionally, I
> >> > > have a
> >> > > >>>>> > > proposal
> >> > > >>>>> > > > to
> >> > > >>>>> > > > > achieve a similar data durability as with the
> >> > > >>>>> insync.replicas-based
> >> > > >>>>> > > > > acknowledgment through LEO-based leader election.
> >> > > >>>>> > > > >
> >> > > >>>>> > > > > https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-
> >> > > >>>>> > > > > 250+Add+Support+for+Quorum-based+Producer+Acknowledge
> >> > > >>>>> > > > >
> >> > > >>>>> > > >
> >> > > >>>>> > >
> >> > > >>>>> >
> >> > > >>>>>
> >> > > >>>>
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> --
> >> > > >>>> -- Guozhang
> >> > > >>>>
> >> > > >>>
> >> > > >>>
> >> > > >>
> >> > > >>
> >> > > >> --
> >> > > >> -- Guozhang
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang