Posted to dev@kafka.apache.org by Patrick Huang <hz...@hotmail.com> on 2018/10/10 06:46:02 UTC

[DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi All,

Please find the below KIP which proposes the concept of broker generation to resolve issues caused by controller missing broker state changes and broker processing outdated control requests.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+generation

All comments are appreciated.

Best,
Zhanxiang (Patrick) Huang

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Hi Becket, Dong,

Thanks for the discussion. Those are very good points. I think it makes sense to send back a STALE_BROKER_EPOCH error to the broker in both cases:

  1.  Broker gets quickly restarted. In this case, the channel has already been closed during broker shutdown so the broker will not react to the error.
  2.  Broker gets disconnected from zk and reconnects. In this case, the broker will see the error and will resend the ControlledShutdownRequest with a newer broker epoch.

I have also updated the KIP accordingly to include what we have discussed. Thanks again!
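To make the two cases above concrete, here is a minimal, self-contained sketch in Scala of how the controller side could check the broker epoch carried in a ControlledShutdownRequest against its cached broker generation and answer with STALE_BROKER_EPOCH when the epoch is older. All names are hypothetical stand-ins, not the real Kafka controller classes.

    object ControlledShutdownSketch {
      // Hypothetical stand-ins for the real request/response types.
      case class ControlledShutdownRequest(brokerId: Int, brokerEpoch: Long)
      case class ControlledShutdownResponse(error: String)

      // The controller caches the broker generation (czxid of /brokers/ids/<id>)
      // it last read from ZooKeeper for each live broker.
      private val cachedBrokerEpochs = scala.collection.mutable.Map[Int, Long]()

      def handleControlledShutdown(req: ControlledShutdownRequest): ControlledShutdownResponse = {
        cachedBrokerEpochs.get(req.brokerId) match {
          // The epoch in the request is older than the broker's current generation:
          // it was sent by a previous incarnation of the broker, so answer with
          // STALE_BROKER_EPOCH instead of moving leadership away.
          case Some(current) if req.brokerEpoch < current =>
            ControlledShutdownResponse("STALE_BROKER_EPOCH")
          case _ =>
            // ... perform the normal controlled-shutdown leader movement here ...
            ControlledShutdownResponse("NONE")
        }
      }

      def main(args: Array[String]): Unit = {
        cachedBrokerEpochs(1) = 200L                                          // generation after the bounce
        println(handleControlledShutdown(ControlledShutdownRequest(1, 100L))) // stale epoch
        println(handleControlledShutdown(ControlledShutdownRequest(1, 200L))) // current epoch
      }
    }

In case 1 the response goes nowhere because the old connection is closed; in case 2 the broker sees the error and retries with its new epoch.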

Best,
Zhanxiang (Patrick) Huang

________________________________
From: Becket Qin <be...@gmail.com>
Sent: Monday, November 12, 2018 21:56
To: Dong Lin
Cc: dev
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi Dong,

That is a good point. But I think the STALE_BROKER_EPOCH error may still be
sent to the broker. For example, think about the following case:

1. Broker sends a ControlledShutdownRequest to the controller
2. Broker has a ZK session timeout
3. Broker re-creates its ephemeral node
4. Controller processes the ControlledShutdownRequest from step 1
5. Broker receives a ControlledShutdownResponse with STALE_BROKER_EPOCH.

However, in this case, the broker should probably resend the controlled
shutdown request with the new epoch. So it looks like returning
STALE_BROKER_EPOCH is the correct behavior. If the broker has really been
bounced, that response will not be delivered to the broker. If the broker
has not really restarted, it will just resend the ControlledShutdownRequest
with the current epoch again.
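A rough sketch of that broker-side behavior, again with hypothetical names and stubbed-out RPCs rather than the actual KafkaServer code: on STALE_BROKER_EPOCH the broker simply re-reads its current generation and resends the request, while a bounced broker never sees the response because its old connection is gone.

    object ControlledShutdownRetrySketch {
      case class ControlledShutdownResponse(error: String)

      // Stub: pretend the controller rejects the first attempt because it was sent
      // with the epoch from before the ZK session was re-established.
      private var firstAttempt = true
      def sendControlledShutdown(brokerEpoch: Long): ControlledShutdownResponse =
        if (firstAttempt) { firstAttempt = false; ControlledShutdownResponse("STALE_BROKER_EPOCH") }
        else ControlledShutdownResponse("NONE")

      // Stub for re-reading the broker's current generation (czxid of its
      // /brokers/ids/<id> registration) before each attempt.
      def currentBrokerEpoch(): Long = 200L

      def controlledShutdownWithRetry(maxRetries: Int): Boolean = {
        var attempt = 0
        while (attempt < maxRetries) {
          sendControlledShutdown(currentBrokerEpoch()).error match {
            case "NONE"               => return true
            case "STALE_BROKER_EPOCH" => attempt += 1 // re-read the epoch and resend
            case _                    => attempt += 1
          }
        }
        false
      }

      def main(args: Array[String]): Unit =
        println(s"controlled shutdown succeeded: ${controlledShutdownWithRetry(3)}")
    }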

It might be worth updating the KIP wiki to mention this behavior.

Thanks,

Jiangjie (Becket) Qin


On Tue, Nov 13, 2018 at 2:04 AM Dong Lin <li...@gmail.com> wrote:

> Hey Becket, Patrick,
>
> Currently we expect that the controller can only receive a ControlledShutdownRequest
> with an outdated broker epoch in two cases: 1) the ControlledShutdownRequest
> was sent by the broker before it restarted; and 2) there is a bug.
>
> In case 1), it seems that the ControlledShutdownResponse will not be delivered
> to the broker as the channel should have been disconnected. Thus there is
> no confusion for the broker because it will not receive a response
> with the STALE_BROKER_EPOCH error.
>
> In case 2), it seems that it can be useful to still deliver a ControlledShutdownResponse
> with STALE_BROKER_EPOCH so that this broker at least knows which
> request was not accepted. This helps us debug the issue.
>
> Also, in terms of both design and implementation, it seems simpler to
> still define a response for a given invalid request rather than have a
> special path that skips the response for the invalid request. Does this sound
> reasonable?
>
> Thanks,
> Dong
>
>
> On Mon, Nov 12, 2018 at 9:52 AM Patrick Huang <hz...@hotmail.com> wrote:
>
>> Hi Becket,
>>
>> I think you are right. STALE_BROKER_EPOCH only makes sense when the
>> broker detects outdated control requests and wants the controller to know
>> about that. For ControlledShutdownRequest, the controller should just
>> ignore a request with a stale broker epoch since the broker does not need,
>> and will not do anything with, a STALE_BROKER_EPOCH response. Thanks for
>> pointing it out.
>>
>> Thanks,
>> Zhanxiang (Patrick) Huang
>>
>> ________________________________
>> From: Becket Qin <be...@gmail.com>
>> Sent: Monday, November 12, 2018 6:46
>> To: dev
>> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> bounced brokers using broker generation
>>
>> Hi Patrick,
>>
>> I am wondering why the controller should send a STALE_BROKER_EPOCH error to
>> the broker if the broker epoch is stale? Would this be a little confusing
>> to the current broker if the request was sent by a broker with a previous
>> epoch? Should the controller just ignore those requests in that case?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Nov 9, 2018 at 2:17 AM Patrick Huang <hz...@hotmail.com> wrote:
>>
>> > Hi,
>> >
>> > In this KIP, we are also going to add a new exception and a new error code
>> > "STALE_BROKER_EPOCH" in order to allow the broker to respond with the right
>> > error when it sees an outdated broker epoch in the control requests. Since
>> > adding a new exception and error code is also considered a public
>> > interface change, I have updated the original KIP accordingly to include
>> > this change. Feel free to comment if there is any concern.
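Purely for illustration (the real definitions live in Kafka's public Errors enum and the exact numeric error code is assigned by the project, not shown in this thread), the proposed public-interface addition could be sketched roughly like this:

    // Illustrative sketch only; the actual exception and error-code wiring live in
    // org.apache.kafka.common and are not reproduced here.
    class StaleBrokerEpochException(message: String) extends RuntimeException(message)

    object StaleBrokerEpochSketch {
      // Stand-in for the STALE_BROKER_EPOCH error name; the numeric id is assigned
      // in the Errors enum.
      val StaleBrokerEpochErrorName = "STALE_BROKER_EPOCH"

      def main(args: Array[String]): Unit = {
        try throw new StaleBrokerEpochException("broker epoch 100 is older than 200")
        catch {
          case e: StaleBrokerEpochException =>
            println(s"$StaleBrokerEpochErrorName: ${e.getMessage}")
        }
      }
    }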
>> >
>> > Thanks,
>> > Zhanxiang (Patrick) Huang
>> >
>> > ________________________________
>> > From: Patrick Huang <hz...@hotmail.com>
>> > Sent: Tuesday, October 23, 2018 6:20
>> > To: Jun Rao; dev@kafka.apache.org
>> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> > bounced brokers using broker generation
>> >
>> > Agreed. I have updated the PR to add czxid in ControlledShutdownRequest
>> > (https://github.com/apache/kafka/pull/5821). Appreciated if you can take a
>> > look.
>> >
>> > Btw, I also have the vote thread for this KIP:
>> > https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E
>> >
>> > Best,
>> > Zhanxiang (Patrick) Huang
>> >
>> > ________________________________
>> > From: Jun Rao <ju...@confluent.io>
>> > Sent: Monday, October 22, 2018 21:31
>> > To: dev@kafka.apache.org
>> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> > bounced brokers using broker generation
>> >
>> > Hi, Patrick,
>> >
>> > Yes, that's the general sequence. After step 2, the shutting down broker
>> > can give up the controlled shutdown process and proceed to shut down. When
>> > it's restarted, it could still receive StopReplica requests from the
>> > controller in reaction to the previous controlled shutdown requests. This
>> > could lead the restarted broker to a bad state.
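This is exactly the guard the KIP adds on the broker side. As a rough, self-contained sketch (hypothetical types, not the actual KafkaApis code): the broker remembers the generation it registered with and drops any control request that carries an older broker epoch, so a StopReplica left over from the previous incarnation is ignored.

    object ControlRequestGuardSketch {
      // Hypothetical stand-in for a control request (LeaderAndIsr, StopReplica,
      // UpdateMetadata) that now carries the target broker's epoch.
      case class ControlRequest(name: String, brokerEpoch: Long)

      // The generation (czxid of the broker's /brokers/ids/<id> znode) recorded
      // when this incarnation of the broker registered in ZooKeeper.
      val currentBrokerEpoch: Long = 200L

      def handle(req: ControlRequest): String =
        if (req.brokerEpoch < currentBrokerEpoch)
          // Sent to a previous incarnation of this broker; reject it.
          s"${req.name}: rejected with STALE_BROKER_EPOCH"
        else
          s"${req.name}: processed"

      def main(args: Array[String]): Unit = {
        println(handle(ControlRequest("StopReplica", 100L)))  // from before the bounce
        println(handle(ControlRequest("LeaderAndIsr", 200L))) // current generation
      }
    }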
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> > On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com> wrote:
>> >
>> > > Hi Jun,
>> > >
>> > > That is a good point. I want to make it clear about the scenario you
>> > > mentioned. Correct me if I am wrong. Here is the sequence of events:
>> > >
>> > >    1. Broker sends ControlledShutdown request 1 to controller
>> > >    2. Broker sends ControlledShutdown request 2 to controller due to
>> > >    retries
>> > >    3. Controller processes ControlledShutdown request 1
>> > >    4. Controller sends control requests to the broker
>> > >    5. Broker receives ControlledShutdown response 1 from controller
>> > >    6. Broker shuts down and restarts quickly
>> > >    7. Controller processes ControlledShutdown request 2
>> > >    8. Controller sends control requests to the broker
>> > >
>> > > It is possible that the controller processes the broker change event
>> > > between 6) and 7). In this case, the controller has already updated the
>> > > cached czxid to the up-to-date one, so the bounced broker will not reject
>> > > the control requests in 8), which causes a correctness problem.
>> > >
>> > >
>> > > Best,
>> > > Zhanxiang (Patrick) Huang
>> > >
>> > > ------------------------------
>> > > *From:* Jun Rao <ju...@confluent.io>
>> > > *Sent:* Monday, October 22, 2018 14:45
>> > > *To:* dev
>> > > *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> > > bounced brokers using broker generation
>> > >
>> > > Hi, Patrick,
>> > >
>> > > There is another thing that may be worth considering.
>> > >
>> > > 10. It will be useful to include the czxid also in the ControlledShutdown
>> > > request. This way, if the broker has been restarted, the controller can
>> > > ignore an old ControlledShutdown request (e.g., due to retries). This will
>> > > prevent the restarted broker from incorrectly stopping replicas.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hzxa21.huang@gmail.com> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > Thanks a lot for the comments.
>> > > >
>> > > > 1. czxid is globally unique and monotonically increasing based on the
>> > > > zookeeper doc.
>> > > > References (from
>> > > > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
>> > > > "Every change to the ZooKeeper state receives a stamp in the form of a
>> > > > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
>> > > > changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
>> > > > smaller than zxid2 then zxid1 happened before zxid2."
>> > > > "czxid: The zxid of the change that caused this znode to be created."
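For reference, a small sketch of where the broker generation comes from, using the plain ZooKeeper client API (the connection string and broker id are placeholders; the real code goes through Kafka's ZK client rather than a raw ZooKeeper handle): the czxid is read from the Stat of the broker's registration znode.

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

    object BrokerEpochFromCzxidSketch {
      def main(args: Array[String]): Unit = {
        // Connection parameters are placeholders.
        val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
          override def process(event: WatchedEvent): Unit = ()
        })
        try {
          val path = "/brokers/ids/1"       // hypothetical broker id
          val stat = zk.exists(path, false) // null if the broker is not registered
          if (stat != null)
            // czxid of the ephemeral registration znode == the broker's generation.
            println(s"broker generation (czxid) for $path: ${stat.getCzxid}")
          else
            println(s"$path is not registered")
        } finally zk.close()
      }
    }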
>> > > >
>> > > > 2. You are right. There will be only one broker change event fired in the
>> > > > case I mentioned because we will not register the watcher before the read.
>> > > >
>> > > > 3. Let's say we want to initialize the states of broker set A and we want
>> > > > the cluster to be aware of the absence of broker set B. The currently live
>> > > > broker set in the cluster is C.
>> > > >
>> > > >     From the design point of view, here are the rules for broker state
>> > > > transition:
>> > > >     - Pass in broker ids of A for onBrokerStartup() and pass in broker ids
>> > > > of B for onBrokerFailure().
>> > > >     - When processing onBrokerStartup(), we use the broker generation the
>> > > > controller read from zk to send requests to broker set A and use the
>> > > > previously cached broker generation to send requests to (C-A).
>> > > >     - When processing onBrokerFailure(), we use the previously cached
>> > > > broker generation to send requests to C.
>> > > >
>> > > >     From the implementation point of view, here are the steps we need to
>> > > > follow when processing BrokerChangeEvent (see the sketch after this list):
>> > > >     -  Read all child nodes in /brokers/ids/ to get current brokers with
>> > > > broker generation
>> > > >     -  Detect new brokers, dead brokers and bounced brokers
>> > > >     -  Update the live broker ids in controller context
>> > > >     -  Update broker generations for new brokers in controller context
>> > > >     -  Invoke onBrokerStartup(new brokers)
>> > > >     -  Invoke onBrokerFailure(bounced brokers)
>> > > >     -  Update broker generations for bounced brokers in controller context
>> > > >     -  Invoke onBrokerStartup(bounced brokers)
>> > > >     -  Invoke onBrokerFailure(dead brokers)
>> > > >     We can further optimize the flow by avoiding sending requests to a
>> > > > broker if its broker generation is larger than the one in the controller
>> > > > context.
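A condensed, self-contained sketch of the steps above (hypothetical data structures; not the actual ControllerContext/KafkaController code), showing how cached czxids are diffed against ZooKeeper to classify new, dead and bounced brokers, and the order in which the callbacks fire:

    object BrokerChangeEventSketch {
      type BrokerId = Int
      type Epoch = Long // broker generation == czxid of /brokers/ids/<id>

      def processBrokerChange(cached: Map[BrokerId, Epoch],
                              fromZk: Map[BrokerId, Epoch]): Map[BrokerId, Epoch] = {
        val newBrokers  = fromZk.keySet -- cached.keySet
        val deadBrokers = cached.keySet -- fromZk.keySet
        // Present in both, but with a larger czxid in ZK: the broker was bounced
        // between two reads even though its id never left the cache.
        val bouncedBrokers = (cached.keySet & fromZk.keySet).filter(id => fromZk(id) > cached(id))

        def onBrokerStartup(ids: Set[BrokerId]): Unit = println(s"onBrokerStartup($ids)")
        def onBrokerFailure(ids: Set[BrokerId]): Unit = println(s"onBrokerFailure($ids)")

        onBrokerStartup(newBrokers)     // uses the generations just read from ZK
        onBrokerFailure(bouncedBrokers) // uses the previously cached generations
        onBrokerStartup(bouncedBrokers) // then switches to the new generations
        onBrokerFailure(deadBrokers)

        fromZk // new cache of live brokers and their generations
      }

      def main(args: Array[String]): Unit = {
        val cached = Map(1 -> 100L, 2 -> 110L, 3 -> 120L)
        val fromZk = Map(1 -> 100L, 2 -> 210L, 4 -> 300L) // 2 bounced, 3 died, 4 is new
        processBrokerChange(cached, fromZk)
      }
    }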
>> > > >
>> > > > I will also update the KIP to clarify how it works for BrokerChangeEvent
>> > > > processing in more detail.
>> > > >
>> > > > Thanks,
>> > > > Patrick
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
>> > > >
>> > > > > Hi, Patrick,
>> > > > >
>> > > > > Thanks for the KIP. Looks good to me overall and very useful. A few
>> > > > > comments below.
>> > > > >
>> > > > > 1. "will reject the requests with smaller broker generation than
>> its
>> > > > > current generation." Is czxid monotonically increasing?
>> > > > >
>> > > > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
>> > > > > watchers are one-time watchers. Once a watcher is fired, one needs to
>> > > > > register it again before the watcher can be triggered. So, when the
>> > > > > controller is busy and a broker goes down and comes up, the first event
>> > > > > will trigger the ZK watcher. Since the controller is busy and hasn't
>> > > > > registered the watcher again, the second event actually won't fire. By the
>> > > > > time the controller reads from ZK, it sees that the broker is still
>> > > > > registered and thus thinks that nothing has happened to the broker, which
>> > > > > is causing the problem.
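For readers less familiar with the one-shot watch semantics described above, a small sketch using the plain ZooKeeper client API (connection string and path are placeholders): the watcher must re-register itself on every callback by calling getChildren again, and any change that happens before re-registration is only observed through that read, never as a second event.

    import org.apache.zookeeper.Watcher.Event.EventType
    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

    object BrokerWatcherSketch {
      def main(args: Array[String]): Unit = {
        // Connection parameters are placeholders.
        val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
          override def process(event: WatchedEvent): Unit = ()
        })

        // Watches are one-shot: the callback must re-register by calling
        // getChildren again, otherwise a later change (e.g. the bounced broker
        // re-registering) fires no event at all.
        lazy val brokerWatcher: Watcher = new Watcher {
          override def process(event: WatchedEvent): Unit =
            if (event.getType == EventType.NodeChildrenChanged) {
              val brokers = zk.getChildren("/brokers/ids", brokerWatcher) // re-register
              println(s"live brokers after change: $brokers")
            }
        }

        // Initial registration; the returned list is the current set of brokers.
        println(s"live brokers: ${zk.getChildren("/brokers/ids", brokerWatcher)}")
        Thread.sleep(60000) // keep the process alive to observe events
        zk.close()
      }
    }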
>> > > > >
>> > > > > 3. "Handle broker state change: invoke onBrokerFailure(...) first,
>> > then
>> > > > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
>> > > here.
>> > > > > Could you clarify the broker list and the broker epoch used when
>> > making
>> > > > > these calls? We want to prevent the restarted broker from
>> receiving a
>> > > > > partial replica list on the first LeaderAndIsr request because of
>> > this.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hzxa21@hotmail.com> wrote:
>> > > > >
>> > > > > > Hey Stanislav,
>> > > > > >
>> > > > > > Sure. Thanks for your interest in this KIP. I am glad to provide more
>> > > > > > detail.
>> > > > > >
>> > > > > > broker A is initiating a controlled shutdown (restart). The Controller
>> > > > > > sends a StopReplicaRequest but it reaches broker A after it has started
>> > > > > > up again. It therefore stops replicating those partitions even though it
>> > > > > > should just be starting to replicate them.
>> > > > > > This is right.
>> > > > > >
>> > > > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
>> > > > > > restart. Broker A restarts and receives the LeaderAndIsrRequest then. It
>> > > > > > therefore starts leading for the partitions sent by that request and
>> > > > > > might stop leading partitions that it was leading previously.
>> > > > > > This was well explained in the linked JIRA, but I cannot understand why
>> > > > > > that would happen due to my limited experience. If Broker A leads p1 and
>> > > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
>> > > > > > not want Broker A to drop leadership for p2?
>> > > > > > The root cause of the issue is that after a broker just restarts, it
>> > > > > > relies on the first LeaderAndIsrRequest to populate the partition state
>> > > > > > and initializes the highwater mark checkpoint thread. The highwater mark
>> > > > > > checkpoint thread will overwrite the highwater mark checkpoint file based
>> > > > > > on the broker's in-memory partition states. In other words, if a partition
>> > > > > > that is physically hosted by the broker is missing in the in-memory
>> > > > > > partition states map, its highwater mark will be lost after the highwater
>> > > > > > mark checkpoint thread overwrites the file. (Related code:
>> > > > > > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f384a5534d4e9/core/src/main/scala/kafka/server/ReplicaManager.scala#L1091)
>> > > > > >
>> > > > > >
>> > > > > > In your example, assume the first LeaderAndIsrRequest broker A receives
>> > > > > > is the one initiated in the controlled shutdown logic in the Controller
>> > > > > > to move leadership away from broker A. This LeaderAndIsrRequest only
>> > > > > > contains partitions that broker A leads, not all the partitions that
>> > > > > > broker A hosts (i.e. no follower partitions), so the highwater mark for
>> > > > > > the follower partitions will be lost. Also, the first LeaderAndIsrRequest
>> > > > > > broker A receives may not necessarily be the one initiated in the
>> > > > > > controlled shutdown logic (e.g. there can be an ongoing preferred leader
>> > > > > > election), although I think this may not be very common.
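A tiny sketch of that failure mode (simplified, hypothetical types; the real logic is in ReplicaManager and the checkpoint classes): the checkpoint file is rewritten purely from the in-memory partition map, so any hosted partition that has not yet appeared in a LeaderAndIsrRequest silently loses its recorded high watermark.

    object HighWatermarkCheckpointSketch {
      type TopicPartition = String

      // Pretend this map is the highwater mark checkpoint file on disk.
      var checkpointFile: Map[TopicPartition, Long] =
        Map("topicA-0" -> 500L, "topicB-0" -> 800L) // topicB-0 is a follower replica

      // Rewrites the file from the in-memory partition states only; entries for
      // partitions missing from the map are dropped, i.e. their HW is lost.
      def writeCheckpoint(inMemory: Map[TopicPartition, Long]): Unit =
        checkpointFile = inMemory

      def main(args: Array[String]): Unit = {
        // The first LeaderAndIsrRequest after the restart only carried topicA-0
        // (the leader partitions being moved away), so the in-memory map has no
        // entry for topicB-0 yet.
        val inMemoryAfterFirstLeaderAndIsr = Map("topicA-0" -> 500L)
        writeCheckpoint(inMemoryAfterFirstLeaderAndIsr)
        println(s"checkpoint after overwrite: $checkpointFile") // topicB-0's HW is gone
      }
    }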
>> > > > > >
>> > > > > > Here the controller will start processing the BrokerChange event (that
>> > > > > > says that broker A shutdown) after the broker has come back up and
>> > > > > > re-registered himself in ZK?
>> > > > > > How will the Controller miss the restart, won't he subsequently receive
>> > > > > > another ZK event saying that broker A has come back up?
>> > > > > > The controller will not miss the BrokerChange event and actually there
>> > > > > > will be two BrokerChange events fired in this case (one for broker
>> > > > > > deregistration in zk and one for registration). However, when processing
>> > > > > > the BrokerChangeEvent, the controller needs to do a read from zookeeper
>> > > > > > to get back the current brokers in the cluster, and if the bounced broker
>> > > > > > already joined the cluster by this time, the controller will not know this
>> > > > > > broker has been bounced because it sees no diff between zk and its
>> > > > > > in-memory cache. So basically the processing of both BrokerChange events
>> > > > > > becomes a no-op.
>> > > > > >
>> > > > > >
>> > > > > > Hope that answers your questions. Feel free to follow up if I am
>> > > > > > missing something.
>> > > > > >
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Zhanxiang (Patrick) Huang
>> > > > > >
>> > > > > > ________________________________
>> > > > > > From: Stanislav Kozlovski <st...@confluent.io>
>> > > > > > Sent: Wednesday, October 10, 2018 7:22
>> > > > > > To: dev@kafka.apache.org
>> > > > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests
>> > and
>> > > > > > bounced brokers using broker generation
>> > > > > >
>> > > > > > Hi Patrick,
>> > > > > >
>> > > > > > Thanks for the KIP! Fixing such correctness issues is always very
>> > > > > > welcome - they're commonly hard to diagnose and debug when they happen
>> > > > > > in production.
>> > > > > >
>> > > > > > I was wondering if I understood the potential correctness issues
>> > > > > > correctly. Here is what I got:
>> > > > > >
>> > > > > >
>> > > > > >    - If a broker bounces during controlled shutdown, the bounced broker
>> > > > > >    may accidentally process its earlier generation’s StopReplicaRequest
>> > > > > >    sent from the active controller for one of its follower replicas,
>> > > > > >    leaving the replica offline while its remaining replicas may stay
>> > > > > >    online
>> > > > > >
>> > > > > > broker A is initiating a controlled shutdown (restart). The Controller
>> > > > > > sends a StopReplicaRequest but it reaches broker A after it has started
>> > > > > > up again. It therefore stops replicating those partitions even though it
>> > > > > > should just be starting to replicate them.
>> > > > > >
>> > > > > >
>> > > > > >    - If the first LeaderAndIsrRequest that a broker processes is sent
>> > > > > >    by the active controller before its startup, the broker will overwrite
>> > > > > >    the high watermark checkpoint file and may cause incorrect truncation
>> > > > > >    (KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
>> > > > > >
>> > > > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
>> > > > > > restart. Broker A restarts and receives the LeaderAndIsrRequest then. It
>> > > > > > therefore starts leading for the partitions sent by that request and
>> > > > > > might stop leading partitions that it was leading previously.
>> > > > > > This was well explained in the linked JIRA, but I cannot understand why
>> > > > > > that would happen due to my limited experience. If Broker A leads p1 and
>> > > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
>> > > > > > not want Broker A to drop leadership for p2?
>> > > > > >
>> > > > > >
>> > > > > >    - If a broker bounces very quickly, the controller may start
>> > > > > >    processing the BrokerChange event after the broker already re-registers
>> > > > > >    itself in zk. In this case, the controller will miss the broker restart
>> > > > > >    and will not send any requests to the broker for initialization. The
>> > > > > >    broker will not be able to accept traffic.
>> > > > > >
>> > > > > > Here the controller will start processing the BrokerChange event (that
>> > > > > > says that broker A shutdown) after the broker has come back up and
>> > > > > > re-registered himself in ZK?
>> > > > > > How will the Controller miss the restart, won't he subsequently receive
>> > > > > > another ZK event saying that broker A has come back up?
>> > > > > >
>> > > > > >
>> > > > > > Could we explain these potential problems in a bit more detail just so
>> > > > > > they could be more easily digestible by novices?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Stanislav
>> > > > > >
>> > > > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hey Patrick,
>> > > > > > >
>> > > > > > > Thanks much for the KIP. The KIP is very well written.
>> > > > > > >
>> > > > > > > LGTM.  +1 (binding)
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hzxa21@hotmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi All,
>> > > > > > > >
>> > > > > > > > Please find the below KIP which proposes the concept of broker
>> > > > > > > > generation to resolve issues caused by controller missing broker state
>> > > > > > > > changes and broker processing outdated control requests.
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+generation
>> > > > > > > >
>> > > > > > > > All comments are appreciated.
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Zhanxiang (Patrick) Huang
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > Best,
>> > > > > > Stanislav
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Becket Qin <be...@gmail.com>.
Hi Dong,

That is a good point. But I think the STALE_BROKER_EPOCH error may still be
sent to the broker. For example, think about the following case:

1. Broker sends a ControlledShutdownRequest to the controller
2. Broker had a ZK session timeout
3. Broker created the ephemeral node
4. Controller processes the ControlledShutdownRequest in step 1
5. Broker receives a ControlledShutdownResponse with STALE_BROKER_EPOCH.

However, in this case, the broker should probably resend the controlled
shutdown request again with the new epoch. So it looks that returning a
STALE_BROKER_EPOCH is the correct behavior. If the broker has really been
bounced, that response will not be delivered to the broker. If the broker
has not really restarted, it will just resend the ControlledShutdownRequest
with the current epoch again.

It might worth updating the KIP wiki to mention this behavior.

Thanks,

Jiangjie (Becket) Qin


On Tue, Nov 13, 2018 at 2:04 AM Dong Lin <li...@gmail.com> wrote:

> Hey Becket, Patrick,
>
> Currently we expect that controller can only receive ControlledShutdownRequest
> with outdated broker epoch in two cases: 1) the ControlledShutdownRequest
> was sent by the broker before it has restarted; and 2) there is bug.
>
> In case 1), it seems that ControlledShutdownResponse will not be delivered
> to the broker as the channel should have been disconnected. Thus there is
> no confusion to the broker because broker will not receive response
> with STALE_BROKER_EPOCH error.
>
> In case 2), it seems that it can be useful to still deliver ControlledShutdownResponse
> with STALE_BROKER_EPOCH so that this broker at least knows that which
> response is not accepted. This helps us debug this issue.
>
> Also, in terms of both design and implementation, it seems simpler to
> still define a response for a given invalid request rather than have
> special path to skip response for the invalid request. Does this sound
> reasonable?
>
> Thanks,
> Dong
>
>
> On Mon, Nov 12, 2018 at 9:52 AM Patrick Huang <hz...@hotmail.com> wrote:
>
>> Hi Becket,
>>
>> I think you are right. STALE_BROKER_EPOCH only makes sense when the
>> broker detects outdated control requests and wants the controller to know
>> about that. For ControlledShutdownRequest, the controller should just
>> ignore the request with stale broker epoch since the broker does not need
>> and will not do anything for STALE_BROKER_EPOCH response. Thanks for
>> pointing it out.
>>
>> Thanks,
>> Zhanxiang (Patrick) Huang
>>
>> ________________________________
>> From: Becket Qin <be...@gmail.com>
>> Sent: Monday, November 12, 2018 6:46
>> To: dev
>> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> bounced brokers using broker generation
>>
>> Hi Patrick,
>>
>> I am wondering why the controller should send STALE_BROKER_EPOCH error to
>> the broker if the broker epoch is stale? Would this be a little confusing
>> to the current broker if the request was sent by a broker with previous
>> epoch? Should the controller just ignore those requests in that case?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Nov 9, 2018 at 2:17 AM Patrick Huang <hz...@hotmail.com> wrote:
>>
>> > Hi,
>> >
>> > In this KIP, we are also going to add a new exception and a new error
>> code
>> > "STALE_BROKER_EPOCH" in order to allow the broker to respond back the
>> right
>> > error when it sees outdated broker epoch in the control requests. Since
>> > adding a new exception and error code is also considered as public
>> > interface change, I have updated the original KIP accordingly to include
>> > this change. Feel free to comment if there is any concern.
>> >
>> > Thanks,
>> > Zhanxiang (Patrick) Huang
>> >
>> > ________________________________
>> > From: Patrick Huang <hz...@hotmail.com>
>> > Sent: Tuesday, October 23, 2018 6:20
>> > To: Jun Rao; dev@kafka.apache.org
>> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> > bounced brokers using broker generation
>> >
>> > Agreed. I have updated the PR to add czxid in ControlledShutdownRequest
>> (
>> > https://github.com/apache/kafka/pull/5821). Appreciated if you can
>> take a
>> > look.
>> >
>> > Btw, I also have the vote thread for this KIP:
>> >
>> https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E
>> >
>> > Best,
>> > Zhanxiang (Patrick) Huang
>> >
>> > ________________________________
>> > From: Jun Rao <ju...@confluent.io>
>> > Sent: Monday, October 22, 2018 21:31
>> > To: dev@kafka.apache.org
>> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> > bounced brokers using broker generation
>> >
>> > Hi, Patrick,
>> >
>> > Yes, that's the general sequence. After step 2, the shutting down broker
>> > can give up the controlled shutdown process and proceed to shut down.
>> When
>> > it's restarted, it could still receive StopReplica requests from the
>> > controller in reaction to the previous controlled shutdown requests.
>> This
>> > could lead the restarted broker to a bad state.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> > On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com>
>> wrote:
>> >
>> > > Hi Jun,
>> > >
>> > > That is a good point. I want to make it clear about the scenario you
>> > > mentioned. Correct me if I am wrong. Here is the sequence of the
>> event:
>> > >
>> > >    1. Broker sends ControlledShutdown request 1 to controller
>> > >    2. Broker sends ControlledShutdown request 2 to controller due to
>> > >    reties
>> > >    3. Controller processes ControlledShutdown request 1
>> > >    4. Controller sends control requests to the broker
>> > >    5. Broker receives ControlledShutdown response 1 from controller
>> > >    6. Broker shuts down and restarts quickly
>> > >    7. Controller processes ControllerShutdown request 2
>> > >    8. Controller sends control requests to the broker
>> > >
>> > > It is possible that controller processes the broker change event
>> between
>> > > 6) and 7). In this case, controller already updates the cached czxid
>> to
>> > the
>> > > up-to-date ones so the bounced broker will not reject control
>> requests in
>> > > 8), which cause a correctness problem.
>> > >
>> > >
>> > > Best,
>> > > Zhanxiang (Patrick) Huang
>> > >
>> > > ------------------------------
>> > > *From:* Jun Rao <ju...@confluent.io>
>> > > *Sent:* Monday, October 22, 2018 14:45
>> > > *To:* dev
>> > > *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
>> > > bounced brokers using broker generation
>> > >
>> > > Hi, Patrick,
>> > >
>> > > There is another thing that may be worth considering.
>> > >
>> > > 10. It will be useful to include the czxid also in the
>> ControlledShutdown
>> > > request. This way, if the broker has been restarted, the controller
>> can
>> > > ignore an old ControlledShutdown request(e.g., due to retries). This
>> will
>> > > prevent the restarted broker from incorrectly stopping replicas.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <
>> hzxa21.huang@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > Thanks a lot for the comments.
>> > > >
>> > > > 1. czxid is globally unique and monotonically increasing based on
>> the
>> > > > zookeeper doc.
>> > > > References (from
>> > > > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
>> > > > "Every change to the ZooKeeper state receives a stamp in the form
>> of a
>> > > > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering
>> of
>> > all
>> > > > changes to ZooKeeper. Each change will have a unique zxid and if
>> zxid1
>> > is
>> > > > smaller than zxid2 then zxid1 happened before zxid2."
>> > > > "czxid: The zxid of the change that caused this znode to be
>> created."
>> > > >
>> > > > 2. You are right. There will be only on broker change event fired in
>> > the
>> > > > case I mentioned because we will not register the watcher before the
>> > > read.
>> > > >
>> > > > 3. Let's say we want to initialize the states of broker set A and we
>> > want
>> > > > the cluster to be aware of the absence of broker set B. The
>> currently
>> > > live
>> > > > broker set in the cluster is C.
>> > > >
>> > > >     From the design point of view, here are the rules for broker
>> state
>> > > > transition:
>> > > >     - Pass in broker ids of A for onBrokerStartup() and pass in
>> broker
>> > > ids
>> > > > of B for onBrokerFailure().
>> > > >     - When processing onBrokerStartup(), we use the broker
>> generation
>> > > > controller read from zk to send requests to broker set A and use the
>> > > > previously cached broker generation to send requests to (C-A).
>> > > >     - When processing onBrokerFailure(), we use the previously
>> cached
>> > > > broker generation to send requests to C.
>> > > >
>> > > >     From the implementation point of view, here are the steps we
>> need
>> > to
>> > > > follow when processing BrokerChangeEvent:
>> > > >     -  Reads all child nodes in /brokers/ids/ to get current brokers
>> > with
>> > > > broker generation
>> > > >     -  Detect new brokers, dead brokers and bounced brokers
>> > > >     -  Update the live broker ids in controller context
>> > > >     -  Update broker generations for new brokers in controller
>> context
>> > > >     -  Invoke onBrokerStartup(new brokers)
>> > > >     -  Invoke onBrokerFailure(bounced brokers)
>> > > >     -  Update broker generations for bounce brokers in controller
>> > context
>> > > >     -  Invoke onBrokerStartup(bounced brokers)
>> > > >     -  Invoke onBrokerFailure(dead brokers)
>> > > >     We can further optimize the flow by avoiding sending requests
>> to a
>> > > > broker if its broker generation is larger than the one in the
>> > controller
>> > > > context.
>> > > >
>> > > > I will also update the KIP to clarify how it works for
>> > BrokerChangeEvent
>> > > > processing in more detail.
>> > > >
>> > > > Thanks,
>> > > > Patrick
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
>> > > >
>> > > > > Hi, Patrick,
>> > > > >
>> > > > > Thanks for the KIP. Looks good to me overall and very useful. A
>> few
>> > > > > comments below.
>> > > > >
>> > > > > 1. "will reject the requests with smaller broker generation than
>> its
>> > > > > current generation." Is czxid monotonically increasing?
>> > > > >
>> > > > > 2. To clarify on the issue of the controller missing a ZK
>> watcher. ZK
>> > > > > watchers are one-time watchers. Once a watcher is fired, one
>> needs to
>> > > > > register it again before the watcher can be triggered. So, when
>> the
>> > > > > controller is busy and a broker goes down and comes up, the first
>> > event
>> > > > > will trigger the ZK watcher. Since the controller is busy and
>> hasn't
>> > > > > registered the watcher again, the second event actually won't
>> fire.
>> > By
>> > > > the
>> > > > > time the controller reads from ZK, it sees that the broker is
>> still
>> > > > > registered and thus thinks that nothing has happened to the
>> broker,
>> > > which
>> > > > > is causing the problem.
>> > > > >
>> > > > > 3. "Handle broker state change: invoke onBrokerFailure(...) first,
>> > then
>> > > > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
>> > > here.
>> > > > > Could you clarify the broker list and the broker epoch used when
>> > making
>> > > > > these calls? We want to prevent the restarted broker from
>> receiving a
>> > > > > partial replica list on the first LeaderAndIsr request because of
>> > this.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <
>> hzxa21@hotmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hey Stanislav,
>> > > > > >
>> > > > > > Sure. Thanks for your interest in this KIP. I am glad to provide
>> > more
>> > > > > > detail.
>> > > > > >
>> > > > > > broker A is initiating a controlled shutdown (restart). The
>> > > Controller
>> > > > > > sends a StopReplicaRequest but it reaches broker A after it has
>> > > started
>> > > > > up
>> > > > > > again. He therefore stops replicating those partitions even
>> though
>> > he
>> > > > > > should just be starting to
>> > > > > > This is right.
>> > > > > >
>> > > > > > Controller sends a LeaderAndIsrRequest before broker A
>> initiates a
>> > > > > restart.
>> > > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
>> > > > therefore
>> > > > > > starts leading for the partitions sent by that request and might
>> > stop
>> > > > > > leading partitions that it was leading previously.
>> > > > > > This was well explained in the linked JIRA, but I cannot
>> understand
>> > > why
>> > > > > > that would happen due to my limited experience. If Broker A
>> leads
>> > p1
>> > > > and
>> > > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1
>> only
>> > > and
>> > > > > not
>> > > > > > want Broker A to drop leadership for p2?
>> > > > > > The root cause of the issue is that after a broker just
>> restarts,
>> > it
>> > > > > > relies on the first LeaderAndIsrRequest to populate the
>> partition
>> > > state
>> > > > > and
>> > > > > > initializes the highwater mark checkpoint thread. The highwater
>> > mark
>> > > > > > checkpoint thread will overwrite the highwater mark checkpoint
>> file
>> > > > based
>> > > > > > on the broker's in-memory partition states. In other words, If a
>> > > > > partition
>> > > > > > that is physically hosted by the broker is missing in the
>> in-memory
>> > > > > > partition states map, its highwater mark will be lost after the
>> > > > highwater
>> > > > > > mark checkpoint thread overwrites the file. (Related codes:
>> > > > > >
>> > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
>> > > > > > 4a5534d4e9/core/src/main/scala/kafka/server/
>> > > > ReplicaManager.scala#L1091)
>> > > > > > [https://avatars3.githubusercontent.com/u/47359?s=400&v=4]<
>> > > > > >
>> > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
>> > > > > > 4a5534d4e9/core/src/main/scala/kafka/server/
>> > > > ReplicaManager.scala#L1091>
>> > > > > >
>> > > > > > apache/kafka<https://github.com/apache/kafka/blob/
>> > > > > >
>> > > > > ed3bd79633ae227ad995dafc3d9f384a5534d4e9/core/src/main/
>> > > > scala/kafka/server/
>> > > > > > ReplicaManager.scala#L1091>
>> > > > > > Mirror of Apache Kafka. Contribute to apache/kafka development
>> by
>> > > > > creating
>> > > > > > an account on GitHub.
>> > > > > > github.com
>> > > > > >
>> > > > > >
>> > > > > > In your example, assume the first LeaderAndIsrRequest broker A
>> > > receives
>> > > > > is
>> > > > > > the one initiated in the controlled shutdown logic in
>> Controller to
>> > > > move
>> > > > > > leadership away from broker A. This LeaderAndIsrRequest only
>> > contains
>> > > > > > partitions that broker A leads, not all the partitions that
>> broker
>> > A
>> > > > > hosts
>> > > > > > (i.e. no follower partitions), so the highwater mark for the
>> > follower
>> > > > > > partitions will be lost. Also, the first LeaderAndIsrRequst
>> broker
>> > A
>> > > > > > receives may not necessarily be the one initiated in controlled
>> > > > shutdown
>> > > > > > logic (e.g. there can be an ongoing preferred leader election),
>> > > > although
>> > > > > I
>> > > > > > think this may not be very common.
>> > > > > >
>> > > > > > Here the controller will start processing the BrokerChange event
>> > > (that
>> > > > > says
>> > > > > > that broker A shutdown) after the broker has come back up and
>> > > > > re-registered
>> > > > > > himself in ZK?
>> > > > > > How will the Controller miss the restart, won't he subsequently
>> > > receive
>> > > > > > another ZK event saying that broker A has come back up?
>> > > > > > Controller will not miss the BrokerChange event and actually
>> there
>> > > will
>> > > > > be
>> > > > > > two BrokerChange events fired in this case (one for broker
>> > > > deregistration
>> > > > > > in zk and one for registration). However, when processing the
>> > > > > > BrokerChangeEvent, controller needs to do a read from zookeeper
>> to
>> > > get
>> > > > > back
>> > > > > > the current brokers in the cluster and if the bounced broker
>> > already
>> > > > > joined
>> > > > > > the cluster by this time, controller will not know this broker
>> has
>> > > been
>> > > > > > bounced because it sees no diff between zk and its in-memory
>> cache.
>> > > So
>> > > > > > basically both of the BrokerChange event processing become
>> no-op.
>> > > > > >
>> > > > > >
>> > > > > > Hope that I answer your questions. Feel free to follow up if I
>> am
>> > > > missing
>> > > > > > something.
>> > > > > >
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Zhanxiang (Patrick) Huang
>> > > > > >
>> > > > > > ________________________________
>> > > > > > From: Stanislav Kozlovski <st...@confluent.io>
>> > > > > > Sent: Wednesday, October 10, 2018 7:22
>> > > > > > To: dev@kafka.apache.org
>> > > > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests
>> > and
>> > > > > > bounced brokers using broker generation
>> > > > > >
>> > > > > > Hi Patrick,
>> > > > > >
>> > > > > > Thanks for the KIP! Fixing such correctness issues is always
>> very
>> > > > > welcome -
>> > > > > > they're commonly hard to diagnose and debug when they happen in
>> > > > > production.
>> > > > > >
>> > > > > > I was wondering if I understood the potential correctness issues
>> > > > > correctly.
>> > > > > > Here is what I got:
>> > > > > >
>> > > > > >
>> > > > > >    - If a broker bounces during controlled shutdown, the bounced
>> > > broker
>> > > > > may
>> > > > > >    accidentally process its earlier generation’s
>> StopReplicaRequest
>> > > > sent
>> > > > > > from
>> > > > > >    the active controller for one of its follower replicas,
>> leaving
>> > > the
>> > > > > > replica
>> > > > > >    offline while its remaining replicas may stay online
>> > > > > >
>> > > > > > broker A is initiating a controlled shutdown (restart). The
>> > > Controller
>> > > > > > sends a StopReplicaRequest but it reaches broker A after it has
>> > > started
>> > > > > up
>> > > > > > again. He therefore stops replicating those partitions even
>> though
>> > he
>> > > > > > should just be starting to
>> > > > > >
>> > > > > >
>> > > > > >    - If the first LeaderAndIsrRequest that a broker processes is
>> > sent
>> > > > by
>> > > > > >    the active controller before its startup, the broker will
>> > > overwrite
>> > > > > the
>> > > > > >    high watermark checkpoint file and may cause incorrect
>> > truncation
>> > > (
>> > > > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235
>> >)
>> > > > > >
>> > > > > > Controller sends a LeaderAndIsrRequest before broker A
>> initiates a
>> > > > > restart.
>> > > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
>> > > > therefore
>> > > > > > starts leading for the partitions sent by that request and might
>> > stop
>> > > > > > leading partitions that it was leading previously.
>> > > > > > This was well explained in the linked JIRA, but I cannot
>> understand
>> > > why
>> > > > > > that would happen due to my limited experience. If Broker A
>> leads
>> > p1
>> > > > and
>> > > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1
>> only
>> > > and
>> > > > > not
>> > > > > > want Broker A to drop leadership for p2?
>> > > > > >
>> > > > > >
>> > > > > >    - If a broker bounces very quickly, the controller may start
>> > > > > processing
>> > > > > >    the BrokerChange event after the broker already re-registers
>> > > itself
>> > > > in
>> > > > > > zk.
>> > > > > >    In this case, controller will miss the broker restart and
>> will
>> > not
>> > > > > send
>> > > > > > any
>> > > > > >    requests to the broker for initialization. The broker will
>> not
>> > be
>> > > > able
>> > > > > > to
>> > > > > >    accept traffics.
>> > > > > >
>> > > > > > Here the controller will start processing the BrokerChange event
>> > > (that
>> > > > > says
>> > > > > > that broker A shutdown) after the broker has come back up and
>> > > > > re-registered
>> > > > > > himself in ZK?
>> > > > > > How will the Controller miss the restart, won't he subsequently
>> > > receive
>> > > > > > another ZK event saying that broker A has come back up?
>> > > > > >
>> > > > > >
>> > > > > > Could we explain these potential problems in a bit more detail
>> just
>> > > so
>> > > > > they
>> > > > > > could be more easily digestable by novices?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Stanislav
>> > > > > >
>> > > > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
>> > > wrote:
>> > > > > >
>> > > > > > > Hey Patrick,
>> > > > > > >
>> > > > > > > Thanks much for the KIP. The KIP is very well written.
>> > > > > > >
>> > > > > > > LGTM.  +1 (binding)
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <
>> > hzxa21@hotmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi All,
>> > > > > > > >
>> > > > > > > > Please find the below KIP which proposes the concept of
>> broker
>> > > > > > generation
>> > > > > > > > to resolve issues caused by controller missing broker state
>> > > changes
>> > > > > and
>> > > > > > > > broker processing outdated control requests.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > > 380%3A+Detect+outdated+control+requests+and+bounced+
>> > > > brokers+using+broker+
>> > > > > > generation
>> > > > > > > >
>> > > > > > > > All comments are appreciated.
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Zhanxiang (Patrick) Huang
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > Best,
>> > > > > > Stanislav
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Dong Lin <li...@gmail.com>.
Hey Becket, Patrick,

Currently we expect that controller can only receive ControlledShutdownRequest
with outdated broker epoch in two cases: 1) the ControlledShutdownRequest
was sent by the broker before it has restarted; and 2) there is bug.

In case 1), it seems that ControlledShutdownResponse will not be delivered
to the broker as the channel should have been disconnected. Thus there is
no confusion to the broker because broker will not receive response
with STALE_BROKER_EPOCH error.

In case 2), it seems that it can be useful to still deliver
ControlledShutdownResponse
with STALE_BROKER_EPOCH so that this broker at least knows that which
response is not accepted. This helps us debug this issue.

Also, in terms of both design and implementation, it seems simpler to still
define a response for a given invalid request rather than have special path
to skip response for the invalid request. Does this sound reasonable?

Thanks,
Dong


On Mon, Nov 12, 2018 at 9:52 AM Patrick Huang <hz...@hotmail.com> wrote:

> Hi Becket,
>
> I think you are right. STALE_BROKER_EPOCH only makes sense when the broker
> detects outdated control requests and wants the controller to know about
> that. For ControlledShutdownRequest, the controller should just ignore the
> request with stale broker epoch since the broker does not need and will not
> do anything for STALE_BROKER_EPOCH response. Thanks for pointing it out.
>
> Thanks,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Becket Qin <be...@gmail.com>
> Sent: Monday, November 12, 2018 6:46
> To: dev
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi Patrick,
>
> I am wondering why the controller should send STALE_BROKER_EPOCH error to
> the broker if the broker epoch is stale? Would this be a little confusing
> to the current broker if the request was sent by a broker with previous
> epoch? Should the controller just ignore those requests in that case?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 9, 2018 at 2:17 AM Patrick Huang <hz...@hotmail.com> wrote:
>
> > Hi,
> >
> > In this KIP, we are also going to add a new exception and a new error
> code
> > "STALE_BROKER_EPOCH" in order to allow the broker to respond back the
> right
> > error when it sees outdated broker epoch in the control requests. Since
> > adding a new exception and error code is also considered as public
> > interface change, I have updated the original KIP accordingly to include
> > this change. Feel free to comment if there is any concern.
> >
> > Thanks,
> > Zhanxiang (Patrick) Huang
> >
> > ________________________________
> > From: Patrick Huang <hz...@hotmail.com>
> > Sent: Tuesday, October 23, 2018 6:20
> > To: Jun Rao; dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > bounced brokers using broker generation
> >
> > Agreed. I have updated the PR to add czxid in ControlledShutdownRequest (
> > https://github.com/apache/kafka/pull/5821). Appreciated if you can take
> a
> > look.
> >
> > Btw, I also have the vote thread for this KIP:
> >
> https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E
> >
> > Best,
> > Zhanxiang (Patrick) Huang
> >
> > ________________________________
> > From: Jun Rao <ju...@confluent.io>
> > Sent: Monday, October 22, 2018 21:31
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > bounced brokers using broker generation
> >
> > Hi, Patrick,
> >
> > Yes, that's the general sequence. After step 2, the shutting down broker
> > can give up the controlled shutdown process and proceed to shut down.
> When
> > it's restarted, it could still receive StopReplica requests from the
> > controller in reaction to the previous controlled shutdown requests. This
> > could lead the restarted broker to a bad state.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com>
> wrote:
> >
> > > Hi Jun,
> > >
> > > That is a good point. I want to make it clear about the scenario you
> > > mentioned. Correct me if I am wrong. Here is the sequence of the event:
> > >
> > >    1. Broker sends ControlledShutdown request 1 to controller
> > >    2. Broker sends ControlledShutdown request 2 to controller due to
> > >    reties
> > >    3. Controller processes ControlledShutdown request 1
> > >    4. Controller sends control requests to the broker
> > >    5. Broker receives ControlledShutdown response 1 from controller
> > >    6. Broker shuts down and restarts quickly
> > >    7. Controller processes ControllerShutdown request 2
> > >    8. Controller sends control requests to the broker
> > >
> > > It is possible that controller processes the broker change event
> between
> > > 6) and 7). In this case, controller already updates the cached czxid to
> > the
> > > up-to-date ones so the bounced broker will not reject control requests
> in
> > > 8), which cause a correctness problem.
> > >
> > >
> > > Best,
> > > Zhanxiang (Patrick) Huang
> > >
> > > ------------------------------
> > > *From:* Jun Rao <ju...@confluent.io>
> > > *Sent:* Monday, October 22, 2018 14:45
> > > *To:* dev
> > > *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > > bounced brokers using broker generation
> > >
> > > Hi, Patrick,
> > >
> > > There is another thing that may be worth considering.
> > >
> > > 10. It will be useful to include the czxid also in the
> ControlledShutdown
> > > request. This way, if the broker has been restarted, the controller can
> > > ignore an old ControlledShutdown request(e.g., due to retries). This
> will
> > > prevent the restarted broker from incorrectly stopping replicas.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hzxa21.huang@gmail.com
> >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks a lot for the comments.
> > > >
> > > > 1. czxid is globally unique and monotonically increasing based on the
> > > > zookeeper doc.
> > > > References (from
> > > > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > > > "Every change to the ZooKeeper state receives a stamp in the form of
> a
> > > > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of
> > all
> > > > changes to ZooKeeper. Each change will have a unique zxid and if
> zxid1
> > is
> > > > smaller than zxid2 then zxid1 happened before zxid2."
> > > > "czxid: The zxid of the change that caused this znode to be created."
> > > >
> > > > 2. You are right. There will be only on broker change event fired in
> > the
> > > > case I mentioned because we will not register the watcher before the
> > > read.
> > > >
> > > > 3. Let's say we want to initialize the states of broker set A and we
> > want
> > > > the cluster to be aware of the absence of broker set B. The currently
> > > live
> > > > broker set in the cluster is C.
> > > >
> > > >     From the design point of view, here are the rules for broker
> state
> > > > transition:
> > > >     - Pass in broker ids of A for onBrokerStartup() and pass in
> broker
> > > ids
> > > > of B for onBrokerFailure().
> > > >     - When processing onBrokerStartup(), we use the broker generation
> > > > controller read from zk to send requests to broker set A and use the
> > > > previously cached broker generation to send requests to (C-A).
> > > >     - When processing onBrokerFailure(), we use the previously cached
> > > > broker generation to send requests to C.
> > > >
> > > >     From the implementation point of view, here are the steps we need
> > to
> > > > follow when processing BrokerChangeEvent:
> > > >     -  Reads all child nodes in /brokers/ids/ to get current brokers
> > with
> > > > broker generation
> > > >     -  Detect new brokers, dead brokers and bounced brokers
> > > >     -  Update the live broker ids in controller context
> > > >     -  Update broker generations for new brokers in controller
> context
> > > >     -  Invoke onBrokerStartup(new brokers)
> > > >     -  Invoke onBrokerFailure(bounced brokers)
> > > >     -  Update broker generations for bounce brokers in controller
> > context
> > > >     -  Invoke onBrokerStartup(bounced brokers)
> > > >     -  Invoke onBrokerFailure(dead brokers)
> > > >     We can further optimize the flow by avoiding sending requests to
> a
> > > > broker if its broker generation is larger than the one in the
> > controller
> > > > context.
> > > >
> > > > I will also update the KIP to clarify how it works for
> > BrokerChangeEvent
> > > > processing in more detail.
> > > >
> > > > Thanks,
> > > > Patrick
> > > >
> > > >
> > > >
> > > > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
> > > >
> > > > > Hi, Patrick,
> > > > >
> > > > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > > > comments below.
> > > > >
> > > > > 1. "will reject the requests with smaller broker generation than
> its
> > > > > current generation." Is czxid monotonically increasing?
> > > > >
> > > > > 2. To clarify on the issue of the controller missing a ZK watcher.
> ZK
> > > > > watchers are one-time watchers. Once a watcher is fired, one needs
> to
> > > > > register it again before the watcher can be triggered. So, when the
> > > > > controller is busy and a broker goes down and comes up, the first
> > event
> > > > > will trigger the ZK watcher. Since the controller is busy and
> hasn't
> > > > > registered the watcher again, the second event actually won't fire.
> > By
> > > > the
> > > > > time the controller reads from ZK, it sees that the broker is still
> > > > > registered and thus thinks that nothing has happened to the broker,
> > > which
> > > > > is causing the problem.
> > > > >
> > > > > 3. "Handle broker state change: invoke onBrokerFailure(...) first,
> > then
> > > > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
> > > here.
> > > > > Could you clarify the broker list and the broker epoch used when
> > making
> > > > > these calls? We want to prevent the restarted broker from
> receiving a
> > > > > partial replica list on the first LeaderAndIsr request because of
> > this.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <
> hzxa21@hotmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey Stanislav,
> > > > > >
> > > > > > Sure. Thanks for your interest in this KIP. I am glad to provide
> > more
> > > > > > detail.
> > > > > >
> > > > > > broker A is initiating a controlled shutdown (restart). The
> > > Controller
> > > > > > sends a StopReplicaRequest but it reaches broker A after it has
> > > started
> > > > > up
> > > > > > again. He therefore stops replicating those partitions even
> though
> > he
> > > > > > should just be starting to
> > > > > > This is right.
> > > > > >
> > > > > > Controller sends a LeaderAndIsrRequest before broker A initiates
> a
> > > > > restart.
> > > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > > > therefore
> > > > > > starts leading for the partitions sent by that request and might
> > stop
> > > > > > leading partitions that it was leading previously.
> > > > > > This was well explained in the linked JIRA, but I cannot
> understand
> > > why
> > > > > > that would happen due to my limited experience. If Broker A leads
> > p1
> > > > and
> > > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1
> only
> > > and
> > > > > not
> > > > > > want Broker A to drop leadership for p2?
> > > > > > The root cause of the issue is that after a broker just restarts,
> > it
> > > > > > relies on the first LeaderAndIsrRequest to populate the partition
> > > state
> > > > > and
> > > > > > initializes the highwater mark checkpoint thread. The highwater
> > mark
> > > > > > checkpoint thread will overwrite the highwater mark checkpoint
> file
> > > > based
> > > > > > on the broker's in-memory partition states. In other words, If a
> > > > > partition
> > > > > > that is physically hosted by the broker is missing in the
> in-memory
> > > > > > partition states map, its highwater mark will be lost after the
> > > > highwater
> > > > > > mark checkpoint thread overwrites the file. (Related codes:
> > > > > >
> > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > > > > 4a5534d4e9/core/src/main/scala/kafka/server/
> > > > ReplicaManager.scala#L1091)
> > > > > >
> > > > > >
> > > > > > In your example, assume the first LeaderAndIsrRequest broker A
> > > receives
> > > > > is
> > > > > > the one initiated in the controlled shutdown logic in Controller
> to
> > > > move
> > > > > > leadership away from broker A. This LeaderAndIsrRequest only
> > contains
> > > > > > partitions that broker A leads, not all the partitions that
> broker
> > A
> > > > > hosts
> > > > > > (i.e. no follower partitions), so the highwater mark for the
> > follower
> > > > > > partitions will be lost. Also, the first LeaderAndIsrRequest
> broker
> > A
> > > > > > receives may not necessarily be the one initiated in controlled
> > > > shutdown
> > > > > > logic (e.g. there can be an ongoing preferred leader election),
> > > > although
> > > > > I
> > > > > > think this may not be very common.
> > > > > >
> > > > > > Here the controller will start processing the BrokerChange event
> > > (that
> > > > > says
> > > > > > that broker A shutdown) after the broker has come back up and
> > > > > re-registered
> > > > > > himself in ZK?
> > > > > > How will the Controller miss the restart, won't he subsequently
> > > receive
> > > > > > another ZK event saying that broker A has come back up?
> > > > > > Controller will not miss the BrokerChange event and actually
> there
> > > will
> > > > > be
> > > > > > two BrokerChange events fired in this case (one for broker
> > > > deregistration
> > > > > > in zk and one for registration). However, when processing the
> > > > > > BrokerChangeEvent, controller needs to do a read from zookeeper
> to
> > > get
> > > > > back
> > > > > > the current brokers in the cluster and if the bounced broker
> > already
> > > > > joined
> > > > > > the cluster by this time, controller will not know this broker
> has
> > > been
> > > > > > bounced because it sees no diff between zk and its in-memory
> cache.
> > > So
> > > > > > basically the processing of both BrokerChange events becomes a no-op.
> > > > > >
> > > > > >
> > > > > > Hope that answers your questions. Feel free to follow up if I am
> > > > missing
> > > > > > something.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Zhanxiang (Patrick) Huang
> > > > > >
> > > > > > ________________________________
> > > > > > From: Stanislav Kozlovski <st...@confluent.io>
> > > > > > Sent: Wednesday, October 10, 2018 7:22
> > > > > > To: dev@kafka.apache.org
> > > > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests
> > and
> > > > > > bounced brokers using broker generation
> > > > > >
> > > > > > Hi Patrick,
> > > > > >
> > > > > > Thanks for the KIP! Fixing such correctness issues is always very
> > > > > welcome -
> > > > > > they're commonly hard to diagnose and debug when they happen in
> > > > > production.
> > > > > >
> > > > > > I was wondering if I understood the potential correctness issues
> > > > > correctly.
> > > > > > Here is what I got:
> > > > > >
> > > > > >
> > > > > >    - If a broker bounces during controlled shutdown, the bounced
> > > broker
> > > > > may
> > > > > >    accidentally process its earlier generation’s
> StopReplicaRequest
> > > > sent
> > > > > > from
> > > > > >    the active controller for one of its follower replicas,
> leaving
> > > the
> > > > > > replica
> > > > > >    offline while its remaining replicas may stay online
> > > > > >
> > > > > > broker A is initiating a controlled shutdown (restart). The
> > > Controller
> > > > > > sends a StopReplicaRequest but it reaches broker A after it has
> > > started
> > > > > up
> > > > > > again. He therefore stops replicating those partitions even
> though
> > he
> > > > > > should just be starting to
> > > > > >
> > > > > >
> > > > > >    - If the first LeaderAndIsrRequest that a broker processes is
> > sent
> > > > by
> > > > > >    the active controller before its startup, the broker will
> > > overwrite
> > > > > the
> > > > > >    high watermark checkpoint file and may cause incorrect
> > truncation
> > > (
> > > > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235
> >)
> > > > > >
> > > > > > Controller sends a LeaderAndIsrRequest before broker A initiates
> a
> > > > > restart.
> > > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > > > therefore
> > > > > > starts leading for the partitions sent by that request and might
> > stop
> > > > > > leading partitions that it was leading previously.
> > > > > > This was well explained in the linked JIRA, but I cannot
> understand
> > > why
> > > > > > that would happen due to my limited experience. If Broker A leads
> > p1
> > > > and
> > > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1
> only
> > > and
> > > > > not
> > > > > > want Broker A to drop leadership for p2?
> > > > > >
> > > > > >
> > > > > >    - If a broker bounces very quickly, the controller may start
> > > > > processing
> > > > > >    the BrokerChange event after the broker already re-registers
> > > itself
> > > > in
> > > > > > zk.
> > > > > >    In this case, controller will miss the broker restart and will
> > not
> > > > > send
> > > > > > any
> > > > > >    requests to the broker for initialization. The broker will not
> > be
> > > > able
> > > > > > to
> > > > > >    accept traffic.
> > > > > >
> > > > > > Here the controller will start processing the BrokerChange event
> > > (that
> > > > > says
> > > > > > that broker A shutdown) after the broker has come back up and
> > > > > re-registered
> > > > > > himself in ZK?
> > > > > > How will the Controller miss the restart, won't he subsequently
> > > receive
> > > > > > another ZK event saying that broker A has come back up?
> > > > > >
> > > > > >
> > > > > > Could we explain these potential problems in a bit more detail
> just
> > > so
> > > > > they
> > > > > > could be more easily digestible by novices?
> > > > > >
> > > > > > Thanks,
> > > > > > Stanislav
> > > > > >
> > > > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Patrick,
> > > > > > >
> > > > > > > Thanks much for the KIP. The KIP is very well written.
> > > > > > >
> > > > > > > LGTM.  +1 (binding)
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <
> > hzxa21@hotmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > Please find the below KIP which proposes the concept of
> broker
> > > > > > generation
> > > > > > > > to resolve issues caused by controller missing broker state
> > > changes
> > > > > and
> > > > > > > > broker processing outdated control requests.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 380%3A+Detect+outdated+control+requests+and+bounced+
> > > > brokers+using+broker+
> > > > > > generation
> > > > > > > >
> > > > > > > > All comments are appreciated.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Zhanxiang (Patrick) Huang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Stanislav
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Hi Becket,

I think you are right. STALE_BROKER_EPOCH only makes sense when the broker detects an outdated control request and wants the controller to know about it. For ControlledShutdownRequest, the controller should just ignore a request with a stale broker epoch, since the broker neither needs nor will act on a STALE_BROKER_EPOCH response. Thanks for pointing it out.
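
Just to make the idea concrete, here is a rough sketch of the check being discussed (simplified, with made-up names such as brokerEpochCache and shouldProcess -- not the actual controller code):

    // Sketch: the controller caches the czxid-based broker epoch per broker id
    // and only processes a ControlledShutdownRequest whose epoch is at least as
    // new as the cached one; requests from an older broker generation are ignored.
    case class ControlledShutdownRequest(brokerId: Int, brokerEpoch: Long)

    def shouldProcess(request: ControlledShutdownRequest,
                      brokerEpochCache: Map[Int, Long]): Boolean =
      brokerEpochCache.get(request.brokerId) match {
        case Some(cachedEpoch) => request.brokerEpoch >= cachedEpoch
        case None              => false // broker not registered, nothing to do
      }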

Thanks,
Zhanxiang (Patrick) Huang

________________________________
From: Becket Qin <be...@gmail.com>
Sent: Monday, November 12, 2018 6:46
To: dev
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi Patrick,

I am wondering why the controller should send STALE_BROKER_EPOCH error to
the broker if the broker epoch is stale? Would this be a little confusing
to the current broker if the request was sent by a broker with a previous
epoch? Should the controller just ignore those requests in that case?

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 9, 2018 at 2:17 AM Patrick Huang <hz...@hotmail.com> wrote:

> Hi,
>
> In this KIP, we are also going to add a new exception and a new error code
> "STALE_BROKER_EPOCH" in order to allow the broker to respond back the right
> error when it sees outdated broker epoch in the control requests. Since
> adding a new exception and error code is also considered as public
> interface change, I have updated the original KIP accordingly to include
> this change. Feel free to comment if there is any concern.
>
> Thanks,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Patrick Huang <hz...@hotmail.com>
> Sent: Tuesday, October 23, 2018 6:20
> To: Jun Rao; dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Agreed. I have updated the PR to add czxid in ControlledShutdownRequest (
> https://github.com/apache/kafka/pull/5821). Appreciated if you can take a
> look.
>
> Btw, I also have the vote thread for this KIP:
> https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E
>
> Best,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Jun Rao <ju...@confluent.io>
> Sent: Monday, October 22, 2018 21:31
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi, Patrick,
>
> Yes, that's the general sequence. After step 2, the shutting down broker
> can give up the controlled shutdown process and proceed to shut down. When
> it's restarted, it could still receive StopReplica requests from the
> controller in reaction to the previous controlled shutdown requests. This
> could lead the restarted broker to a bad state.
>
> Thanks,
>
> Jun
>
>
> On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com> wrote:
>
> > Hi Jun,
> >
> > That is a good point. I want to make sure I understand the scenario you
> > mentioned. Correct me if I am wrong. Here is the sequence of events:
> >
> >    1. Broker sends ControlledShutdown request 1 to controller
> >    2. Broker sends ControlledShutdown request 2 to controller due to
>    retries
> >    3. Controller processes ControlledShutdown request 1
> >    4. Controller sends control requests to the broker
> >    5. Broker receives ControlledShutdown response 1 from controller
> >    6. Broker shuts down and restarts quickly
>    7. Controller processes ControlledShutdown request 2
> >    8. Controller sends control requests to the broker
> >
> > It is possible that controller processes the broker change event between
> > 6) and 7). In this case, controller already updates the cached czxid to
> the
> > up-to-date ones so the bounced broker will not reject control requests in
> > 8), which causes a correctness problem.
> >
> >
> > Best,
> > Zhanxiang (Patrick) Huang
> >
> > ------------------------------
> > *From:* Jun Rao <ju...@confluent.io>
> > *Sent:* Monday, October 22, 2018 14:45
> > *To:* dev
> > *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > bounced brokers using broker generation
> >
> > Hi, Patrick,
> >
> > There is another thing that may be worth considering.
> >
> > 10. It will be useful to include the czxid also in the ControlledShutdown
> > request. This way, if the broker has been restarted, the controller can
> > ignore an old ControlledShutdown request(e.g., due to retries). This will
> > prevent the restarted broker from incorrectly stopping replicas.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks a lot for the comments.
> > >
> > > 1. czxid is globally unique and monotonically increasing based on the
> > > zookeeper doc.
> > > References (from
> > > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > > "Every change to the ZooKeeper state receives a stamp in the form of a
> > > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of
> all
> > > changes to ZooKeeper. Each change will have a unique zxid and if zxid1
> is
> > > smaller than zxid2 then zxid1 happened before zxid2."
> > > "czxid: The zxid of the change that caused this znode to be created."
> > >
> > > 2. You are right. There will be only one broker change event fired in
> the
> > > case I mentioned because we will not register the watcher before the
> > read.
> > >
> > > 3. Let's say we want to initialize the states of broker set A and we
> want
> > > the cluster to be aware of the absence of broker set B. The currently
> > live
> > > broker set in the cluster is C.
> > >
> > >     From the design point of view, here are the rules for broker state
> > > transition:
> > >     - Pass in broker ids of A for onBrokerStartup() and pass in broker
> > ids
> > > of B for onBrokerFailure().
> > >     - When processing onBrokerStartup(), we use the broker generation
> > > that the controller read from zk to send requests to broker set A and use the
> > > previously cached broker generation to send requests to (C-A).
> > >     - When processing onBrokerFailure(), we use the previously cached
> > > broker generation to send requests to C.
> > >
> > >     From the implementation point of view, here are the steps we need
> to
> > > follow when processing BrokerChangeEvent:
> > >     -  Reads all child nodes in /brokers/ids/ to get current brokers
> with
> > > broker generation
> > >     -  Detect new brokers, dead brokers and bounced brokers
> > >     -  Update the live broker ids in controller context
> > >     -  Update broker generations for new brokers in controller context
> > >     -  Invoke onBrokerStartup(new brokers)
> > >     -  Invoke onBrokerFailure(bounced brokers)
> > >     -  Update broker generations for bounced brokers in controller
> context
> > >     -  Invoke onBrokerStartup(bounced brokers)
> > >     -  Invoke onBrokerFailure(dead brokers)
> > >     We can further optimize the flow by avoiding sending requests to a
> > > broker if its broker generation is larger than the one in the
> controller
> > > context.
> > >
> > > I will also update the KIP to clarify how it works for
> BrokerChangeEvent
> > > processing in more detail.
> > >
> > > Thanks,
> > > Patrick
> > >
> > >
> > >
> > > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
> > >
> > > > Hi, Patrick,
> > > >
> > > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > > comments below.
> > > >
> > > > 1. "will reject the requests with smaller broker generation than its
> > > > current generation." Is czxid monotonically increasing?
> > > >
> > > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > > > watchers are one-time watchers. Once a watcher is fired, one needs to
> > > > register it again before the watcher can be triggered. So, when the
> > > > controller is busy and a broker goes down and comes up, the first
> event
> > > > will trigger the ZK watcher. Since the controller is busy and hasn't
> > > > registered the watcher again, the second event actually won't fire.
> By
> > > the
> > > > time the controller reads from ZK, it sees that the broker is still
> > > > registered and thus thinks that nothing has happened to the broker,
> > which
> > > > is causing the problem.
> > > >
> > > > 3. "Handle broker state change: invoke onBrokerFailure(...) first,
> then
> > > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
> > here.
> > > > Could you clarify the broker list and the broker epoch used when
> making
> > > > these calls? We want to prevent the restarted broker from receiving a
> > > > partial replica list on the first LeaderAndIsr request because of
> this.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > > > wrote:
> > > >
> > > > > Hey Stanislav,
> > > > >
> > > > > Sure. Thanks for your interest in this KIP. I am glad to provide
> more
> > > > > detail.
> > > > >
> > > > > broker A is initiating a controlled shutdown (restart). The
> > Controller
> > > > > sends a StopReplicaRequest but it reaches broker A after it has
> > started
> > > > up
> > > > > again. He therefore stops replicating those partitions even though
> he
> > > > > should just be starting to
> > > > > This is right.
> > > > >
> > > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > > restart.
> > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > > therefore
> > > > > starts leading for the partitions sent by that request and might
> stop
> > > > > leading partitions that it was leading previously.
> > > > > This was well explained in the linked JIRA, but I cannot understand
> > why
> > > > > that would happen due to my limited experience. If Broker A leads
> p1
> > > and
> > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> > and
> > > > not
> > > > > want Broker A to drop leadership for p2?
> > > > > The root cause of the issue is that after a broker just restarts,
> it
> > > > > relies on the first LeaderAndIsrRequest to populate the partition
> > state
> > > > and
> > > > > initializes the highwater mark checkpoint thread. The highwater
> mark
> > > > > checkpoint thread will overwrite the highwater mark checkpoint file
> > > based
> > > > > on the broker's in-memory partition states. In other words, If a
> > > > partition
> > > > > that is physically hosted by the broker is missing in the in-memory
> > > > > partition states map, its highwater mark will be lost after the
> > > highwater
> > > > > mark checkpoint thread overwrites the file. (Related codes:
> > > > >
> https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > > > 4a5534d4e9/core/src/main/scala/kafka/server/
> > > ReplicaManager.scala#L1091)
> > > > >
> > > > >
> > > > > In your example, assume the first LeaderAndIsrRequest broker A
> > receives
> > > > is
> > > > > the one initiated in the controlled shutdown logic in Controller to
> > > move
> > > > > leadership away from broker A. This LeaderAndIsrRequest only
> contains
> > > > > partitions that broker A leads, not all the partitions that broker
> A
> > > > hosts
> > > > > (i.e. no follower partitions), so the highwater mark for the
> follower
> > > > > partitions will be lost. Also, the first LeaderAndIsrRequest broker
> A
> > > > > receives may not necessarily be the one initiated in controlled
> > > shutdown
> > > > > logic (e.g. there can be an ongoing preferred leader election),
> > > although
> > > > I
> > > > > think this may not be very common.
> > > > >
> > > > > Here the controller will start processing the BrokerChange event
> > (that
> > > > says
> > > > > that broker A shutdown) after the broker has come back up and
> > > > re-registered
> > > > > himself in ZK?
> > > > > How will the Controller miss the restart, won't he subsequently
> > receive
> > > > > another ZK event saying that broker A has come back up?
> > > > > Controller will not miss the BrokerChange event and actually there
> > will
> > > > be
> > > > > two BrokerChange events fired in this case (one for broker
> > > deregistration
> > > > > in zk and one for registration). However, when processing the
> > > > > BrokerChangeEvent, controller needs to do a read from zookeeper to
> > get
> > > > back
> > > > > the current brokers in the cluster and if the bounced broker
> already
> > > > joined
> > > > > the cluster by this time, controller will not know this broker has
> > been
> > > > > bounced because it sees no diff between zk and its in-memory cache.
> > So
> > > > > basically the processing of both BrokerChange events becomes a no-op.
> > > > >
> > > > >
> > > > > Hope that answers your questions. Feel free to follow up if I am
> > > missing
> > > > > something.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Zhanxiang (Patrick) Huang
> > > > >
> > > > > ________________________________
> > > > > From: Stanislav Kozlovski <st...@confluent.io>
> > > > > Sent: Wednesday, October 10, 2018 7:22
> > > > > To: dev@kafka.apache.org
> > > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests
> and
> > > > > bounced brokers using broker generation
> > > > >
> > > > > Hi Patrick,
> > > > >
> > > > > Thanks for the KIP! Fixing such correctness issues is always very
> > > > welcome -
> > > > > they're commonly hard to diagnose and debug when they happen in
> > > > production.
> > > > >
> > > > > I was wondering if I understood the potential correctness issues
> > > > correctly.
> > > > > Here is what I got:
> > > > >
> > > > >
> > > > >    - If a broker bounces during controlled shutdown, the bounced
> > broker
> > > > may
> > > > >    accidentally process its earlier generation’s StopReplicaRequest
> > > sent
> > > > > from
> > > > >    the active controller for one of its follower replicas, leaving
> > the
> > > > > replica
> > > > >    offline while its remaining replicas may stay online
> > > > >
> > > > > broker A is initiating a controlled shutdown (restart). The
> > Controller
> > > > > sends a StopReplicaRequest but it reaches broker A after it has
> > started
> > > > up
> > > > > again. He therefore stops replicating those partitions even though
> he
> > > > > should just be starting to
> > > > >
> > > > >
> > > > >    - If the first LeaderAndIsrRequest that a broker processes is
> sent
> > > by
> > > > >    the active controller before its startup, the broker will
> > overwrite
> > > > the
> > > > >    high watermark checkpoint file and may cause incorrect
> truncation
> > (
> > > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > > > >
> > > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > > restart.
> > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > > therefore
> > > > > starts leading for the partitions sent by that request and might
> stop
> > > > > leading partitions that it was leading previously.
> > > > > This was well explained in the linked JIRA, but I cannot understand
> > why
> > > > > that would happen due to my limited experience. If Broker A leads
> p1
> > > and
> > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> > and
> > > > not
> > > > > want Broker A to drop leadership for p2?
> > > > >
> > > > >
> > > > >    - If a broker bounces very quickly, the controller may start
> > > > processing
> > > > >    the BrokerChange event after the broker already re-registers
> > itself
> > > in
> > > > > zk.
> > > > >    In this case, controller will miss the broker restart and will
> not
> > > > send
> > > > > any
> > > > >    requests to the broker for initialization. The broker will not
> be
> > > able
> > > > > to
> > > > >    accept traffic.
> > > > >
> > > > > Here the controller will start processing the BrokerChange event
> > (that
> > > > says
> > > > > that broker A shutdown) after the broker has come back up and
> > > > re-registered
> > > > > himself in ZK?
> > > > > How will the Controller miss the restart, won't he subsequently
> > receive
> > > > > another ZK event saying that broker A has come back up?
> > > > >
> > > > >
> > > > > Could we explain these potential problems in a bit more detail just
> > so
> > > > they
> > > > > could be more easily digestible by novices?
> > > > >
> > > > > Thanks,
> > > > > Stanislav
> > > > >
> > > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Patrick,
> > > > > >
> > > > > > Thanks much for the KIP. The KIP is very well written.
> > > > > >
> > > > > > LGTM.  +1 (binding)
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <
> hzxa21@hotmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > Please find the below KIP which proposes the concept of broker
> > > > > generation
> > > > > > > to resolve issues caused by controller missing broker state
> > changes
> > > > and
> > > > > > > broker processing outdated control requests.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 380%3A+Detect+outdated+control+requests+and+bounced+
> > > brokers+using+broker+
> > > > > generation
> > > > > > >
> > > > > > > All comments are appreciated.
> > > > > > >
> > > > > > > Best,
> > > > > > > Zhanxiang (Patrick) Huang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Becket Qin <be...@gmail.com>.
Hi Patrick,

I am wondering why the controller should send STALE_BROKER_EPOCH error to
the broker if the broker epoch is stale? Would this be a little confusing
to the current broker if the request was sent by a broker with a previous
epoch? Should the controller just ignore those requests in that case?

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 9, 2018 at 2:17 AM Patrick Huang <hz...@hotmail.com> wrote:

> Hi,
>
> In this KIP, we are also going to add a new exception and a new error code
> "STALE_BROKER_EPOCH" in order to allow the broker to respond back the right
> error when it sees outdated broker epoch in the control requests. Since
> adding a new exception and error code is also considered as public
> interface change, I have updated the original KIP accordingly to include
> this change. Feel free to comment if there is any concern.
>
> Thanks,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Patrick Huang <hz...@hotmail.com>
> Sent: Tuesday, October 23, 2018 6:20
> To: Jun Rao; dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Agreed. I have updated the PR to add czxid in ControlledShutdownRequest (
> https://github.com/apache/kafka/pull/5821). Appreciated if you can take a
> look.
>
> Btw, I also have the vote thread for this KIP:
> https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E
>
> Best,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Jun Rao <ju...@confluent.io>
> Sent: Monday, October 22, 2018 21:31
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi, Patrick,
>
> Yes, that's the general sequence. After step 2, the shutting down broker
> can give up the controlled shutdown process and proceed to shut down. When
> it's restarted, it could still receive StopReplica requests from the
> controller in reaction to the previous controlled shutdown requests. This
> could lead the restarted broker to a bad state.
>
> Thanks,
>
> Jun
>
>
> On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com> wrote:
>
> > Hi Jun,
> >
> > That is a good point. I want to make sure I understand the scenario you
> > mentioned. Correct me if I am wrong. Here is the sequence of events:
> >
> >    1. Broker sends ControlledShutdown request 1 to controller
> >    2. Broker sends ControlledShutdown request 2 to controller due to
> >    retries
> >    3. Controller processes ControlledShutdown request 1
> >    4. Controller sends control requests to the broker
> >    5. Broker receives ControlledShutdown response 1 from controller
> >    6. Broker shuts down and restarts quickly
> >    7. Controller processes ControlledShutdown request 2
> >    8. Controller sends control requests to the broker
> >
> > It is possible that controller processes the broker change event between
> > 6) and 7). In this case, controller already updates the cached czxid to
> the
> > up-to-date ones so the bounced broker will not reject control requests in
> > 8), which causes a correctness problem.
> >
> >
> > Best,
> > Zhanxiang (Patrick) Huang
> >
> > ------------------------------
> > *From:* Jun Rao <ju...@confluent.io>
> > *Sent:* Monday, October 22, 2018 14:45
> > *To:* dev
> > *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > bounced brokers using broker generation
> >
> > Hi, Patrick,
> >
> > There is another thing that may be worth considering.
> >
> > 10. It will be useful to include the czxid also in the ControlledShutdown
> > request. This way, if the broker has been restarted, the controller can
> > ignore an old ControlledShutdown request(e.g., due to retries). This will
> > prevent the restarted broker from incorrectly stopping replicas.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks a lot for the comments.
> > >
> > > 1. czxid is globally unique and monotonically increasing based on the
> > > zookeeper doc.
> > > References (from
> > > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > > "Every change to the ZooKeeper state receives a stamp in the form of a
> > > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of
> all
> > > changes to ZooKeeper. Each change will have a unique zxid and if zxid1
> is
> > > smaller than zxid2 then zxid1 happened before zxid2."
> > > "czxid: The zxid of the change that caused this znode to be created."
> > >
> > > 2. You are right. There will be only one broker change event fired in
> the
> > > case I mentioned because we will not register the watcher before the
> > read.
> > >
> > > 3. Let's say we want to initialize the states of broker set A and we
> want
> > > the cluster to be aware of the absence of broker set B. The currently
> > live
> > > broker set in the cluster is C.
> > >
> > >     From the design point of view, here are the rules for broker state
> > > transition:
> > >     - Pass in broker ids of A for onBrokerStartup() and pass in broker
> > ids
> > > of B for onBrokerFailure().
> > >     - When processing onBrokerStartup(), we use the broker generation
> > > that the controller read from zk to send requests to broker set A and use the
> > > previously cached broker generation to send requests to (C-A).
> > >     - When processing onBrokerFailure(), we use the previously cached
> > > broker generation to send requests to C.
> > >
> > >     From the implementation point of view, here are the steps we need
> to
> > > follow when processing BrokerChangeEvent:
> > >     -  Reads all child nodes in /brokers/ids/ to get current brokers
> with
> > > broker generation
> > >     -  Detect new brokers, dead brokers and bounced brokers
> > >     -  Update the live broker ids in controller context
> > >     -  Update broker generations for new brokers in controller context
> > >     -  Invoke onBrokerStartup(new brokers)
> > >     -  Invoke onBrokerFailure(bounced brokers)
> > >     -  Update broker generations for bounced brokers in controller
> context
> > >     -  Invoke onBrokerStartup(bounced brokers)
> > >     -  Invoke onBrokerFailure(dead brokers)
> > >     We can further optimize the flow by avoiding sending requests to a
> > > broker if its broker generation is larger than the one in the
> controller
> > > context.
> > >
> > > I will also update the KIP to clarify how it works for
> BrokerChangeEvent
> > > processing in more detail.
> > >
> > > Thanks,
> > > Patrick
> > >
> > >
> > >
> > > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
> > >
> > > > Hi, Patrick,
> > > >
> > > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > > comments below.
> > > >
> > > > 1. "will reject the requests with smaller broker generation than its
> > > > current generation." Is czxid monotonically increasing?
> > > >
> > > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > > > watchers are one-time watchers. Once a watcher is fired, one needs to
> > > > register it again before the watcher can be triggered. So, when the
> > > > controller is busy and a broker goes down and comes up, the first
> event
> > > > will trigger the ZK watcher. Since the controller is busy and hasn't
> > > > registered the watcher again, the second event actually won't fire.
> By
> > > the
> > > > time the controller reads from ZK, it sees that the broker is still
> > > > registered and thus thinks that nothing has happened to the broker,
> > which
> > > > is causing the problem.
> > > >
> > > > 3. "Handle broker state change: invoke onBrokerFailure(...) first,
> then
> > > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
> > here.
> > > > Could you clarify the broker list and the broker epoch used when
> making
> > > > these calls? We want to prevent the restarted broker from receiving a
> > > > partial replica list on the first LeaderAndIsr request because of
> this.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > > > wrote:
> > > >
> > > > > Hey Stanislav,
> > > > >
> > > > > Sure. Thanks for your interest in this KIP. I am glad to provide
> more
> > > > > detail.
> > > > >
> > > > > broker A is initiating a controlled shutdown (restart). The
> > Controller
> > > > > sends a StopReplicaRequest but it reaches broker A after it has
> > started
> > > > up
> > > > > again. He therefore stops replicating those partitions even though
> he
> > > > > should just be starting to
> > > > > This is right.
> > > > >
> > > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > > restart.
> > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > > therefore
> > > > > starts leading for the partitions sent by that request and might
> stop
> > > > > leading partitions that it was leading previously.
> > > > > This was well explained in the linked JIRA, but I cannot understand
> > why
> > > > > that would happen due to my limited experience. If Broker A leads
> p1
> > > and
> > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> > and
> > > > not
> > > > > want Broker A to drop leadership for p2?
> > > > > The root cause of the issue is that after a broker just restarts,
> it
> > > > > relies on the first LeaderAndIsrRequest to populate the partition
> > state
> > > > and
> > > > > initializes the highwater mark checkpoint thread. The highwater
> mark
> > > > > checkpoint thread will overwrite the highwater mark checkpoint file
> > > based
> > > > > on the broker's in-memory partition states. In other words, If a
> > > > partition
> > > > > that is physically hosted by the broker is missing in the in-memory
> > > > > partition states map, its highwater mark will be lost after the
> > > highwater
> > > > > mark checkpoint thread overwrites the file. (Related codes:
> > > > >
> https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > > > 4a5534d4e9/core/src/main/scala/kafka/server/
> > > ReplicaManager.scala#L1091)
> > > > >
> > > > >
> > > > > In your example, assume the first LeaderAndIsrRequest broker A
> > receives
> > > > is
> > > > > the one initiated in the controlled shutdown logic in Controller to
> > > move
> > > > > leadership away from broker A. This LeaderAndIsrRequest only
> contains
> > > > > partitions that broker A leads, not all the partitions that broker
> A
> > > > hosts
> > > > > (i.e. no follower partitions), so the highwater mark for the
> follower
> > > > > partitions will be lost. Also, the first LeaderAndIsrRequest broker
> A
> > > > > receives may not necessarily be the one initiated in controlled
> > > shutdown
> > > > > logic (e.g. there can be an ongoing preferred leader election),
> > > although
> > > > I
> > > > > think this may not be very common.
> > > > >
> > > > > Here the controller will start processing the BrokerChange event
> > (that
> > > > says
> > > > > that broker A shutdown) after the broker has come back up and
> > > > re-registered
> > > > > himself in ZK?
> > > > > How will the Controller miss the restart, won't he subsequently
> > receive
> > > > > another ZK event saying that broker A has come back up?
> > > > > Controller will not miss the BrokerChange event and actually there
> > will
> > > > be
> > > > > two BrokerChange events fired in this case (one for broker
> > > deregistration
> > > > > in zk and one for registration). However, when processing the
> > > > > BrokerChangeEvent, controller needs to do a read from zookeeper to
> > get
> > > > back
> > > > > the current brokers in the cluster and if the bounced broker
> already
> > > > joined
> > > > > the cluster by this time, controller will not know this broker has
> > been
> > > > > bounced because it sees no diff between zk and its in-memory cache.
> > So
> > > > > basically the processing of both BrokerChange events becomes a no-op.
> > > > >
> > > > >
> > > > > Hope that answers your questions. Feel free to follow up if I am
> > > missing
> > > > > something.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Zhanxiang (Patrick) Huang
> > > > >
> > > > > ________________________________
> > > > > From: Stanislav Kozlovski <st...@confluent.io>
> > > > > Sent: Wednesday, October 10, 2018 7:22
> > > > > To: dev@kafka.apache.org
> > > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests
> and
> > > > > bounced brokers using broker generation
> > > > >
> > > > > Hi Patrick,
> > > > >
> > > > > Thanks for the KIP! Fixing such correctness issues is always very
> > > > welcome -
> > > > > they're commonly hard to diagnose and debug when they happen in
> > > > production.
> > > > >
> > > > > I was wondering if I understood the potential correctness issues
> > > > correctly.
> > > > > Here is what I got:
> > > > >
> > > > >
> > > > >    - If a broker bounces during controlled shutdown, the bounced
> > broker
> > > > may
> > > > >    accidentally process its earlier generation’s StopReplicaRequest
> > > sent
> > > > > from
> > > > >    the active controller for one of its follower replicas, leaving
> > the
> > > > > replica
> > > > >    offline while its remaining replicas may stay online
> > > > >
> > > > > broker A is initiating a controlled shutdown (restart). The
> > Controller
> > > > > sends a StopReplicaRequest but it reaches broker A after it has
> > started
> > > > up
> > > > > again. He therefore stops replicating those partitions even though
> he
> > > > > should just be starting to
> > > > >
> > > > >
> > > > >    - If the first LeaderAndIsrRequest that a broker processes is
> sent
> > > by
> > > > >    the active controller before its startup, the broker will
> > overwrite
> > > > the
> > > > >    high watermark checkpoint file and may cause incorrect
> truncation
> > (
> > > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > > > >
> > > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > > restart.
> > > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > > therefore
> > > > > starts leading for the partitions sent by that request and might
> stop
> > > > > leading partitions that it was leading previously.
> > > > > This was well explained in the linked JIRA, but I cannot understand
> > why
> > > > > that would happen due to my limited experience. If Broker A leads
> p1
> > > and
> > > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> > and
> > > > not
> > > > > want Broker A to drop leadership for p2?
> > > > >
> > > > >
> > > > >    - If a broker bounces very quickly, the controller may start
> > > > processing
> > > > >    the BrokerChange event after the broker already re-registers
> > itself
> > > in
> > > > > zk.
> > > > >    In this case, controller will miss the broker restart and will
> not
> > > > send
> > > > > any
> > > > >    requests to the broker for initialization. The broker will not
> be
> > > able
> > > > > to
> > > > >    accept traffic.
> > > > >
> > > > > Here the controller will start processing the BrokerChange event
> > (that
> > > > says
> > > > > that broker A shutdown) after the broker has come back up and
> > > > re-registered
> > > > > himself in ZK?
> > > > > How will the Controller miss the restart, won't he subsequently
> > receive
> > > > > another ZK event saying that broker A has come back up?
> > > > >
> > > > >
> > > > > Could we explain these potential problems in a bit more detail just
> > so
> > > > they
> > > > > could be more easily digestible by novices?
> > > > >
> > > > > Thanks,
> > > > > Stanislav
> > > > >
> > > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Patrick,
> > > > > >
> > > > > > Thanks much for the KIP. The KIP is very well written.
> > > > > >
> > > > > > LGTM.  +1 (binding)
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <
> hzxa21@hotmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > Please find the below KIP which proposes the concept of broker
> > > > > generation
> > > > > > > to resolve issues caused by controller missing broker state
> > changes
> > > > and
> > > > > > > broker processing outdated control requests.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 380%3A+Detect+outdated+control+requests+and+bounced+
> > > brokers+using+broker+
> > > > > generation
> > > > > > >
> > > > > > > All comments are appreciated.
> > > > > > >
> > > > > > > Best,
> > > > > > > Zhanxiang (Patrick) Huang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Hi,

In this KIP, we are also going to add a new exception and a new error code "STALE_BROKER_EPOCH" in order to allow the broker to respond with the right error when it sees an outdated broker epoch in the control requests. Since adding a new exception and error code is also considered a public interface change, I have updated the original KIP accordingly to include this change. Feel free to comment if there is any concern.
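
To illustrate how the new error might be used, here is only a rough sketch under made-up names (StaleBrokerEpochException, validateBrokerEpoch), not the exact code or API:

    // Sketch: the broker compares the broker epoch carried in a control request
    // (LeaderAndIsr / StopReplica / UpdateMetadata) with its current epoch (the
    // czxid of its registration znode) and rejects anything older.
    class StaleBrokerEpochException(message: String) extends RuntimeException(message)

    def validateBrokerEpoch(requestBrokerEpoch: Long, currentBrokerEpoch: Long): Unit =
      if (requestBrokerEpoch < currentBrokerEpoch)
        throw new StaleBrokerEpochException(
          s"Epoch $requestBrokerEpoch in the control request is older than the " +
            s"broker's current epoch $currentBrokerEpoch")

    // The request handler would catch this exception and map it to the new
    // STALE_BROKER_EPOCH error code in the response instead of applying the
    // outdated request.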

Thanks,
Zhanxiang (Patrick) Huang

________________________________
From: Patrick Huang <hz...@hotmail.com>
Sent: Tuesday, October 23, 2018 6:20
To: Jun Rao; dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Agreed. I have updated the PR to add czxid in ControlledShutdownRequest (https://github.com/apache/kafka/pull/5821). Appreciated if you can take a look.

Btw, I also have the vote thread for this KIP: https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E

Best,
Zhanxiang (Patrick) Huang

________________________________
From: Jun Rao <ju...@confluent.io>
Sent: Monday, October 22, 2018 21:31
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi, Patrick,

Yes, that's the general sequence. After step 2, the shutting down broker
can give up the controlled shutdown process and proceed to shut down. When
it's restarted, it could still receive StopReplica requests from the
controller in reaction to the previous controlled shutdown requests. This
could lead the restarted broker to a bad state.

Thanks,

Jun


On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com> wrote:

> Hi Jun,
>
> That is a good point. I want to make sure I understand the scenario you
> mentioned. Correct me if I am wrong. Here is the sequence of events:
>
>    1. Broker sends ControlledShutdown request 1 to controller
>    2. Broker sends ControlledShutdown request 2 to controller due to
>    retries
>    3. Controller processes ControlledShutdown request 1
>    4. Controller sends control requests to the broker
>    5. Broker receives ControlledShutdown response 1 from controller
>    6. Broker shuts down and restarts quickly
>    7. Controller processes ControlledShutdown request 2
>    8. Controller sends control requests to the broker
>
> It is possible that controller processes the broker change event between
> 6) and 7). In this case, controller already updates the cached czxid to the
> up-to-date ones so the bounced broker will not reject control requests in
> 8), which causes a correctness problem.
>
>
> Best,
> Zhanxiang (Patrick) Huang
>
> ------------------------------
> *From:* Jun Rao <ju...@confluent.io>
> *Sent:* Monday, October 22, 2018 14:45
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi, Patrick,
>
> There is another thing that may be worth considering.
>
> 10. It will be useful to include the czxid also in the ControlledShutdown
> request. This way, if the broker has been restarted, the controller can
> ignore an old ControlledShutdown request(e.g., due to retries). This will
> prevent the restarted broker from incorrectly stopping replicas.
>
> Thanks,
>
> Jun
>
>
> On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thanks a lot for the comments.
> >
> > 1. czxid is globally unique and monotonically increasing based on the
> > zookeeper doc.
> > References (from
> > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > "Every change to the ZooKeeper state receives a stamp in the form of a
> > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> > changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> > smaller than zxid2 then zxid1 happened before zxid2."
> > "czxid: The zxid of the change that caused this znode to be created."
> >
> > 2. You are right. There will be only one broker change event fired in the
> > case I mentioned because we will not register the watcher before the
> read.
> >
> > 3. Let's say we want to initialize the states of broker set A and we want
> > the cluster to be aware of the absence of broker set B. The currently
> live
> > broker set in the cluster is C.
> >
> >     From the design point of view, here are the rules for broker state
> > transition:
> >     - Pass in broker ids of A for onBrokerStartup() and pass in broker
> ids
> > of B for onBrokerFailure().
> >     - When processing onBrokerStartup(), we use the broker generation
> > that the controller read from zk to send requests to broker set A and use the
> > previously cached broker generation to send requests to (C-A).
> >     - When processing onBrokerFailure(), we use the previously cached
> > broker generation to send requests to C.
> >
> >     From the implementation point of view, here are the steps we need to
> > follow when processing BrokerChangeEvent:
> >     -  Reads all child nodes in /brokers/ids/ to get current brokers with
> > broker generation
> >     -  Detect new brokers, dead brokers and bounced brokers
> >     -  Update the live broker ids in controller context
> >     -  Update broker generations for new brokers in controller context
> >     -  Invoke onBrokerStartup(new brokers)
> >     -  Invoke onBrokerFailure(bounced brokers)
> >     -  Update broker generations for bounced brokers in controller context
> >     -  Invoke onBrokerStartup(bounced brokers)
> >     -  Invoke onBrokerFailure(dead brokers)
> >     We can further optimize the flow by avoiding sending requests to a
> > broker if its broker generation is larger than the one in the controller
> > context.
> >
> > I will also update the KIP to clarify how it works for BrokerChangeEvent
> > processing in more detail.
> >
> > Thanks,
> > Patrick
> >
> >
> >
> > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Patrick,
> > >
> > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > comments below.
> > >
> > > 1. "will reject the requests with smaller broker generation than its
> > > current generation." Is czxid monotonically increasing?
> > >
> > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > > watchers are one-time watchers. Once a watcher is fired, one needs to
> > > register it again before the watcher can be triggered. So, when the
> > > controller is busy and a broker goes down and comes up, the first event
> > > will trigger the ZK watcher. Since the controller is busy and hasn't
> > > registered the watcher again, the second event actually won't fire. By
> > the
> > > time the controller reads from ZK, it sees that the broker is still
> > > registered and thus thinks that nothing has happened to the broker,
> which
> > > is causing the problem.
> > >
> > > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
> here.
> > > Could you clarify the broker list and the broker epoch used when making
> > > these calls? We want to prevent the restarted broker from receiving a
> > > partial replica list on the first LeaderAndIsr request because of this.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > > wrote:
> > >
> > > > Hey Stanislav,
> > > >
> > > > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > > > detail.
> > > >
> > > > broker A is initiating a controlled shutdown (restart). The
> Controller
> > > > sends a StopReplicaRequest but it reaches broker A after it has
> started
> > > up
> > > > again. He therefore stops replicating those partitions even though he
> > > > should just be starting to
> > > > This is right.
> > > >
> > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > restart.
> > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > therefore
> > > > starts leading for the partitions sent by that request and might stop
> > > > leading partitions that it was leading previously.
> > > > This was well explained in the linked JIRA, but I cannot understand
> why
> > > > that would happen due to my limited experience. If Broker A leads p1
> > and
> > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> and
> > > not
> > > > want Broker A to drop leadership for p2?
> > > > The root cause of the issue is that after a broker just restarts, it
> > > > relies on the first LeaderAndIsrRequest to populate the partition
> state
> > > and
> > > > initializes the highwater mark checkpoint thread. The highwater mark
> > > > checkpoint thread will overwrite the highwater mark checkpoint file
> > based
> > > > on the broker's in-memory partition states. In other words, If a
> > > partition
> > > > that is physically hosted by the broker is missing in the in-memory
> > > > partition states map, its highwater mark will be lost after the
> > highwater
> > > > mark checkpoint thread overwrites the file. (Related codes:
> > > > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > > 4a5534d4e9/core/src/main/scala/kafka/server/
> > ReplicaManager.scala#L1091)
> > > >
> > > >
> > > > In your example, assume the first LeaderAndIsrRequest broker A
> receives
> > > is
> > > > the one initiated in the controlled shutdown logic in Controller to
> > move
> > > > leadership away from broker A. This LeaderAndIsrRequest only contains
> > > > partitions that broker A leads, not all the partitions that broker A
> > > hosts
> > > > (i.e. no follower partitions), so the highwater mark for the follower
> > > > partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> > > > receives may not necessarily be the one initiated in controlled
> > shutdown
> > > > logic (e.g. there can be an ongoing preferred leader election),
> > although
> > > I
> > > > think this may not be very common.
> > > >
> > > > Here the controller will start processing the BrokerChange event
> (that
> > > says
> > > > that broker A shutdown) after the broker has come back up and
> > > re-registered
> > > > himself in ZK?
> > > > How will the Controller miss the restart, won't he subsequently
> receive
> > > > another ZK event saying that broker A has come back up?
> > > > Controller will not miss the BrokerChange event and actually there
> will
> > > be
> > > > two BrokerChange events fired in this case (one for broker
> > deregistration
> > > > in zk and one for registration). However, when processing the
> > > > BrokerChangeEvent, controller needs to do a read from zookeeper to
> get
> > > back
> > > > the current brokers in the cluster and if the bounced broker already
> > > joined
> > > > the cluster by this time, controller will not know this broker has
> been
> > > > bounced because it sees no diff between zk and its in-memory cache.
> So
> > > > basically the processing of both BrokerChange events becomes a no-op.
> > > >
> > > >
> > > > Hope that I answer your questions. Feel free to follow up if I am
> > missing
> > > > something.
> > > >
> > > >
> > > > Thanks,
> > > > Zhanxiang (Patrick) Huang
> > > >
> > > > ________________________________
> > > > From: Stanislav Kozlovski <st...@confluent.io>
> > > > Sent: Wednesday, October 10, 2018 7:22
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > > > bounced brokers using broker generation
> > > >
> > > > Hi Patrick,
> > > >
> > > > Thanks for the KIP! Fixing such correctness issues is always very
> > > welcome -
> > > > they're commonly hard to diagnose and debug when they happen in
> > > production.
> > > >
> > > > I was wondering if I understood the potential correctness issues
> > > correctly.
> > > > Here is what I got:
> > > >
> > > >
> > > >    - If a broker bounces during controlled shutdown, the bounced
> broker
> > > may
> > > >    accidentally process its earlier generation’s StopReplicaRequest
> > sent
> > > > from
> > > >    the active controller for one of its follower replicas, leaving
> the
> > > > replica
> > > >    offline while its remaining replicas may stay online
> > > >
> > > > broker A is initiating a controlled shutdown (restart). The
> Controller
> > > > sends a StopReplicaRequest but it reaches broker A after it has
> started
> > > up
> > > > again. He therefore stops replicating those partitions even though he
> > > > should just be starting to
> > > >
> > > >
> > > >    - If the first LeaderAndIsrRequest that a broker processes is sent
> > by
> > > >    the active controller before its startup, the broker will
> overwrite
> > > the
> > > >    high watermark checkpoint file and may cause incorrect truncation
> (
> > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > > >
> > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > restart.
> > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > therefore
> > > > starts leading for the partitions sent by that request and might stop
> > > > leading partitions that it was leading previously.
> > > > This was well explained in the linked JIRA, but I cannot understand
> why
> > > > that would happen due to my limited experience. If Broker A leads p1
> > and
> > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> and
> > > not
> > > > want Broker A to drop leadership for p2?
> > > >
> > > >
> > > >    - If a broker bounces very quickly, the controller may start
> > > processing
> > > >    the BrokerChange event after the broker already re-registers
> itself
> > in
> > > > zk.
> > > >    In this case, controller will miss the broker restart and will not
> > > send
> > > > any
> > > >    requests to the broker for initialization. The broker will not be
> > able
> > > > to
> > > >    accept traffic.
> > > >
> > > > Here the controller will start processing the BrokerChange event
> (that
> > > says
> > > > that broker A shutdown) after the broker has come back up and
> > > re-registered
> > > > himself in ZK?
> > > > How will the Controller miss the restart, won't he subsequently
> receive
> > > > another ZK event saying that broker A has come back up?
> > > >
> > > >
> > > > Could we explain these potential problems in a bit more detail just
> so
> > > they
> > > > could be more easily digestible by novices?
> > > >
> > > > Thanks,
> > > > Stanislav
> > > >
> > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
> wrote:
> > > >
> > > > > Hey Patrick,
> > > > >
> > > > > Thanks much for the KIP. The KIP is very well written.
> > > > >
> > > > > LGTM.  +1 (binding)
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Please find the below KIP which proposes the concept of broker
> > > > generation
> > > > > > to resolve issues caused by controller missing broker state
> changes
> > > and
> > > > > > broker processing outdated control requests.
> > > > > >
> > > > > >
> > > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 380%3A+Detect+outdated+control+requests+and+bounced+
> > brokers+using+broker+
> > > > generation
> > > > > >
> > > > > > All comments are appreciated.
> > > > > >
> > > > > > Best,
> > > > > > Zhanxiang (Patrick) Huang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Agreed. I have updated the PR to add czxid to ControlledShutdownRequest (https://github.com/apache/kafka/pull/5821). I would appreciate it if you could take a look.
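
For reference, the shape of the change is roughly as follows. This is only an illustrative sketch in Scala; the class and field names are made up for the example and are not the actual schema or code in the PR:

    // The new request version carries the sender's broker generation, i.e. the
    // czxid of its /brokers/ids/<id> registration znode, next to the broker id.
    object ControlledShutdownRequestSketch {
      final case class ControlledShutdownRequest(brokerId: Int, brokerEpoch: Long)
    }

With the epoch inside the request itself, the controller no longer has to infer the sender's incarnation from its cached state alone.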

Btw, I have also started the vote thread for this KIP: https://lists.apache.org/thread.html/3689d83db537d5aa86d10967dee7ee29578897fc123daae4f77a8605@%3Cdev.kafka.apache.org%3E

Best,
Zhanxiang (Patrick) Huang

________________________________
From: Jun Rao <ju...@confluent.io>
Sent: Monday, October 22, 2018 21:31
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi, Patrick,

Yes, that's the general sequence. After step 2, the shutting down broker
can give up the controlled shutdown process and proceed to shut down. When
it's restarted, it could still receive StopReplica requests from the
controller in reaction to the previous controlled shutdown requests. This
could lead the restarted broker to a bad state.

Thanks,

Jun


On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com> wrote:

> Hi Jun,
>
> That is a good point. I want to make it clear about the scenario you
> mentioned. Correct me if I am wrong. Here is the sequence of events:
>
>    1. Broker sends ControlledShutdown request 1 to controller
>    2. Broker sends ControlledShutdown request 2 to controller due to
>    retries
>    3. Controller processes ControlledShutdown request 1
>    4. Controller sends control requests to the broker
>    5. Broker receives ControlledShutdown response 1 from controller
>    6. Broker shuts down and restarts quickly
>    7. Controller processes ControlledShutdown request 2
>    8. Controller sends control requests to the broker
>
> It is possible that controller processes the broker change event between
> 6) and 7). In this case, controller already updates the cached czxid to the
> up-to-date ones so the bounced broker will not reject control requests in
> 8), which causes a correctness problem.
>
>
> Best,
> Zhanxiang (Patrick) Huang
>
> ------------------------------
> *From:* Jun Rao <ju...@confluent.io>
> *Sent:* Monday, October 22, 2018 14:45
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi, Patrick,
>
> There is another thing that may be worth considering.
>
> 10. It will be useful to include the czxid also in the ControlledShutdown
> request. This way, if the broker has been restarted, the controller can
> ignore an old ControlledShutdown request (e.g., due to retries). This will
> prevent the restarted broker from incorrectly stopping replicas.
>
> Thanks,
>
> Jun
>
>
> On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thanks a lot for the comments.
> >
> > 1. czxid is globally unique and monotonically increasing based on the
> > zookeeper doc.
> > References (from
> > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > "Every change to the ZooKeeper state receives a stamp in the form of a
> > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> > changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> > smaller than zxid2 then zxid1 happened before zxid2."
> > "czxid: The zxid of the change that caused this znode to be created."
> >
> > 2. You are right. There will be only one broker change event fired in the
> > case I mentioned because we will not register the watcher before the
> read.
> >
> > 3. Let's say we want to initialize the states of broker set A and we want
> > the cluster to be aware of the absence of broker set B. The currently
> live
> > broker set in the cluster is C.
> >
> >     From the design point of view, here are the rules for broker state
> > transition:
> >     - Pass in broker ids of A for onBrokerStartup() and pass in broker
> ids
> > of B for onBrokerFailure().
> >     - When processing onBrokerStartup(), we use the broker generation
> > the controller read from zk to send requests to broker set A and use the
> > previously cached broker generation to send requests to (C-A).
> >     - When processing onBrokerFailure(), we use the previously cached
> > broker generation to send requests to C.
> >
> >     From the implementation point of view, here are the steps we need to
> > follow when processing BrokerChangeEvent:
> >     -  Reads all child nodes in /brokers/ids/ to get current brokers with
> > broker generation
> >     -  Detect new brokers, dead brokers and bounced brokers
> >     -  Update the live broker ids in controller context
> >     -  Update broker generations for new brokers in controller context
> >     -  Invoke onBrokerStartup(new brokers)
> >     -  Invoke onBrokerFailure(bounced brokers)
> >     -  Update broker generations for bounced brokers in controller context
> >     -  Invoke onBrokerStartup(bounced brokers)
> >     -  Invoke onBrokerFailure(dead brokers)
> >     We can further optimize the flow by avoiding sending requests to a
> > broker if its broker generation is larger than the one in the controller
> > context.
> >
> > I will also update the KIP to clarify how it works for BrokerChangeEvent
> > processing in more detail.
> >
> > Thanks,
> > Patrick
> >
> >
> >
> > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Patrick,
> > >
> > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > comments below.
> > >
> > > 1. "will reject the requests with smaller broker generation than its
> > > current generation." Is czxid monotonically increasing?
> > >
> > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > > watchers are one-time watchers. Once a watcher is fired, one needs to
> > > register it again before the watcher can be triggered. So, when the
> > > controller is busy and a broker goes down and comes up, the first event
> > > will trigger the ZK watcher. Since the controller is busy and hasn't
> > > registered the watcher again, the second event actually won't fire. By
> > the
> > > time the controller reads from ZK, it sees that the broker is still
> > > registered and thus thinks that nothing has happened to the broker,
> which
> > > is causing the problem.
> > >
> > > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
> here.
> > > Could you clarify the broker list and the broker epoch used when making
> > > these calls? We want to prevent the restarted broker from receiving a
> > > partial replica list on the first LeaderAndIsr request because of this.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > > wrote:
> > >
> > > > Hey Stanislav,
> > > >
> > > > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > > > detail.
> > > >
> > > > broker A is initiating a controlled shutdown (restart). The
> Controller
> > > > sends a StopReplicaRequest but it reaches broker A after it has
> started
> > > up
> > > > again. He therefore stops replicating those partitions even though he
> > > > should just be starting to
> > > > This is right.
> > > >
> > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > restart.
> > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > therefore
> > > > starts leading for the partitions sent by that request and might stop
> > > > leading partitions that it was leading previously.
> > > > This was well explained in the linked JIRA, but I cannot understand
> why
> > > > that would happen due to my limited experience. If Broker A leads p1
> > and
> > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> and
> > > not
> > > > want Broker A to drop leadership for p2?
> > > > The root cause of the issue is that after a broker just restarts, it
> > > > relies on the first LeaderAndIsrRequest to populate the partition
> state
> > > and
> > > > initializes the highwater mark checkpoint thread. The highwater mark
> > > > checkpoint thread will overwrite the highwater mark checkpoint file
> > based
> > > > on the broker's in-memory partition states. In other words, If a
> > > partition
> > > > that is physically hosted by the broker is missing in the in-memory
> > > > partition states map, its highwater mark will be lost after the
> > highwater
> > > > mark checkpoint thread overwrites the file. (Related codes:
> > > > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > > 4a5534d4e9/core/src/main/scala/kafka/server/
> > ReplicaManager.scala#L1091)
> > > >
> > > >
> > > > In your example, assume the first LeaderAndIsrRequest broker A
> receives
> > > is
> > > > the one initiated in the controlled shutdown logic in Controller to
> > move
> > > > leadership away from broker A. This LeaderAndIsrRequest only contains
> > > > partitions that broker A leads, not all the partitions that broker A
> > > hosts
> > > > (i.e. no follower partitions), so the highwater mark for the follower
> > > > partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> > > > receives may not necessarily be the one initiated in controlled
> > shutdown
> > > > logic (e.g. there can be an ongoing preferred leader election),
> > although
> > > I
> > > > think this may not be very common.
> > > >
> > > > Here the controller will start processing the BrokerChange event
> (that
> > > says
> > > > that broker A shutdown) after the broker has come back up and
> > > re-registered
> > > > himself in ZK?
> > > > How will the Controller miss the restart, won't he subsequently
> receive
> > > > another ZK event saying that broker A has come back up?
> > > > Controller will not miss the BrokerChange event and actually there
> will
> > > be
> > > > two BrokerChange events fired in this case (one for broker
> > deregistration
> > > > in zk and one for registration). However, when processing the
> > > > BrokerChangeEvent, controller needs to do a read from zookeeper to
> get
> > > back
> > > > the current brokers in the cluster and if the bounced broker already
> > > joined
> > > > the cluster by this time, controller will not know this broker has
> been
> > > > bounced because it sees no diff between zk and its in-memory cache.
> So
> > > > basically the processing of both BrokerChange events becomes a no-op.
> > > >
> > > >
> > > > Hope that I answer your questions. Feel free to follow up if I am
> > missing
> > > > something.
> > > >
> > > >
> > > > Thanks,
> > > > Zhanxiang (Patrick) Huang
> > > >
> > > > ________________________________
> > > > From: Stanislav Kozlovski <st...@confluent.io>
> > > > Sent: Wednesday, October 10, 2018 7:22
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > > > bounced brokers using broker generation
> > > >
> > > > Hi Patrick,
> > > >
> > > > Thanks for the KIP! Fixing such correctness issues is always very
> > > welcome -
> > > > they're commonly hard to diagnose and debug when they happen in
> > > production.
> > > >
> > > > I was wondering if I understood the potential correctness issues
> > > correctly.
> > > > Here is what I got:
> > > >
> > > >
> > > >    - If a broker bounces during controlled shutdown, the bounced
> broker
> > > may
> > > >    accidentally process its earlier generation’s StopReplicaRequest
> > sent
> > > > from
> > > >    the active controller for one of its follower replicas, leaving
> the
> > > > replica
> > > >    offline while its remaining replicas may stay online
> > > >
> > > > broker A is initiating a controlled shutdown (restart). The
> Controller
> > > > sends a StopReplicaRequest but it reaches broker A after it has
> started
> > > up
> > > > again. He therefore stops replicating those partitions even though he
> > > > should just be starting to
> > > >
> > > >
> > > >    - If the first LeaderAndIsrRequest that a broker processes is sent
> > by
> > > >    the active controller before its startup, the broker will
> overwrite
> > > the
> > > >    high watermark checkpoint file and may cause incorrect truncation
> (
> > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > > >
> > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > restart.
> > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > therefore
> > > > starts leading for the partitions sent by that request and might stop
> > > > leading partitions that it was leading previously.
> > > > This was well explained in the linked JIRA, but I cannot understand
> why
> > > > that would happen due to my limited experience. If Broker A leads p1
> > and
> > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> and
> > > not
> > > > want Broker A to drop leadership for p2?
> > > >
> > > >
> > > >    - If a broker bounces very quickly, the controller may start
> > > processing
> > > >    the BrokerChange event after the broker already re-registers
> itself
> > in
> > > > zk.
> > > >    In this case, controller will miss the broker restart and will not
> > > send
> > > > any
> > > >    requests to the broker for initialization. The broker will not be
> > able
> > > > to
> > > >    accept traffic.
> > > >
> > > > Here the controller will start processing the BrokerChange event
> (that
> > > says
> > > > that broker A shutdown) after the broker has come back up and
> > > re-registered
> > > > himself in ZK?
> > > > How will the Controller miss the restart, won't he subsequently
> receive
> > > > another ZK event saying that broker A has come back up?
> > > >
> > > >
> > > > Could we explain these potential problems in a bit more detail just
> so
> > > they
> > > > could be more easily digestible by novices?
> > > >
> > > > Thanks,
> > > > Stanislav
> > > >
> > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
> wrote:
> > > >
> > > > > Hey Patrick,
> > > > >
> > > > > Thanks much for the KIP. The KIP is very well written.
> > > > >
> > > > > LGTM.  +1 (binding)
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Please find the below KIP which proposes the concept of broker
> > > > generation
> > > > > > to resolve issues caused by controller missing broker state
> changes
> > > and
> > > > > > broker processing outdated control requests.
> > > > > >
> > > > > >
> > > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 380%3A+Detect+outdated+control+requests+and+bounced+
> > brokers+using+broker+
> > > > generation
> > > > > >
> > > > > > All comments are appreciated.
> > > > > >
> > > > > > Best,
> > > > > > Zhanxiang (Patrick) Huang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Jun Rao <ju...@confluent.io>.
Hi, Patrick,

Yes, that's the general sequence. After step 2, the shutting down broker
can give up the controlled shutdown process and proceed to shut down. When
it's restarted, it could still receive StopReplica requests from the
controller in reaction to the previous controlled shutdown requests. This
could lead the restarted broker to a bad state.
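
To make this concrete, here is a minimal, self-contained sketch of the broker-side guard (the names are illustrative, not the actual broker code):

    object BrokerSideEpochCheckSketch extends App {
      // The broker remembers the czxid of its own registration znode and drops
      // control requests stamped with an older generation.
      def shouldProcess(currentBrokerEpoch: Long, requestBrokerEpoch: Long): Boolean =
        requestBrokerEpoch >= currentBrokerEpoch

      // A StopReplica request carrying the pre-restart generation is dropped.
      println(shouldProcess(currentBrokerEpoch = 2000L, requestBrokerEpoch = 1500L)) // false
    }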

Thanks,

Jun


On Mon, Oct 22, 2018 at 4:32 PM, Patrick Huang <hz...@hotmail.com> wrote:

> Hi Jun,
>
> That is a good point. I want to make it clear about the scenario you
> mentioned. Correct me if I am wrong. Here is the sequence of events:
>
>    1. Broker sends ControlledShutdown request 1 to controller
>    2. Broker sends ControlledShutdown request 2 to controller due to
>    retries
>    3. Controller processes ControlledShutdown request 1
>    4. Controller sends control requests to the broker
>    5. Broker receives ControlledShutdown response 1 from controller
>    6. Broker shuts down and restarts quickly
>    7. Controller processes ControlledShutdown request 2
>    8. Controller sends control requests to the broker
>
> It is possible that controller processes the broker change event between
> 6) and 7). In this case, controller already updates the cached czxid to the
> up-to-date ones so the bounced broker will not reject control requests in
> 8), which causes a correctness problem.
>
>
> Best,
> Zhanxiang (Patrick) Huang
>
> ------------------------------
> *From:* Jun Rao <ju...@confluent.io>
> *Sent:* Monday, October 22, 2018 14:45
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi, Patrick,
>
> There is another thing that may be worth considering.
>
> 10. It will be useful to include the czxid also in the ControlledShutdown
> request. This way, if the broker has been restarted, the controller can
> ignore an old ControlledShutdown request (e.g., due to retries). This will
> prevent the restarted broker from incorrectly stopping replicas.
>
> Thanks,
>
> Jun
>
>
> On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thanks a lot for the comments.
> >
> > 1. czxid is globally unique and monotonically increasing based on the
> > zookeeper doc.
> > References (from
> > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> > "Every change to the ZooKeeper state receives a stamp in the form of a
> > *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> > changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> > smaller than zxid2 then zxid1 happened before zxid2."
> > "czxid: The zxid of the change that caused this znode to be created."
> >
> > 2. You are right. There will be only one broker change event fired in the
> > case I mentioned because we will not register the watcher before the
> read.
> >
> > 3. Let's say we want to initialize the states of broker set A and we want
> > the cluster to be aware of the absence of broker set B. The currently
> live
> > broker set in the cluster is C.
> >
> >     From the design point of view, here are the rules for broker state
> > transition:
> >     - Pass in broker ids of A for onBrokerStartup() and pass in broker
> ids
> > of B for onBrokerFailure().
> >     - When processing onBrokerStartup(), we use the broker generation
> > the controller read from zk to send requests to broker set A and use the
> > previously cached broker generation to send requests to (C-A).
> >     - When processing onBrokerFailure(), we use the previously cached
> > broker generation to send requests to C.
> >
> >     From the implementation point of view, here are the steps we need to
> > follow when processing BrokerChangeEvent:
> >     -  Reads all child nodes in /brokers/ids/ to get current brokers with
> > broker generation
> >     -  Detect new brokers, dead brokers and bounced brokers
> >     -  Update the live broker ids in controller context
> >     -  Update broker generations for new brokers in controller context
> >     -  Invoke onBrokerStartup(new brokers)
> >     -  Invoke onBrokerFailure(bounced brokers)
> >     -  Update broker generations for bounced brokers in controller context
> >     -  Invoke onBrokerStartup(bounced brokers)
> >     -  Invoke onBrokerFailure(dead brokers)
> >     We can further optimize the flow by avoiding sending requests to a
> > broker if its broker generation is larger than the one in the controller
> > context.
> >
> > I will also update the KIP to clarify how it works for BrokerChangeEvent
> > processing in more detail.
> >
> > Thanks,
> > Patrick
> >
> >
> >
> > On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Patrick,
> > >
> > > Thanks for the KIP. Looks good to me overall and very useful. A few
> > > comments below.
> > >
> > > 1. "will reject the requests with smaller broker generation than its
> > > current generation." Is czxid monotonically increasing?
> > >
> > > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > > watchers are one-time watchers. Once a watcher is fired, one needs to
> > > register it again before the watcher can be triggered. So, when the
> > > controller is busy and a broker goes down and comes up, the first event
> > > will trigger the ZK watcher. Since the controller is busy and hasn't
> > > registered the watcher again, the second event actually won't fire. By
> > the
> > > time the controller reads from ZK, it sees that the broker is still
> > > registered and thus thinks that nothing has happened to the broker,
> which
> > > is causing the problem.
> > >
> > > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > > invoke onBrokerStartUp(...)". We probably want to be a bit careful
> here.
> > > Could you clarify the broker list and the broker epoch used when making
> > > these calls? We want to prevent the restarted broker from receiving a
> > > partial replica list on the first LeaderAndIsr request because of this.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > > wrote:
> > >
> > > > Hey Stanislav,
> > > >
> > > > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > > > detail.
> > > >
> > > > broker A is initiating a controlled shutdown (restart). The
> Controller
> > > > sends a StopReplicaRequest but it reaches broker A after it has
> started
> > > up
> > > > again. He therefore stops replicating those partitions even though he
> > > > should just be starting to
> > > > This is right.
> > > >
> > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > restart.
> > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > therefore
> > > > starts leading for the partitions sent by that request and might stop
> > > > leading partitions that it was leading previously.
> > > > This was well explained in the linked JIRA, but I cannot understand
> why
> > > > that would happen due to my limited experience. If Broker A leads p1
> > and
> > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> and
> > > not
> > > > want Broker A to drop leadership for p2?
> > > > The root cause of the issue is that after a broker just restarts, it
> > > > relies on the first LeaderAndIsrRequest to populate the partition
> state
> > > and
> > > > initializes the highwater mark checkpoint thread. The highwater mark
> > > > checkpoint thread will overwrite the highwater mark checkpoint file
> > based
> > > > on the broker's in-memory partition states. In other words, If a
> > > partition
> > > > that is physically hosted by the broker is missing in the in-memory
> > > > partition states map, its highwater mark will be lost after the
> > highwater
> > > > mark checkpoint thread overwrites the file. (Related codes:
> > > > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > > 4a5534d4e9/core/src/main/scala/kafka/server/
> > ReplicaManager.scala#L1091)
> > > >
> > > >
> > > > In your example, assume the first LeaderAndIsrRequest broker A
> receives
> > > is
> > > > the one initiated in the controlled shutdown logic in Controller to
> > move
> > > > leadership away from broker A. This LeaderAndIsrRequest only contains
> > > > partitions that broker A leads, not all the partitions that broker A
> > > hosts
> > > > (i.e. no follower partitions), so the highwater mark for the follower
> > > > partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> > > > receives may not necessarily be the one initiated in controlled
> > shutdown
> > > > logic (e.g. there can be an ongoing preferred leader election),
> > although
> > > I
> > > > think this may not be very common.
> > > >
> > > > Here the controller will start processing the BrokerChange event
> (that
> > > says
> > > > that broker A shutdown) after the broker has come back up and
> > > re-registered
> > > > himself in ZK?
> > > > How will the Controller miss the restart, won't he subsequently
> receive
> > > > another ZK event saying that broker A has come back up?
> > > > Controller will not miss the BrokerChange event and actually there
> will
> > > be
> > > > two BrokerChange events fired in this case (one for broker
> > deregistration
> > > > in zk and one for registration). However, when processing the
> > > > BrokerChangeEvent, controller needs to do a read from zookeeper to
> get
> > > back
> > > > the current brokers in the cluster and if the bounced broker already
> > > joined
> > > > the cluster by this time, controller will not know this broker has
> been
> > > > bounced because it sees no diff between zk and its in-memory cache.
> So
> > > > basically the processing of both BrokerChange events becomes a no-op.
> > > >
> > > >
> > > > Hope that I answer your questions. Feel free to follow up if I am
> > missing
> > > > something.
> > > >
> > > >
> > > > Thanks,
> > > > Zhanxiang (Patrick) Huang
> > > >
> > > > ________________________________
> > > > From: Stanislav Kozlovski <st...@confluent.io>
> > > > Sent: Wednesday, October 10, 2018 7:22
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > > > bounced brokers using broker generation
> > > >
> > > > Hi Patrick,
> > > >
> > > > Thanks for the KIP! Fixing such correctness issues is always very
> > > welcome -
> > > > they're commonly hard to diagnose and debug when they happen in
> > > production.
> > > >
> > > > I was wondering if I understood the potential correctness issues
> > > correctly.
> > > > Here is what I got:
> > > >
> > > >
> > > >    - If a broker bounces during controlled shutdown, the bounced
> broker
> > > may
> > > >    accidentally process its earlier generation’s StopReplicaRequest
> > sent
> > > > from
> > > >    the active controller for one of its follower replicas, leaving
> the
> > > > replica
> > > >    offline while its remaining replicas may stay online
> > > >
> > > > broker A is initiating a controlled shutdown (restart). The
> Controller
> > > > sends a StopReplicaRequest but it reaches broker A after it has
> started
> > > up
> > > > again. He therefore stops replicating those partitions even though he
> > > > should just be starting to
> > > >
> > > >
> > > >    - If the first LeaderAndIsrRequest that a broker processes is sent
> > by
> > > >    the active controller before its startup, the broker will
> overwrite
> > > the
> > > >    high watermark checkpoint file and may cause incorrect truncation
> (
> > > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > > >
> > > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > > restart.
> > > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> > therefore
> > > > starts leading for the partitions sent by that request and might stop
> > > > leading partitions that it was leading previously.
> > > > This was well explained in the linked JIRA, but I cannot understand
> why
> > > > that would happen due to my limited experience. If Broker A leads p1
> > and
> > > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only
> and
> > > not
> > > > want Broker A to drop leadership for p2?
> > > >
> > > >
> > > >    - If a broker bounces very quickly, the controller may start
> > > processing
> > > >    the BrokerChange event after the broker already re-registers
> itself
> > in
> > > > zk.
> > > >    In this case, controller will miss the broker restart and will not
> > > send
> > > > any
> > > >    requests to the broker for initialization. The broker will not be
> > able
> > > > to
> > > >    accept traffic.
> > > >
> > > > Here the controller will start processing the BrokerChange event
> (that
> > > says
> > > > that broker A shutdown) after the broker has come back up and
> > > re-registered
> > > > himself in ZK?
> > > > How will the Controller miss the restart, won't he subsequently
> receive
> > > > another ZK event saying that broker A has come back up?
> > > >
> > > >
> > > > Could we explain these potential problems in a bit more detail just
> so
> > > they
> > > > could be more easily digestible by novices?
> > > >
> > > > Thanks,
> > > > Stanislav
> > > >
> > > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com>
> wrote:
> > > >
> > > > > Hey Patrick,
> > > > >
> > > > > Thanks much for the KIP. The KIP is very well written.
> > > > >
> > > > > LGTM.  +1 (binding)
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Please find the below KIP which proposes the concept of broker
> > > > generation
> > > > > > to resolve issues caused by controller missing broker state
> changes
> > > and
> > > > > > broker processing outdated control requests.
> > > > > >
> > > > > >
> > > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 380%3A+Detect+outdated+control+requests+and+bounced+
> > brokers+using+broker+
> > > > generation
> > > > > >
> > > > > > All comments are appreciated.
> > > > > >
> > > > > > Best,
> > > > > > Zhanxiang (Patrick) Huang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Hi Jun,

That is a good point. I want to make it clear about the scenario you mentioned. Correct me if I am wrong. Here is the sequence of events:

  1.  Broker sends ControlledShutdown request 1 to controller
  2.  Broker sends ControlledShutdown request 2 to controller due to retries
  3.  Controller processes ControlledShutdown request 1
  4.  Controller sends control requests to the broker
  5.  Broker receives ControlledShutdown response 1 from controller
  6.  Broker shuts down and restarts quickly
  7.  Controller processes ControlledShutdown request 2
  8.  Controller sends control requests to the broker

It is possible that controller processes the broker change event between 6) and 7). In this case, controller already updates the cached czxid to the up-to-date ones so the bounced broker will not reject control requests in 8), which causes a correctness problem.
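
To spell the race out, here is a rough sketch (purely illustrative; none of these names come from the real controller code):

    object ControlledShutdownRaceSketch extends App {
      // The controller stamps control requests with the broker generation it has
      // cached from zk for the target broker.
      var cachedBrokerEpoch: Long = 1500L // generation of the old incarnation

      // If the BrokerChange event for the restart is processed between 6) and 7),
      // the cache is refreshed to the post-restart generation ...
      cachedBrokerEpoch = 2000L

      // ... so the control requests sent in 8), although triggered by the stale
      // ControlledShutdown request 2, carry the new epoch and pass the broker-side
      // check (requestEpoch >= currentEpoch).
      val requestEpochSentInStep8 = cachedBrokerEpoch
      println(requestEpochSentInStep8 >= 2000L) // true: the stale shutdown is acted on
    }

Carrying the czxid inside the ControlledShutdown request itself would close this window, since the controller could then see in 7) that request 2 came from the old incarnation.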


Best,
Zhanxiang (Patrick) Huang

________________________________
From: Jun Rao <ju...@confluent.io>
Sent: Monday, October 22, 2018 14:45
To: dev
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi, Patrick,

There is another thing that may be worth considering.

10. It will be useful to include the czxid also in the ControlledShutdown
request. This way, if the broker has been restarted, the controller can
ignore an old ControlledShutdown request (e.g., due to retries). This will
prevent the restarted broker from incorrectly stopping replicas.

Thanks,

Jun


On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
wrote:

> Hi Jun,
>
> Thanks a lot for the comments.
>
> 1. czxid is globally unique and monotonically increasing based on the
> zookeeper doc.
> References (from
> https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> "Every change to the ZooKeeper state receives a stamp in the form of a
> *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> smaller than zxid2 then zxid1 happened before zxid2."
> "czxid: The zxid of the change that caused this znode to be created."
>
> 2. You are right. There will be only one broker change event fired in the
> case I mentioned because we will not register the watcher before the read.
>
> 3. Let's say we want to initialize the states of broker set A and we want
> the cluster to be aware of the absence of broker set B. The currently live
> broker set in the cluster is C.
>
>     From the design point of view, here are the rules for broker state
> transition:
>     - Pass in broker ids of A for onBrokerStartup() and pass in broker ids
> of B for onBrokerFailure().
>     - When processing onBrokerStartup(), we use the broker generation
> the controller read from zk to send requests to broker set A and use the
> previously cached broker generation to send requests to (C-A).
>     - When processing onBrokerFailure(), we use the previously cached
> broker generation to send requests to C.
>
>     From the implementation point of view, here are the steps we need to
> follow when processing BrokerChangeEvent:
>     -  Reads all child nodes in /brokers/ids/ to get current brokers with
> broker generation
>     -  Detect new brokers, dead brokers and bounced brokers
>     -  Update the live broker ids in controller context
>     -  Update broker generations for new brokers in controller context
>     -  Invoke onBrokerStartup(new brokers)
>     -  Invoke onBrokerFailure(bounced brokers)
>     -  Update broker generations for bounced brokers in controller context
>     -  Invoke onBrokerStartup(bounced brokers)
>     -  Invoke onBrokerFailure(dead brokers)
>     We can further optimize the flow by avoiding sending requests to a
> broker if its broker generation is larger than the one in the controller
> context.
>
> I will also update the KIP to clarify how it works for BrokerChangeEvent
> processing in more detail.
>
> Thanks,
> Patrick
>
>
>
> On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Patrick,
> >
> > Thanks for the KIP. Looks good to me overall and very useful. A few
> > comments below.
> >
> > 1. "will reject the requests with smaller broker generation than its
> > current generation." Is czxid monotonically increasing?
> >
> > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > watchers are one-time watchers. Once a watcher is fired, one needs to
> > register it again before the watcher can be triggered. So, when the
> > controller is busy and a broker goes down and comes up, the first event
> > will trigger the ZK watcher. Since the controller is busy and hasn't
> > registered the watcher again, the second event actually won't fire. By
> the
> > time the controller reads from ZK, it sees that the broker is still
> > registered and thus thinks that nothing has happened to the broker, which
> > is causing the problem.
> >
> > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > invoke onBrokerStartUp(...)". We probably want to be a bit careful here.
> > Could you clarify the broker list and the broker epoch used when making
> > these calls? We want to prevent the restarted broker from receiving a
> > partial replica list on the first LeaderAndIsr request because of this.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > > detail.
> > >
> > > broker A is initiating a controlled shutdown (restart). The Controller
> > > sends a StopReplicaRequest but it reaches broker A after it has started
> > up
> > > again. He therefore stops replicating those partitions even though he
> > > should just be starting to
> > > This is right.
> > >
> > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > restart.
> > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> therefore
> > > starts leading for the partitions sent by that request and might stop
> > > leading partitions that it was leading previously.
> > > This was well explained in the linked JIRA, but I cannot understand why
> > > that would happen due to my limited experience. If Broker A leads p1
> and
> > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
> > not
> > > want Broker A to drop leadership for p2?
> > > The root cause of the issue is that after a broker just restarts, it
> > > relies on the first LeaderAndIsrRequest to populate the partition state
> > and
> > > initializes the highwater mark checkpoint thread. The highwater mark
> > > checkpoint thread will overwrite the highwater mark checkpoint file
> based
> > > on the broker's in-memory partition states. In other words, If a
> > partition
> > > that is physically hosted by the broker is missing in the in-memory
> > > partition states map, its highwater mark will be lost after the
> highwater
> > > mark checkpoint thread overwrites the file. (Related codes:
> > > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > 4a5534d4e9/core/src/main/scala/kafka/server/
> ReplicaManager.scala#L1091)
> > >
> > >
> > > In your example, assume the first LeaderAndIsrRequest broker A receives
> > is
> > > the one initiated in the controlled shutdown logic in Controller to
> move
> > > leadership away from broker A. This LeaderAndIsrRequest only contains
> > > partitions that broker A leads, not all the partitions that broker A
> > hosts
> > > (i.e. no follower partitions), so the highwater mark for the follower
> > > partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> > > receives may not necessarily be the one initiated in controlled
> shutdown
> > > logic (e.g. there can be an ongoing preferred leader election),
> although
> > I
> > > think this may not be very common.
> > >
> > > Here the controller will start processing the BrokerChange event (that
> > says
> > > that broker A shutdown) after the broker has come back up and
> > re-registered
> > > himself in ZK?
> > > How will the Controller miss the restart, won't he subsequently receive
> > > another ZK event saying that broker A has come back up?
> > > Controller will not miss the BrokerChange event and actually there will
> > be
> > > two BrokerChange events fired in this case (one for broker
> deregistration
> > > in zk and one for registration). However, when processing the
> > > BrokerChangeEvent, controller needs to do a read from zookeeper to get
> > back
> > > the current brokers in the cluster and if the bounced broker already
> > joined
> > > the cluster by this time, controller will not know this broker has been
> > > bounced because it sees no diff between zk and its in-memory cache. So
> > > basically the processing of both BrokerChange events becomes a no-op.
> > >
> > >
> > > Hope that I answer your questions. Feel free to follow up if I am
> missing
> > > something.
> > >
> > >
> > > Thanks,
> > > Zhanxiang (Patrick) Huang
> > >
> > > ________________________________
> > > From: Stanislav Kozlovski <st...@confluent.io>
> > > Sent: Wednesday, October 10, 2018 7:22
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > > bounced brokers using broker generation
> > >
> > > Hi Patrick,
> > >
> > > Thanks for the KIP! Fixing such correctness issues is always very
> > welcome -
> > > they're commonly hard to diagnose and debug when they happen in
> > production.
> > >
> > > I was wondering if I understood the potential correctness issues
> > correctly.
> > > Here is what I got:
> > >
> > >
> > >    - If a broker bounces during controlled shutdown, the bounced broker
> > may
> > >    accidentally process its earlier generation’s StopReplicaRequest
> sent
> > > from
> > >    the active controller for one of its follower replicas, leaving the
> > > replica
> > >    offline while its remaining replicas may stay online
> > >
> > > broker A is initiating a controlled shutdown (restart). The Controller
> > > sends a StopReplicaRequest but it reaches broker A after it has started
> > up
> > > again. He therefore stops replicating those partitions even though he
> > > should just be starting to
> > >
> > >
> > >    - If the first LeaderAndIsrRequest that a broker processes is sent
> by
> > >    the active controller before its startup, the broker will overwrite
> > the
> > >    high watermark checkpoint file and may cause incorrect truncation (
> > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > >
> > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > restart.
> > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> therefore
> > > starts leading for the partitions sent by that request and might stop
> > > leading partitions that it was leading previously.
> > > This was well explained in the linked JIRA, but I cannot understand why
> > > that would happen due to my limited experience. If Broker A leads p1
> and
> > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
> > not
> > > want Broker A to drop leadership for p2?
> > >
> > >
> > >    - If a broker bounces very quickly, the controller may start
> > processing
> > >    the BrokerChange event after the broker already re-registers itself
> in
> > > zk.
> > >    In this case, controller will miss the broker restart and will not
> > send
> > > any
> > >    requests to the broker for initialization. The broker will not be
> able
> > > to
> > >    accept traffic.
> > >
> > > Here the controller will start processing the BrokerChange event (that
> > says
> > > that broker A shutdown) after the broker has come back up and
> > re-registered
> > > himself in ZK?
> > > How will the Controller miss the restart, won't he subsequently receive
> > > another ZK event saying that broker A has come back up?
> > >
> > >
> > > Could we explain these potential problems in a bit more detail just so
> > they
> > > could be more easily digestible by novices?
> > >
> > > Thanks,
> > > Stanislav
> > >
> > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:
> > >
> > > > Hey Patrick,
> > > >
> > > > Thanks much for the KIP. The KIP is very well written.
> > > >
> > > > LGTM.  +1 (binding)
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Please find the below KIP which proposes the concept of broker
> > > generation
> > > > > to resolve issues caused by controller missing broker state changes
> > and
> > > > > broker processing outdated control requests.
> > > > >
> > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 380%3A+Detect+outdated+control+requests+and+bounced+
> brokers+using+broker+
> > > generation
> > > > >
> > > > > All comments are appreciated.
> > > > >
> > > > > Best,
> > > > > Zhanxiang (Patrick) Huang
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Stanislav
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Jun Rao <ju...@confluent.io>.
Hi, Patrick,

There is another thing that may be worth considering.

10. It will be useful to include the czxid also in the ControlledShutdown
request. This way, if the broker has been restarted, the controller can
ignore an old ControlledShutdown request (e.g., due to retries). This will
prevent the restarted broker from incorrectly stopping replicas.
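
Roughly, the controller-side check would look like the following sketch (the names are illustrative only, not the actual controller code):

    object ControllerSideEpochCheckSketch extends App {
      // The controller caches the czxid it last read from /brokers/ids/<id> for
      // each live broker.
      val cachedBrokerEpochs = scala.collection.mutable.Map.empty[Int, Long]
      cachedBrokerEpochs(1) = 2000L // post-restart generation of broker 1

      // A ControlledShutdown request stamped with an older czxid than the cached
      // one was sent by a previous incarnation of the broker and can be ignored.
      def isStale(brokerId: Int, requestEpoch: Long): Boolean =
        cachedBrokerEpochs.get(brokerId).exists(_ > requestEpoch)

      println(isStale(brokerId = 1, requestEpoch = 1500L)) // true: retry from the old incarnation
    }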

Thanks,

Jun


On Thu, Oct 11, 2018 at 5:56 PM, Patrick Huang <hz...@gmail.com>
wrote:

> Hi Jun,
>
> Thanks a lot for the comments.
>
> 1. czxid is globally unique and monotonically increasing based on the
> zookeeper doc.
> References (from
> https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
> "Every change to the ZooKeeper state receives a stamp in the form of a
> *zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
> changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
> smaller than zxid2 then zxid1 happened before zxid2."
> "czxid: The zxid of the change that caused this znode to be created."
>
> 2. You are right. There will be only one broker change event fired in the
> case I mentioned because we will not register the watcher before the read.
>
> 3. Let's say we want to initialize the states of broker set A and we want
> the cluster to be aware of the absence of broker set B. The currently live
> broker set in the cluster is C.
>
>     From the design point of view, here are the rules for broker state
> transition:
>     - Pass in broker ids of A for onBrokerStartup() and pass in broker ids
> of B for onBrokerFailure().
>     - When processing onBrokerStartup(), we use the broker generation
> the controller read from zk to send requests to broker set A and use the
> previously cached broker generation to send requests to (C-A).
>     - When processing onBrokerFailure(), we use the previously cached
> broker generation to send requests to C.
>
>     From the implementation point of view, here are the steps we need to
> follow when processing BrokerChangeEvent:
>     -  Reads all child nodes in /brokers/ids/ to get current brokers with
> broker generation
>     -  Detect new brokers, dead brokers and bounced brokers
>     -  Update the live broker ids in controller context
>     -  Update broker generations for new brokers in controller context
>     -  Invoke onBrokerStartup(new brokers)
>     -  Invoke onBrokerFailure(bounced brokers)
>     -  Update broker generations for bounced brokers in controller context
>     -  Invoke onBrokerStartup(bounced brokers)
>     -  Invoke onBrokerFailure(dead brokers)
>     We can further optimize the flow by avoiding sending requests to a
> broker if its broker generation is larger than the one in the controller
> context.
>
> I will also update the KIP to clarify how it works for BrokerChangeEvent
> processing in more detail.
>
> Thanks,
> Patrick
>
>
>
> On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Patrick,
> >
> > Thanks for the KIP. Looks good to me overall and very useful. A few
> > comments below.
> >
> > 1. "will reject the requests with smaller broker generation than its
> > current generation." Is czxid monotonically increasing?
> >
> > 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> > watchers are one-time watchers. Once a watcher is fired, one needs to
> > register it again before the watcher can be triggered. So, when the
> > controller is busy and a broker goes down and comes up, the first event
> > will trigger the ZK watcher. Since the controller is busy and hasn't
> > registered the watcher again, the second event actually won't fire. By
> the
> > time the controller reads from ZK, it sees that the broker is still
> > registered and thus thinks that nothing has happened to the broker, which
> > is causing the problem.
> >
> > 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> > invoke onBrokerStartUp(...)". We probably want to be a bit careful here.
> > Could you clarify the broker list and the broker epoch used when making
> > these calls? We want to prevent the restarted broker from receiving a
> > partial replica list on the first LeaderAndIsr request because of this.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > > detail.
> > >
> > > broker A is initiating a controlled shutdown (restart). The Controller
> > > sends a StopReplicaRequest but it reaches broker A after it has started
> > up
> > > again. He therefore stops replicating those partitions even though he
> > > should just be starting to
> > > This is right.
> > >
> > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > restart.
> > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> therefore
> > > starts leading for the partitions sent by that request and might stop
> > > leading partitions that it was leading previously.
> > > This was well explained in the linked JIRA, but I cannot understand why
> > > that would happen due to my limited experience. If Broker A leads p1
> and
> > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
> > not
> > > want Broker A to drop leadership for p2?
> > > The root cause of the issue is that after a broker just restarts, it
> > > relies on the first LeaderAndIsrRequest to populate the partition state
> > and
> > > initializes the highwater mark checkpoint thread. The highwater mark
> > > checkpoint thread will overwrite the highwater mark checkpoint file
> based
> > > on the broker's in-memory partition states. In other words, If a
> > partition
> > > that is physically hosted by the broker is missing in the in-memory
> > > partition states map, its highwater mark will be lost after the
> highwater
> > > mark checkpoint thread overwrites the file. (Related codes:
> > > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > > 4a5534d4e9/core/src/main/scala/kafka/server/
> ReplicaManager.scala#L1091)
> > >
> > >
> > > In your example, assume the first LeaderAndIsrRequest broker A receives
> > is
> > > the one initiated in the controlled shutdown logic in Controller to
> move
> > > leadership away from broker A. This LeaderAndIsrRequest only contains
> > > partitions that broker A leads, not all the partitions that broker A
> > hosts
> > > (i.e. no follower partitions), so the highwater mark for the follower
> > > partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> > > receives may not necessarily be the one initiated in controlled
> shutdown
> > > logic (e.g. there can be an ongoing preferred leader election),
> although
> > I
> > > think this may not be very common.
> > >
> > > Here the controller will start processing the BrokerChange event (that
> > says
> > > that broker A shutdown) after the broker has come back up and
> > re-registered
> > > himself in ZK?
> > > How will the Controller miss the restart, won't he subsequently receive
> > > another ZK event saying that broker A has come back up?
> > > Controller will not miss the BrokerChange event and actually there will
> > be
> > > two BrokerChange events fired in this case (one for broker
> deregistration
> > > in zk and one for registration). However, when processing the
> > > BrokerChangeEvent, controller needs to do a read from zookeeper to get
> > back
> > > the current brokers in the cluster and if the bounced broker already
> > joined
> > > the cluster by this time, controller will not know this broker has been
> > > bounced because it sees no diff between zk and its in-memory cache. So
> > > basically both of the BrokerChange event processing become no-op.
> > >
> > >
> > > Hope that I answer your questions. Feel free to follow up if I am
> missing
> > > something.
> > >
> > >
> > > Thanks,
> > > Zhanxiang (Patrick) Huang
> > >
> > > ________________________________
> > > From: Stanislav Kozlovski <st...@confluent.io>
> > > Sent: Wednesday, October 10, 2018 7:22
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > > bounced brokers using broker generation
> > >
> > > Hi Patrick,
> > >
> > > Thanks for the KIP! Fixing such correctness issues is always very
> > welcome -
> > > they're commonly hard to diagnose and debug when they happen in
> > production.
> > >
> > > I was wondering if I understood the potential correctness issues
> > correctly.
> > > Here is what I got:
> > >
> > >
> > >    - If a broker bounces during controlled shutdown, the bounced broker
> > may
> > >    accidentally process its earlier generation’s StopReplicaRequest
> sent
> > > from
> > >    the active controller for one of its follower replicas, leaving the
> > > replica
> > >    offline while its remaining replicas may stay online
> > >
> > > broker A is initiating a controlled shutdown (restart). The Controller
> > > sends a StopReplicaRequest but it reaches broker A after it has started
> > up
> > > again. He therefore stops replicating those partitions even though he
> > > should just be starting to
> > >
> > >
> > >    - If the first LeaderAndIsrRequest that a broker processes is sent
> by
> > >    the active controller before its startup, the broker will overwrite
> > the
> > >    high watermark checkpoint file and may cause incorrect truncation (
> > >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> > >
> > > Controller sends a LeaderAndIsrRequest before broker A initiates a
> > restart.
> > > Broker A restarts and receives the LeaderAndIsrRequest then. It
> therefore
> > > starts leading for the partitions sent by that request and might stop
> > > leading partitions that it was leading previously.
> > > This was well explained in the linked JIRA, but I cannot understand why
> > > that would happen due to my limited experience. If Broker A leads p1
> and
> > > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
> > not
> > > want Broker A to drop leadership for p2?
> > >
> > >
> > >    - If a broker bounces very quickly, the controller may start
> > processing
> > >    the BrokerChange event after the broker already re-registers itself
> in
> > > zk.
> > >    In this case, controller will miss the broker restart and will not
> > send
> > > any
> > >    requests to the broker for initialization. The broker will not be
> able
> > > to
> > >    accept traffics.
> > >
> > > Here the controller will start processing the BrokerChange event (that
> > says
> > > that broker A shutdown) after the broker has come back up and
> > re-registered
> > > himself in ZK?
> > > How will the Controller miss the restart, won't he subsequently receive
> > > another ZK event saying that broker A has come back up?
> > >
> > >
> > > Could we explain these potential problems in a bit more detail just so
> > they
> > > could be more easily digestable by novices?
> > >
> > > Thanks,
> > > Stanislav
> > >
> > > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:
> > >
> > > > Hey Patrick,
> > > >
> > > > Thanks much for the KIP. The KIP is very well written.
> > > >
> > > > LGTM.  +1 (binding)
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Please find the below KIP which proposes the concept of broker
> > > generation
> > > > > to resolve issues caused by controller missing broker state changes
> > and
> > > > > broker processing outdated control requests.
> > > > >
> > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 380%3A+Detect+outdated+control+requests+and+bounced+
> brokers+using+broker+
> > > generation
> > > > >
> > > > > All comments are appreciated.
> > > > >
> > > > > Best,
> > > > > Zhanxiang (Patrick) Huang
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Stanislav
> > >
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@gmail.com>.
Hi Jun,

Thanks a lot for the comments.

1. czxid is globally unique and monotonically increasing based on the
zookeeper doc.
References (from
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
"Every change to the ZooKeeper state receives a stamp in the form of a
*zxid* (ZooKeeper Transaction Id). This exposes the total ordering of all
changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is
smaller than zxid2 then zxid1 happened before zxid2."
"czxid: The zxid of the change that caused this znode to be created."

2. You are right. There will be only one broker change event fired in the
case I mentioned because we will not register the watcher before the read.

3. Let's say we want to initialize the states of broker set A and we want
the cluster to be aware of the absence of broker set B. The currently live
broker set in the cluster is C.

    From the design point of view, here are the rules for broker state
transition:
    - Pass in broker ids of A for onBrokerStartup() and pass in broker ids
of B for onBrokerFailure().
    - When processing onBrokerStartup(), we use the broker generation the
controller read from zk to send requests to broker set A and use the
previously cached broker generation to send requests to (C-A).
    - When processing onBrokerFailure(), we use the previously cached
broker generation to send requests to C.

    From the implementation point of view, here are the steps we need to
follow when processing BrokerChangeEvent:
    -  Read all child nodes in /brokers/ids/ to get the current brokers with
their broker generations
    -  Detect new brokers, dead brokers and bounced brokers
    -  Update the live broker ids in controller context
    -  Update broker generations for new brokers in controller context
    -  Invoke onBrokerStartup(new brokers)
    -  Invoke onBrokerFailure(bounced brokers)
    -  Update broker generations for bounced brokers in controller context
    -  Invoke onBrokerStartup(bounced brokers)
    -  Invoke onBrokerFailure(dead brokers)
    We can further optimize the flow by avoiding sending requests to a
broker if its broker generation is larger than the one in the controller
context.
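To make the diff detection above concrete, here is a rough sketch
(simplified, with made-up names rather than the real controller code) of how
new, dead and bounced brokers could be computed by comparing the broker
generations read from zk against the ones cached in the controller context:

// Both maps go from broker id to broker generation (the znode's czxid).
case class BrokerDiff(newBrokers: Set[Int], deadBrokers: Set[Int], bouncedBrokers: Set[Int])

def computeBrokerDiff(zkBrokers: Map[Int, Long], cachedBrokers: Map[Int, Long]): BrokerDiff = {
  val newBrokers  = zkBrokers.keySet.diff(cachedBrokers.keySet)
  val deadBrokers = cachedBrokers.keySet.diff(zkBrokers.keySet)
  // A broker present on both sides but re-registered with a larger generation
  // than the cached one must have been bounced since the last zk read.
  val bouncedBrokers = zkBrokers.keySet.intersect(cachedBrokers.keySet)
    .filter(id => zkBrokers(id) > cachedBrokers(id))
  BrokerDiff(newBrokers, deadBrokers, bouncedBrokers)
}

onBrokerStartup()/onBrokerFailure() would then be invoked on the resulting
sets in the order listed in the steps above.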

I will also update the KIP to clarify how it works for BrokerChangeEvent
processing in more detail.

Thanks,
Patrick



On Thu, Oct 11, 2018 at 12:12 PM Jun Rao <ju...@confluent.io> wrote:

> Hi, Patrick,
>
> Thanks for the KIP. Looks good to me overall and very useful. A few
> comments below.
>
> 1. "will reject the requests with smaller broker generation than its
> current generation." Is czxid monotonically increasing?
>
> 2. To clarify on the issue of the controller missing a ZK watcher. ZK
> watchers are one-time watchers. Once a watcher is fired, one needs to
> register it again before the watcher can be triggered. So, when the
> controller is busy and a broker goes down and comes up, the first event
> will trigger the ZK watcher. Since the controller is busy and hasn't
> registered the watcher again, the second event actually won't fire. By the
> time the controller reads from ZK, it sees that the broker is still
> registered and thus thinks that nothing has happened to the broker, which
> is causing the problem.
>
> 3. "Handle broker state change: invoke onBrokerFailure(...) first, then
> invoke onBrokerStartUp(...)". We probably want to be a bit careful here.
> Could you clarify the broker list and the broker epoch used when making
> these calls? We want to prevent the restarted broker from receiving a
> partial replica list on the first LeaderAndIsr request because of this.
>
> Thanks,
>
> Jun
>
> On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com>
> wrote:
>
> > Hey Stanislav,
> >
> > Sure. Thanks for your interest in this KIP. I am glad to provide more
> > detail.
> >
> > broker A is initiating a controlled shutdown (restart). The Controller
> > sends a StopReplicaRequest but it reaches broker A after it has started
> up
> > again. He therefore stops replicating those partitions even though he
> > should just be starting to
> > This is right.
> >
> > Controller sends a LeaderAndIsrRequest before broker A initiates a
> restart.
> > Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
> > starts leading for the partitions sent by that request and might stop
> > leading partitions that it was leading previously.
> > This was well explained in the linked JIRA, but I cannot understand why
> > that would happen due to my limited experience. If Broker A leads p1 and
> > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
> not
> > want Broker A to drop leadership for p2?
> > The root cause of the issue is that after a broker just restarts, it
> > relies on the first LeaderAndIsrRequest to populate the partition state
> and
> > initializes the highwater mark checkpoint thread. The highwater mark
> > checkpoint thread will overwrite the highwater mark checkpoint file based
> > on the broker's in-memory partition states. In other words, If a
> partition
> > that is physically hosted by the broker is missing in the in-memory
> > partition states map, its highwater mark will be lost after the highwater
> > mark checkpoint thread overwrites the file. (Related codes:
> > https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> > 4a5534d4e9/core/src/main/scala/kafka/server/ReplicaManager.scala#L1091)
> >
> >
> > In your example, assume the first LeaderAndIsrRequest broker A receives
> is
> > the one initiated in the controlled shutdown logic in Controller to move
> > leadership away from broker A. This LeaderAndIsrRequest only contains
> > partitions that broker A leads, not all the partitions that broker A
> hosts
> > (i.e. no follower partitions), so the highwater mark for the follower
> > partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> > receives may not necessarily be the one initiated in controlled shutdown
> > logic (e.g. there can be an ongoing preferred leader election), although
> I
> > think this may not be very common.
> >
> > Here the controller will start processing the BrokerChange event (that
> says
> > that broker A shutdown) after the broker has come back up and
> re-registered
> > himself in ZK?
> > How will the Controller miss the restart, won't he subsequently receive
> > another ZK event saying that broker A has come back up?
> > Controller will not miss the BrokerChange event and actually there will
> be
> > two BrokerChange events fired in this case (one for broker deregistration
> > in zk and one for registration). However, when processing the
> > BrokerChangeEvent, controller needs to do a read from zookeeper to get
> back
> > the current brokers in the cluster and if the bounced broker already
> joined
> > the cluster by this time, controller will not know this broker has been
> > bounced because it sees no diff between zk and its in-memory cache. So
> > basically both of the BrokerChange event processing become no-op.
> >
> >
> > Hope that I answer your questions. Feel free to follow up if I am missing
> > something.
> >
> >
> > Thanks,
> > Zhanxiang (Patrick) Huang
> >
> > ________________________________
> > From: Stanislav Kozlovski <st...@confluent.io>
> > Sent: Wednesday, October 10, 2018 7:22
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> > bounced brokers using broker generation
> >
> > Hi Patrick,
> >
> > Thanks for the KIP! Fixing such correctness issues is always very
> welcome -
> > they're commonly hard to diagnose and debug when they happen in
> production.
> >
> > I was wondering if I understood the potential correctness issues
> correctly.
> > Here is what I got:
> >
> >
> >    - If a broker bounces during controlled shutdown, the bounced broker
> may
> >    accidentally process its earlier generation’s StopReplicaRequest sent
> > from
> >    the active controller for one of its follower replicas, leaving the
> > replica
> >    offline while its remaining replicas may stay online
> >
> > broker A is initiating a controlled shutdown (restart). The Controller
> > sends a StopReplicaRequest but it reaches broker A after it has started
> up
> > again. He therefore stops replicating those partitions even though he
> > should just be starting to
> >
> >
> >    - If the first LeaderAndIsrRequest that a broker processes is sent by
> >    the active controller before its startup, the broker will overwrite
> the
> >    high watermark checkpoint file and may cause incorrect truncation (
> >    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
> >
> > Controller sends a LeaderAndIsrRequest before broker A initiates a
> restart.
> > Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
> > starts leading for the partitions sent by that request and might stop
> > leading partitions that it was leading previously.
> > This was well explained in the linked JIRA, but I cannot understand why
> > that would happen due to my limited experience. If Broker A leads p1 and
> > p2, when would a Controller send a LeaderAndIsrRequest with p1 only and
> not
> > want Broker A to drop leadership for p2?
> >
> >
> >    - If a broker bounces very quickly, the controller may start
> processing
> >    the BrokerChange event after the broker already re-registers itself in
> > zk.
> >    In this case, controller will miss the broker restart and will not
> send
> > any
> >    requests to the broker for initialization. The broker will not be able
> > to
> >    accept traffics.
> >
> > Here the controller will start processing the BrokerChange event (that
> says
> > that broker A shutdown) after the broker has come back up and
> re-registered
> > himself in ZK?
> > How will the Controller miss the restart, won't he subsequently receive
> > another ZK event saying that broker A has come back up?
> >
> >
> > Could we explain these potential problems in a bit more detail just so
> they
> > could be more easily digestable by novices?
> >
> > Thanks,
> > Stanislav
> >
> > On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:
> >
> > > Hey Patrick,
> > >
> > > Thanks much for the KIP. The KIP is very well written.
> > >
> > > LGTM.  +1 (binding)
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Please find the below KIP which proposes the concept of broker
> > generation
> > > > to resolve issues caused by controller missing broker state changes
> and
> > > > broker processing outdated control requests.
> > > >
> > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+
> > generation
> > > >
> > > > All comments are appreciated.
> > > >
> > > > Best,
> > > > Zhanxiang (Patrick) Huang
> > > >
> > >
> >
> >
> > --
> > Best,
> > Stanislav
> >
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Hi Jun,

Thanks a lot for the comments.

1. czxid is globally unique and monotonically increasing based on the zookeeper doc.
References (from https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html):
"Every change to the ZooKeeper state receives a stamp in the form of a zxid (ZooKeeper Transaction Id). This exposes the total ordering of all changes to ZooKeeper. Each change will have a unique zxid and if zxid1 is smaller than zxid2 then zxid1 happened before zxid2."
"czxid: The zxid of the change that caused this znode to be created."

2. You are right. There will be only one broker change event fired in the case I mentioned because we will not register the watcher before the read.

3. Let's say we want to initialize the states of broker set A and we want the cluster to be aware of the absence of broker set B. The currently live broker set in the cluster is C.

    From the design point of view, here are the rules for broker state transition:
    - Pass in broker ids of A for onBrokerStartup() and pass in broker ids of B for onBrokerFailure().
    - When processing onBrokerStartup(), we use the broker generation the controller read from zk to send requests to broker set A and use the previously cached broker generation to send requests to (C-A).
    - When processing onBrokerFailure(), we use the previously cached broker generation to send requests to C.

    From the implementation point of view, here are the steps we need to follow when processing BrokerChangeEvent:
    -  Read all child nodes in /brokers/ids/ to get the current brokers with their broker generations
    -  Detect new brokers, dead brokers and bounced brokers
    -  Update the live broker ids in controller context
    -  Update broker generations for new brokers in controller context
    -  Invoke onBrokerStartup(new brokers)
    -  Invoke onBrokerFailure(bounced brokers)
    -  Update broker generations for bounced brokers in controller context
    -  Invoke onBrokerStartup(bounced brokers)
    -  Invoke onBrokerFailure(dead brokers)
    We can further optimize the flow by avoiding sending requests to a broker if its broker generation is larger than the one in the controller context.
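As a tiny sketch of that optimization (illustrative only, hypothetical names): skip a request stamped with the cached generation if zk already shows a newer generation for the target broker.

def shouldSendToBroker(zkGeneration: Option[Long], cachedGeneration: Long): Boolean =
  zkGeneration.forall(_ <= cachedGeneration)  // skip when the broker re-registered with a newer czxid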

I will also update the KIP to clarify how it works for BrokerChangeEvent processing in more detail.


Best,
Zhanxiang (Patrick) Huang

________________________________
From: Jun Rao <ju...@confluent.io>
Sent: Thursday, October 11, 2018 16:12
To: dev
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi, Patrick,

Thanks for the KIP. Looks good to me overall and very useful. A few
comments below.

1. "will reject the requests with smaller broker generation than its
current generation." Is czxid monotonically increasing?

2. To clarify on the issue of the controller missing a ZK watcher. ZK
watchers are one-time watchers. Once a watcher is fired, one needs to
register it again before the watcher can be triggered. So, when the
controller is busy and a broker goes down and comes up, the first event
will trigger the ZK watcher. Since the controller is busy and hasn't
registered the watcher again, the second event actually won't fire. By the
time the controller reads from ZK, it sees that the broker is still
registered and thus thinks that nothing has happened to the broker, which
is causing the problem.

3. "Handle broker state change: invoke onBrokerFailure(...) first, then
invoke onBrokerStartUp(...)". We probably want to be a bit careful here.
Could you clarify the broker list and the broker epoch used when making
these calls? We want to prevent the restarted broker from receiving a
partial replica list on the first LeaderAndIsr request because of this.

Thanks,

Jun

On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com> wrote:

> Hey Stanislav,
>
> Sure. Thanks for your interest in this KIP. I am glad to provide more
> detail.
>
> broker A is initiating a controlled shutdown (restart). The Controller
> sends a StopReplicaRequest but it reaches broker A after it has started up
> again. He therefore stops replicating those partitions even though he
> should just be starting to
> This is right.
>
> Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
> Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
> starts leading for the partitions sent by that request and might stop
> leading partitions that it was leading previously.
> This was well explained in the linked JIRA, but I cannot understand why
> that would happen due to my limited experience. If Broker A leads p1 and
> p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
> want Broker A to drop leadership for p2?
> The root cause of the issue is that after a broker just restarts, it
> relies on the first LeaderAndIsrRequest to populate the partition state and
> initializes the highwater mark checkpoint thread. The highwater mark
> checkpoint thread will overwrite the highwater mark checkpoint file based
> on the broker's in-memory partition states. In other words, If a partition
> that is physically hosted by the broker is missing in the in-memory
> partition states map, its highwater mark will be lost after the highwater
> mark checkpoint thread overwrites the file. (Related codes:
> https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> 4a5534d4e9/core/src/main/scala/kafka/server/ReplicaManager.scala#L1091)
>
>
> In your example, assume the first LeaderAndIsrRequest broker A receives is
> the one initiated in the controlled shutdown logic in Controller to move
> leadership away from broker A. This LeaderAndIsrRequest only contains
> partitions that broker A leads, not all the partitions that broker A hosts
> (i.e. no follower partitions), so the highwater mark for the follower
> partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> receives may not necessarily be the one initiated in controlled shutdown
> logic (e.g. there can be an ongoing preferred leader election), although I
> think this may not be very common.
>
> Here the controller will start processing the BrokerChange event (that says
> that broker A shutdown) after the broker has come back up and re-registered
> himself in ZK?
> How will the Controller miss the restart, won't he subsequently receive
> another ZK event saying that broker A has come back up?
> Controller will not miss the BrokerChange event and actually there will be
> two BrokerChange events fired in this case (one for broker deregistration
> in zk and one for registration). However, when processing the
> BrokerChangeEvent, controller needs to do a read from zookeeper to get back
> the current brokers in the cluster and if the bounced broker already joined
> the cluster by this time, controller will not know this broker has been
> bounced because it sees no diff between zk and its in-memory cache. So
> basically both of the BrokerChange event processing become no-op.
>
>
> Hope that I answer your questions. Feel free to follow up if I am missing
> something.
>
>
> Thanks,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Stanislav Kozlovski <st...@confluent.io>
> Sent: Wednesday, October 10, 2018 7:22
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi Patrick,
>
> Thanks for the KIP! Fixing such correctness issues is always very welcome -
> they're commonly hard to diagnose and debug when they happen in production.
>
> I was wondering if I understood the potential correctness issues correctly.
> Here is what I got:
>
>
>    - If a broker bounces during controlled shutdown, the bounced broker may
>    accidentally process its earlier generation’s StopReplicaRequest sent
> from
>    the active controller for one of its follower replicas, leaving the
> replica
>    offline while its remaining replicas may stay online
>
> broker A is initiating a controlled shutdown (restart). The Controller
> sends a StopReplicaRequest but it reaches broker A after it has started up
> again. He therefore stops replicating those partitions even though he
> should just be starting to
>
>
>    - If the first LeaderAndIsrRequest that a broker processes is sent by
>    the active controller before its startup, the broker will overwrite the
>    high watermark checkpoint file and may cause incorrect truncation (
>    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
>
> Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
> Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
> starts leading for the partitions sent by that request and might stop
> leading partitions that it was leading previously.
> This was well explained in the linked JIRA, but I cannot understand why
> that would happen due to my limited experience. If Broker A leads p1 and
> p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
> want Broker A to drop leadership for p2?
>
>
>    - If a broker bounces very quickly, the controller may start processing
>    the BrokerChange event after the broker already re-registers itself in
> zk.
>    In this case, controller will miss the broker restart and will not send
> any
>    requests to the broker for initialization. The broker will not be able
> to
>    accept traffics.
>
> Here the controller will start processing the BrokerChange event (that says
> that broker A shutdown) after the broker has come back up and re-registered
> himself in ZK?
> How will the Controller miss the restart, won't he subsequently receive
> another ZK event saying that broker A has come back up?
>
>
> Could we explain these potential problems in a bit more detail just so they
> could be more easily digestable by novices?
>
> Thanks,
> Stanislav
>
> On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:
>
> > Hey Patrick,
> >
> > Thanks much for the KIP. The KIP is very well written.
> >
> > LGTM.  +1 (binding)
> >
> > Thanks,
> > Dong
> >
> >
> > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> wrote:
> >
> > > Hi All,
> > >
> > > Please find the below KIP which proposes the concept of broker
> generation
> > > to resolve issues caused by controller missing broker state changes and
> > > broker processing outdated control requests.
> > >
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+
> generation
> > >
> > > All comments are appreciated.
> > >
> > > Best,
> > > Zhanxiang (Patrick) Huang
> > >
> >
>
>
> --
> Best,
> Stanislav
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Jun Rao <ju...@confluent.io>.
Hi, Patrick,

Thanks for the KIP. Looks good to me overall and very useful. A few
comments below.

1. "will reject the requests with smaller broker generation than its
current generation." Is czxid monotonically increasing?

2. To clarify on the issue of the controller missing a ZK watcher. ZK
watchers are one-time watchers. Once a watcher is fired, one needs to
register it again before the watcher can be triggered. So, when the
controller is busy and a broker goes down and comes up, the first event
will trigger the ZK watcher. Since the controller is busy and hasn't
registered the watcher again, the second event actually won't fire. By the
time the controller reads from ZK, it sees that the broker is still
registered and thus thinks that nothing has happened to the broker, which
is causing the problem.
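For readers less familiar with this, here is a small illustrative sketch (not
the controller code) of the one-time watch semantics with the plain ZooKeeper
client, where the watch has to be re-armed on every read:

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import scala.jdk.CollectionConverters._

// The watch set by getChildren fires at most once; changes that happen before
// it is re-registered produce no further notification.
class BrokerChangeWatcher(zk: ZooKeeper, path: String, onChange: List[String] => Unit)
  extends Watcher {

  def register(): Unit = {
    val children = zk.getChildren(path, this).asScala.toList  // re-arms the watch
    onChange(children)
  }

  override def process(event: WatchedEvent): Unit = {
    // The watch has already fired and is gone; until register() runs again,
    // a quick de-register/re-register in between triggers nothing.
    register()
  }
}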

3. "Handle broker state change: invoke onBrokerFailure(...) first, then
invoke onBrokerStartUp(...)". We probably want to be a bit careful here.
Could you clarify the broker list and the broker epoch used when making
these calls? We want to prevent the restarted broker from receiving a
partial replica list on the first LeaderAndIsr request because of this.

Thanks,

Jun

On Wed, Oct 10, 2018 at 12:51 PM, Patrick Huang <hz...@hotmail.com> wrote:

> Hey Stanislav,
>
> Sure. Thanks for your interest in this KIP. I am glad to provide more
> detail.
>
> broker A is initiating a controlled shutdown (restart). The Controller
> sends a StopReplicaRequest but it reaches broker A after it has started up
> again. He therefore stops replicating those partitions even though he
> should just be starting to
> This is right.
>
> Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
> Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
> starts leading for the partitions sent by that request and might stop
> leading partitions that it was leading previously.
> This was well explained in the linked JIRA, but I cannot understand why
> that would happen due to my limited experience. If Broker A leads p1 and
> p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
> want Broker A to drop leadership for p2?
> The root cause of the issue is that after a broker just restarts, it
> relies on the first LeaderAndIsrRequest to populate the partition state and
> initializes the highwater mark checkpoint thread. The highwater mark
> checkpoint thread will overwrite the highwater mark checkpoint file based
> on the broker's in-memory partition states. In other words, If a partition
> that is physically hosted by the broker is missing in the in-memory
> partition states map, its highwater mark will be lost after the highwater
> mark checkpoint thread overwrites the file. (Related codes:
> https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f38
> 4a5534d4e9/core/src/main/scala/kafka/server/ReplicaManager.scala#L1091)
>
>
> In your example, assume the first LeaderAndIsrRequest broker A receives is
> the one initiated in the controlled shutdown logic in Controller to move
> leadership away from broker A. This LeaderAndIsrRequest only contains
> partitions that broker A leads, not all the partitions that broker A hosts
> (i.e. no follower partitions), so the highwater mark for the follower
> partitions will be lost. Also, the first LeaderAndIsrRequest broker A
> receives may not necessarily be the one initiated in controlled shutdown
> logic (e.g. there can be an ongoing preferred leader election), although I
> think this may not be very common.
>
> Here the controller will start processing the BrokerChange event (that says
> that broker A shutdown) after the broker has come back up and re-registered
> himself in ZK?
> How will the Controller miss the restart, won't he subsequently receive
> another ZK event saying that broker A has come back up?
> Controller will not miss the BrokerChange event and actually there will be
> two BrokerChange events fired in this case (one for broker deregistration
> in zk and one for registration). However, when processing the
> BrokerChangeEvent, controller needs to do a read from zookeeper to get back
> the current brokers in the cluster and if the bounced broker already joined
> the cluster by this time, controller will not know this broker has been
> bounced because it sees no diff between zk and its in-memory cache. So
> basically both of the BrokerChange event processing become no-op.
>
>
> Hope that I answer your questions. Feel free to follow up if I am missing
> something.
>
>
> Thanks,
> Zhanxiang (Patrick) Huang
>
> ________________________________
> From: Stanislav Kozlovski <st...@confluent.io>
> Sent: Wednesday, October 10, 2018 7:22
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and
> bounced brokers using broker generation
>
> Hi Patrick,
>
> Thanks for the KIP! Fixing such correctness issues is always very welcome -
> they're commonly hard to diagnose and debug when they happen in production.
>
> I was wondering if I understood the potential correctness issues correctly.
> Here is what I got:
>
>
>    - If a broker bounces during controlled shutdown, the bounced broker may
>    accidentally process its earlier generation’s StopReplicaRequest sent
> from
>    the active controller for one of its follower replicas, leaving the
> replica
>    offline while its remaining replicas may stay online
>
> broker A is initiating a controlled shutdown (restart). The Controller
> sends a StopReplicaRequest but it reaches broker A after it has started up
> again. He therefore stops replicating those partitions even though he
> should just be starting to
>
>
>    - If the first LeaderAndIsrRequest that a broker processes is sent by
>    the active controller before its startup, the broker will overwrite the
>    high watermark checkpoint file and may cause incorrect truncation (
>    KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)
>
> Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
> Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
> starts leading for the partitions sent by that request and might stop
> leading partitions that it was leading previously.
> This was well explained in the linked JIRA, but I cannot understand why
> that would happen due to my limited experience. If Broker A leads p1 and
> p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
> want Broker A to drop leadership for p2?
>
>
>    - If a broker bounces very quickly, the controller may start processing
>    the BrokerChange event after the broker already re-registers itself in
> zk.
>    In this case, controller will miss the broker restart and will not send
> any
>    requests to the broker for initialization. The broker will not be able
> to
>    accept traffics.
>
> Here the controller will start processing the BrokerChange event (that says
> that broker A shutdown) after the broker has come back up and re-registered
> himself in ZK?
> How will the Controller miss the restart, won't he subsequently receive
> another ZK event saying that broker A has come back up?
>
>
> Could we explain these potential problems in a bit more detail just so they
> could be more easily digestable by novices?
>
> Thanks,
> Stanislav
>
> On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:
>
> > Hey Patrick,
> >
> > Thanks much for the KIP. The KIP is very well written.
> >
> > LGTM.  +1 (binding)
> >
> > Thanks,
> > Dong
> >
> >
> > On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com>
> wrote:
> >
> > > Hi All,
> > >
> > > Please find the below KIP which proposes the concept of broker
> generation
> > > to resolve issues caused by controller missing broker state changes and
> > > broker processing outdated control requests.
> > >
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+
> generation
> > >
> > > All comments are appreciated.
> > >
> > > Best,
> > > Zhanxiang (Patrick) Huang
> > >
> >
>
>
> --
> Best,
> Stanislav
>

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Patrick Huang <hz...@hotmail.com>.
Hey Stanislav,

Sure. Thanks for your interest in this KIP. I am glad to provide more detail.

broker A is initiating a controlled shutdown (restart). The Controller
sends a StopReplicaRequest but it reaches broker A after it has started up
again. He therefore stops replicating those partitions even though he
should just be starting to
This is right.

Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
starts leading for the partitions sent by that request and might stop
leading partitions that it was leading previously.
This was well explained in the linked JIRA, but I cannot understand why
that would happen due to my limited experience. If Broker A leads p1 and
p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
want Broker A to drop leadership for p2?
The root cause of the issue is that after a broker just restarts, it relies on the first LeaderAndIsrRequest to populate the partition states and to initialize the highwater mark checkpoint thread. The highwater mark checkpoint thread will overwrite the highwater mark checkpoint file based on the broker's in-memory partition states. In other words, if a partition that is physically hosted by the broker is missing from the in-memory partition states map, its highwater mark will be lost after the highwater mark checkpoint thread overwrites the file. (Related code: https://github.com/apache/kafka/blob/ed3bd79633ae227ad995dafc3d9f384a5534d4e9/core/src/main/scala/kafka/server/ReplicaManager.scala#L1091)


In your example, assume the first LeaderAndIsrRequest broker A receives is the one initiated in the controlled shutdown logic in the Controller to move leadership away from broker A. This LeaderAndIsrRequest only contains partitions that broker A leads, not all the partitions that broker A hosts (i.e. no follower partitions), so the highwater mark for the follower partitions will be lost. Also, the first LeaderAndIsrRequest broker A receives may not necessarily be the one initiated in controlled shutdown logic (e.g. there can be an ongoing preferred leader election), although I think this may not be very common.
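To make this concrete, here is a highly simplified sketch (illustrative names, not the real ReplicaManager code) of why overwriting the checkpoint file from the in-memory map loses the high watermark of partitions that have not been populated yet:

case class TopicPartition(topic: String, partition: Int)

// Illustration only: the checkpoint file is overwritten from the in-memory
// view; entries that were on disk but are not (yet) in memory are dropped.
def checkpointHighWatermarks(
    inMemoryHw: Map[TopicPartition, Long],                  // built from LeaderAndIsrRequests received so far
    writeCheckpointFile: Map[TopicPartition, Long] => Unit  // overwrites the file
): Unit = {
  // The previous file contents are NOT merged in, so a follower partition
  // hosted by the broker but absent from `inMemoryHw` loses its checkpointed HW.
  writeCheckpointFile(inMemoryHw)
}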

Here the controller will start processing the BrokerChange event (that says
that broker A shutdown) after the broker has come back up and re-registered
himself in ZK?
How will the Controller miss the restart, won't he subsequently receive
another ZK event saying that broker A has come back up?
Controller will not miss the BrokerChange event, and actually there will be two BrokerChange events fired in this case (one for the broker deregistration in zk and one for the registration). However, when processing a BrokerChange event, the controller needs to do a read from zookeeper to get back the current brokers in the cluster, and if the bounced broker has already rejoined the cluster by that time, the controller will not know this broker has been bounced because it sees no diff between zk and its in-memory cache. So basically both BrokerChange event processings become no-ops.
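In other words (a simplified sketch of the current behaviour before this KIP, not the actual controller code), the diff is computed purely on broker ids, so a quick bounce yields an empty diff and nothing is done for that broker:

// With only broker ids to compare, a broker that de-registers and re-registers
// before the controller reads zk looks unchanged.
def preKipBrokerDiff(zkBrokerIds: Set[Int], cachedBrokerIds: Set[Int]): (Set[Int], Set[Int]) = {
  val newBrokers  = zkBrokerIds.diff(cachedBrokerIds)   // empty after a quick bounce
  val deadBrokers = cachedBrokerIds.diff(zkBrokerIds)   // empty after a quick bounce
  (newBrokers, deadBrokers)
}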


Hope that answers your questions. Feel free to follow up if I am missing something.


Thanks,
Zhanxiang (Patrick) Huang

________________________________
From: Stanislav Kozlovski <st...@confluent.io>
Sent: Wednesday, October 10, 2018 7:22
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Hi Patrick,

Thanks for the KIP! Fixing such correctness issues is always very welcome -
they're commonly hard to diagnose and debug when they happen in production.

I was wondering if I understood the potential correctness issues correctly.
Here is what I got:


   - If a broker bounces during controlled shutdown, the bounced broker may
   accidentally process its earlier generation’s StopReplicaRequest sent from
   the active controller for one of its follower replicas, leaving the replica
   offline while its remaining replicas may stay online

broker A is initiating a controlled shutdown (restart). The Controller
sends a StopReplicaRequest but it reaches broker A after it has started up
again. He therefore stops replicating those partitions even though he
should just be starting to


   - If the first LeaderAndIsrRequest that a broker processes is sent by
   the active controller before its startup, the broker will overwrite the
   high watermark checkpoint file and may cause incorrect truncation (
   KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)

Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
starts leading for the partitions sent by that request and might stop
leading partitions that it was leading previously.
This was well explained in the linked JIRA, but I cannot understand why
that would happen due to my limited experience. If Broker A leads p1 and
p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
want Broker A to drop leadership for p2?


   - If a broker bounces very quickly, the controller may start processing
   the BrokerChange event after the broker already re-registers itself in zk.
   In this case, controller will miss the broker restart and will not send any
   requests to the broker for initialization. The broker will not be able to
   accept traffics.

Here the controller will start processing the BrokerChange event (that says
that broker A shutdown) after the broker has come back up and re-registered
himself in ZK?
How will the Controller miss the restart, won't he subsequently receive
another ZK event saying that broker A has come back up?


Could we explain these potential problems in a bit more detail just so they
could be more easily digestable by novices?

Thanks,
Stanislav

On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:

> Hey Patrick,
>
> Thanks much for the KIP. The KIP is very well written.
>
> LGTM.  +1 (binding)
>
> Thanks,
> Dong
>
>
> On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com> wrote:
>
> > Hi All,
> >
> > Please find the below KIP which proposes the concept of broker generation
> > to resolve issues caused by controller missing broker state changes and
> > broker processing outdated control requests.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+generation
> >
> > All comments are appreciated.
> >
> > Best,
> > Zhanxiang (Patrick) Huang
> >
>


--
Best,
Stanislav

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Stanislav Kozlovski <st...@confluent.io>.
Hi Patrick,

Thanks for the KIP! Fixing such correctness issues is always very welcome -
they're commonly hard to diagnose and debug when they happen in production.

I was wondering if I understood the potential correctness issues correctly.
Here is what I got:


   - If a broker bounces during controlled shutdown, the bounced broker may
   accidentally process its earlier generation’s StopReplicaRequest sent from
   the active controller for one of its follower replicas, leaving the replica
   offline while its remaining replicas may stay online

broker A is initiating a controlled shutdown (restart). The Controller
sends a StopReplicaRequest but it reaches broker A after it has started up
again. He therefore stops replicating those partitions even though he
should just be starting to


   - If the first LeaderAndIsrRequest that a broker processes is sent by
   the active controller before its startup, the broker will overwrite the
   high watermark checkpoint file and may cause incorrect truncation (
   KAFKA-7235 <https://issues.apache.org/jira/browse/KAFKA-7235>)

Controller sends a LeaderAndIsrRequest before broker A initiates a restart.
Broker A restarts and receives the LeaderAndIsrRequest then. It therefore
starts leading for the partitions sent by that request and might stop
leading partitions that it was leading previously.
This was well explained in the linked JIRA, but I cannot understand why
that would happen due to my limited experience. If Broker A leads p1 and
p2, when would a Controller send a LeaderAndIsrRequest with p1 only and not
want Broker A to drop leadership for p2?


   - If a broker bounces very quickly, the controller may start processing
   the BrokerChange event after the broker already re-registers itself in zk.
   In this case, controller will miss the broker restart and will not send any
   requests to the broker for initialization. The broker will not be able to
   accept traffics.

Here the controller will start processing the BrokerChange event (that says
that broker A shutdown) after the broker has come back up and re-registered
himself in ZK?
How will the Controller miss the restart, won't he subsequently receive
another ZK event saying that broker A has come back up?


Could we explain these potential problems in a bit more detail just so they
could be more easily digestible by novices?

Thanks,
Stanislav

On Wed, Oct 10, 2018 at 9:21 AM Dong Lin <li...@gmail.com> wrote:

> Hey Patrick,
>
> Thanks much for the KIP. The KIP is very well written.
>
> LGTM.  +1 (binding)
>
> Thanks,
> Dong
>
>
> On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com> wrote:
>
> > Hi All,
> >
> > Please find the below KIP which proposes the concept of broker generation
> > to resolve issues caused by controller missing broker state changes and
> > broker processing outdated control requests.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+generation
> >
> > All comments are appreciated.
> >
> > Best,
> > Zhanxiang (Patrick) Huang
> >
>


-- 
Best,
Stanislav

Re: [DISCUSS] KIP-380: Detect outdated control requests and bounced brokers using broker generation

Posted by Dong Lin <li...@gmail.com>.
Hey Patrick,

Thanks much for the KIP. The KIP is very well written.

LGTM.  +1 (binding)

Thanks,
Dong


On Tue, Oct 9, 2018 at 11:46 PM Patrick Huang <hz...@hotmail.com> wrote:

> Hi All,
>
> Please find the below KIP which proposes the concept of broker generation
> to resolve issues caused by controller missing broker state changes and
> broker processing outdated control requests.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-380%3A+Detect+outdated+control+requests+and+bounced+brokers+using+broker+generation
>
> All comments are appreciated.
>
> Best,
> Zhanxiang (Patrick) Huang
>