You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Unmesh Joshi <un...@gmail.com> on 2020/09/01 14:11:25 UTC

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

>>Yes, I agree that the lease timeout on the controller side should be
reset.The alternative would be to track the lease as hard state rather than
soft state, but I think that is not really >> >> needed, and would result
in more log entries.
On the related note, I think it will be good to add some details on how
leases are maintained in case of active controller (raft leader) failure.
e.g. Attached a few diagrams considering leases are maintained with log
entries.

On Mon, Aug 31, 2020 at 6:28 PM Unmesh Joshi <un...@gmail.com> wrote:

> >>The reason for including LeaseStartTimeMs in the request is to ensure
> that the time required to communicate with the controller gets included in
> >>the lease time.  Since requests can potentially be delayed in the network
> for a long time, this is important.
> The network time will be added anyway, because the lease timer on the
> active controller will start only after the heartbeat request reaches the
> server. And I think, some assumption about network round trip time is
> needed anyway to decide on the frequency of the heartbeat (
> registration.heartbeat.interval.ms), and lease timeout (
> registration.lease.timeout.ms). So I think just having a leaseTTL in the
> request is easier to understand and implement.
> >>>Yes, I agree that the lease timeout on the controller side should be
> reset in the case of controller failover.  The alternative would be to
> track the >>>lease as hard state rather than soft state, but I think that
> is not really needed, and would result in more log entries.
> My interpretation of the mention of BrokerRecord in the KIP was that this
> record exists in the Raft log. By soft state, do you mean the broker
> records exist only on the active leader and will not be replicated in the
> raft log? If the live brokers list is maintained only on the active
> controller (raft leader), then, in case of leader failure, there will be a
> window where the new leader does not know about the live brokers, till the
> brokers establish the leases again.
> I think it will be safer to have leases as a hard state managed by
> standard Raft replication.
> Or am I misunderstanding something? (I assume that with soft state, you
> mean something like zookeeper local sessions
> https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)
>
> >>Our code is single threaded as well.  I think it makes sense for the
> controller, since otherwise locking becomes very messy.  I'm not sure I
> >>understand your question about duplicate broker ID detection, though.
> There's a section in the KIP about this -- is there a detail we should add
> ?>>there?
> I assumed broker leases are implemented as a hard state. In that case, to
> check for broker id conflict, we need to check the broker ids at two places
> 1. Pending broker registrations (which are yet to be committed) 2. Already
> committed broker registrations.
>
> Thanks,
> Unmesh
>
>
>
> On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe <cm...@apache.org> wrote:
>
>> On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
>> > >>>Can you repeat your questions about broker leases?
>> >
>> > >>>>The LeaseStartTimeMs is expected to be the broker's
>> > 'System.currentTimeMillis()' at the point of the request. The active
>> > controller will add its lease period to this in order >>>>to compute the
>> > LeaseEndTimeMs.
>> >
>> > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a
>> > bit
>> > confusing.  Monotonic Clock (System.nanoTime) on the active controller
>> > should be used to track leases.
>> > (For example,
>> >
>> https://issues.apache.org/jira/browse/ZOOKEEPER-1616https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
>> > )
>> >
>> > Then we will not need LeaseStartTimeMs?
>> > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
>> controller
>> > can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
>> > In this case we might just drop LeaseEndTimeMs from the response, as the
>> > broker already knows about the TTL and can send heartbeats at some
>> fraction
>> > of TTL, say every TTL/4 milliseconds.(elapsed time on the broker
>> measured
>> > by System.nanoTime)
>> >
>>
>> Hi Unmesh,
>>
>> I agree that the monotonic clock is probably a better idea here.  It is
>> good to be robust against wall clock changes, although I think a cluster
>> which had them might suffer other issues.  I will change it to specify a
>> monotonic clock.
>>
>> The reason for including LeaseStartTimeMs in the request is to ensure
>> that the time required to communicate with the controller gets included in
>> the lease time.  Since requests can potentially be delayed in the network
>> for a long time, this is important.
>>
>> >
>> > I have a prototype built to demonstrate this as following:
>> >
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
>> >
>> > The Kip631Controller itself depends on a Consensus module, to
>> demonstrate
>> > how possible interactions with the consensus module will look like
>> >  (The Consensus can be pluggable really, with an API to allow reading
>> > replicated log upto HighWaterMark)
>> >
>> > It has an implementation of LeaseTracker
>> >
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
>> > to demonstrate LeaseTracker's interaction with the consensus module.
>> >
>> > The implementation has the following aspects:
>> > 1. The lease tracking happens only on the active controller (raft
>> > leader)
>> > 2. Once the lease expires, it needs to propose and commit a FenceBroker
>> > record for that lease.
>> > 3. In case of active controller failure, the lease will be tracked by
>> > the
>> > newly raft leader. The new raft leader starts the lease timer again, (as
>> > implemented in onBecomingLeader method of
>> >
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
>> > )
>> > in effect extending the lease by the time spent in the leader election
>> > and
>> > whatever time was elapsed on the old leader.
>>
>> Yes, I agree that the lease timeout on the controller side should be
>> reset in the case of controller failover.  The alternative would be to
>> track the lease as hard state rather than soft state, but I think that is
>> not really needed, and would result in more log entries.
>>
>> >
>> > There are working tests for this implementation here.
>> >
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
>> > and an end to end test here
>> >
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
>> > <
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
>> >
>> > >>'m not sure what you mean by "de-duplication of the broker."  Can you
>> > give a little more context?
>> > Apologies for using the confusing term deduplication. I meant broker id
>> > conflict.
>> > As you can see in the prototype handleRequest of KIP631Controller
>> > <
>> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
>> >,
>> > the duplicate broker id needs to be detected before the BrokerRecord is
>> > submitted to the raft module.
>> > Also as implemented in the prototype, the KIP631Controller is single
>> > threaded, handling requests one at a time. (an example of
>> >
>> https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
>> > )
>>
>> Our code is single threaded as well.  I think it makes sense for the
>> controller, since otherwise locking becomes very messy.  I'm not sure I
>> understand your question about duplicate broker ID detection, though.
>> There's a section in the KIP about this -- is there a detail we should add
>> there?
>>
>> best,
>> Colin
>>
>>
>> >
>> > Thanks,
>> > Unmesh
>> >
>> > On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz>
>> wrote:
>> >
>> > > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
>> > > > Hi Colin,
>> > > >
>> > > > There were a few of questions I had..
>> > >
>> > > Hi Unmesh,
>> > >
>> > > Thanks for the response.
>> > >
>> > > >
>> > > > 1. Were my comments on the broker lease implementation (and
>> corresponding
>> > > > prototype) appropriate and do we need to change the KIP
>> > > > description accordingly?.
>> > > >
>> > >
>> > > Can you repeat your questions about broker leases?
>> > >
>> > > >
>> > > > 2. How will broker epochs be generated? I am assuming it can be the
>> > > > committed log offset (like zxid?)
>> > > >
>> > >
>> > > There isn't any need to use a log offset.  We can just look at an
>> > > in-memory hash table and see what the latest number is, and add one,
>> to
>> > > generate a new broker epoch.
>> > >
>> > > >
>> > > > 3. How will producer registration happen? I am assuming it should be
>> > > > similar to broker registration, with a similar way to generate
>> producer
>> > > id.
>> > > >
>> > >
>> > > For the EOS stuff, we will need a few new RPCs to the controller.  I
>> think
>> > > we should do that in a follow-on KIP, though, since this one is
>> already
>> > > pretty big.
>> > >
>> > > >
>> > > > 4. Because we expose Raft log to all the brokers, any
>> de-duplication of
>> > > the
>> > > > broker needs to happen before the requests are proposed to Raft.
>> For this
>> > > > the controller needs to be single threaded, and should do validation
>> > > > against the in-process or pending requests and the final state. I
>> read a
>> > > > mention of this, in the responses in this thread.Will it be useful
>> to
>> > > > mention this in the KIP?
>> > > >
>> > >
>> > > I'm not sure what you mean by "de-duplication of the broker."  Can you
>> > > give a little more context?
>> > >
>> > > best,
>> > > Colin
>> > >
>> > > >
>> > > > Thanks,
>> > > > Unmesh
>> > > >
>> > > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cm...@apache.org>
>> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > I'm thinking of calling a vote on KIP-631 on Monday.  Let me know
>> if
>> > > > > there's any more comments I should address before I start the
>> vote.
>> > > > >
>> > > > > cheers,
>> > > > > Colin
>> > > > >
>> > > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
>> > > > > > >>Hi Unmesh,
>> > > > > > >>Thanks, I'll take a look.
>> > > > > > Thanks. I will be adding more to the prototype and will be
>> happy to
>> > > help
>> > > > > > and collaborate.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Unmesh
>> > > > > >
>> > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <
>> cmccabe@apache.org>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Jose,
>> > > > > > >
>> > > > > > > That'a s good point that I hadn't considered.  It's probably
>> worth
>> > > > > having
>> > > > > > > a separate leader change message, as you mentioned.
>> > > > > > >
>> > > > > > > Hi Unmesh,
>> > > > > > >
>> > > > > > > Thanks, I'll take a look.
>> > > > > > >
>> > > > > > > best,
>> > > > > > > Colin
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
>> > > > > > > > Hi Unmesh,
>> > > > > > > >
>> > > > > > > > Very cool prototype!
>> > > > > > > >
>> > > > > > > > Hi Colin,
>> > > > > > > >
>> > > > > > > > The KIP proposes a record called IsrChange which includes
>> the
>> > > > > > > > partition, topic, isr, leader and leader epoch. During
>> normal
>> > > > > > > > operation ISR changes do not result in leader changes.
>> Similarly,
>> > > > > > > > leader changes do not necessarily involve ISR changes. The
>> > > controller
>> > > > > > > > implementation that uses ZK modeled them together because
>> > > > > > > > 1. All of this information is stored in one znode.
>> > > > > > > > 2. ZK's optimistic lock requires that you specify the new
>> value
>> > > > > > > completely
>> > > > > > > > 3. The change to that znode was being performed by both the
>> > > > > controller
>> > > > > > > > and the leader.
>> > > > > > > >
>> > > > > > > > None of these reasons are true in KIP-500. Have we
>> considered
>> > > having
>> > > > > > > > two different records? For example
>> > > > > > > >
>> > > > > > > > 1. IsrChange record which includes topic, partition, isr
>> > > > > > > > 2. LeaderChange record which includes topic, partition,
>> leader
>> > > and
>> > > > > > > leader epoch.
>> > > > > > > >
>> > > > > > > > I suspect that making this change will also require
>> changing the
>> > > > > > > > message AlterIsrRequest introduced in KIP-497: Add
>> inter-broker
>> > > API
>> > > > > to
>> > > > > > > > alter ISR.
>> > > > > > > >
>> > > > > > > > Thanks
>> > > > > > > > -Jose
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>