Posted to dev@kafka.apache.org by Unmesh Joshi <un...@gmail.com> on 2020/08/03 08:48:11 UTC

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

> We could combine the heartbeat with the fetch request.  It would
> basically mean moving all the heartbeat fields into the fetch request.  As
> the KIP says, this would be pretty messy.  Another reason why it would
> be messy is because of the timing.  Fetch requests can get delayed when
> they're fetching a lot of data.  If this delays heartbeats then it could
> cause brokers to get fenced unnecessarily.  This is something that
> we've gone back and forth about, but overall I think it's good to at least
> implement the simple thing first.

Just a quick comment. The description of the heartbeat in this KIP is more
about maintaining a 'time-bound lease' with broker details, for group
membership and failure detection of the brokers. In that sense they are more
than just heartbeats, as they need to carry all the details we need in the
broker metadata. Would calling them BrokerLease requests instead of broker
heartbeats make it clearer? If the broker metadata were not included and
the requests only needed to indicate liveness of the broker, the Fetch
request would be enough for that. Raft AppendEntries requests do the same.
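
To make the 'time-bound lease' idea concrete, here is a minimal sketch of how a controller might track broker leases. It is only an illustration under stated assumptions: the class name BrokerLeaseTracker, its methods, and the timeout value are hypothetical and are not taken from KIP-631 or the Kafka code base.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a time-bound lease table; not the KIP-631 design. */
public class BrokerLeaseTracker {
    private final long leaseTimeoutMs;
    // brokerId -> wall-clock time (ms) at which the current lease runs out
    private final Map<Integer, Long> leaseExpiryMs = new HashMap<>();

    public BrokerLeaseTracker(long leaseTimeoutMs) {
        this.leaseTimeoutMs = leaseTimeoutMs;
    }

    /** Grants or renews the lease for a broker, counted from nowMs. */
    public void renew(int brokerId, long nowMs) {
        leaseExpiryMs.put(brokerId, nowMs + leaseTimeoutMs);
    }

    /** True if the broker never registered or its lease has run out. */
    public boolean isExpired(int brokerId, long nowMs) {
        Long expiry = leaseExpiryMs.get(brokerId);
        return expiry == null || nowMs >= expiry;
    }
}
```

Each lease request would call renew(), and a periodic check on the controller would fence any broker for which isExpired() returns true.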

Two more things to consider.
1.  Are heartbeat requests and Fetch requests sent on the same channel? In
that case heartbeats can still suffer from slow disk reads/writes, and
even with the separate heartbeat request, care must be taken to
accommodate slower disk writes. (Unless, as in ZooKeeper, any request on a
connection touches the session to update the expirationTime even before
it is put in the processing queue.)
   (e.g. this issue in HashiCorp Consul:
https://github.com/hashicorp/raft/commit/1e6849570f4eecd17b68b2ee6f7624b44ff316eb
)
   (LogCabin resets the election time to 'avoid punishing the leader for
our own long disk writes':
https://github.com/logcabin/logcabin/blob/master/Server/RaftConsensus.cc)

2.  Can local process pauses still be an issue on the active controller? (
https://issues.apache.org/jira/browse/CASSANDRA-9183)
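
The "touch the session before queueing" behaviour mentioned in point 1 can be sketched roughly as follows. This is an illustrative assumption about the general technique, not ZooKeeper's or Kafka's actual code; the class TouchOnReceive and all of its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch: record liveness at arrival time, not processing time. */
public class TouchOnReceive {
    private final long sessionTimeoutMs;
    // brokerId -> time (ms) at which the session would expire
    private final Map<Integer, Long> expirationTimeMs = new HashMap<>();
    // requests waiting for a (possibly slow, disk-bound) processing thread
    private final Queue<String> processingQueue = new ArrayDeque<>();

    public TouchOnReceive(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Called by the network layer as soon as any request arrives. */
    public void onRequestReceived(int brokerId, String request, long nowMs) {
        // Touch first, so a backlog in the queue cannot delay the liveness update.
        expirationTimeMs.put(brokerId, nowMs + sessionTimeoutMs);
        processingQueue.add(request);
    }

    /** Called by the processing thread; returns null when the queue is empty. */
    public String processNext() {
        return processingQueue.poll();
    }

    public boolean isExpired(int brokerId, long nowMs) {
        Long expiry = expirationTimeMs.get(brokerId);
        return expiry == null || nowMs >= expiry;
    }

    public int pendingRequests() {
        return processingQueue.size();
    }
}
```

With this ordering, a sender stays live as long as its requests keep arriving, even if the processing thread is stalled behind slow disk writes.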

Thanks,
Unmesh



On Thu, Jul 30, 2020 at 6:04 AM Colin McCabe <cm...@apache.org> wrote:

> On Thu, Jul 23, 2020, at 23:02, Boyang Chen wrote:
> > Hey Colin,
> >
> > some more questions I have about the proposal:
> >
> > 1. We mentioned in the networking section that "The only time when clients
> > should contact a controller node directly is when they are debugging system
> > issues". But later we didn't talk about how to enable this debug mode.
> > Could you consider adding a section about that?
> >
>
> Hi Boyang,
>
> Thanks for the review.
>
> There isn't a separate debug mode or anything like that.  The assumption
> here is that when debugging system issues, we are inside the cluster, or at
> least have access to the private controller network.  This is pretty
> similar to how most administrators run ZooKeeper.
>
> >
> > 2. “When the active controller decides that a standby controller should
> > start a snapshot, it will communicate that information in its response to
> > the periodic heartbeat sent by that node.” In KIP-595, we provide an
> > RPC called `EndQuorumEpoch` which would transfer the leadership role to a
> > dedicated successor. Do you think we could reuse that method instead of
> > piggy-backing on the heartbeat RPC?
> >
>
> I thought about this a little bit more, and I think the active controller
> should not need to give up the leadership in order to snapshot.  So I
> removed the part about renouncing the controllership.
>
> >
> > 3. The `DeleteBroker` record is listed but not described in detail in the
> > KIP. Are we going to support removing a broker at runtime, or is this record
> > just for the sake of removing an obsolete broker due to heartbeat
> > failure?
> >
>
> This record is about removing a broker from the cluster due to heartbeat
> failure, or because it's shutting down.  I have renamed it to FenceBroker
> to make that clearer.
>
> >
> > 4. In the rejected alternatives, we mentioned we don't want to combine
> > heartbeats and fetch, and listed extra complexity as the reason.
> > However, we should also mention some cons caused by this model. For example,
> > we are doing 2X round trips to maintain liveness, whereas a regular
> > follower should always be sending out fetch anyway. If we were combining the
> > two, what heartbeat request fields would we need to populate in the
> > Fetch protocol to make it work? Could we piggy-back on the UpdateMetadata RPC
> > to propagate the broker state change for listeners separately to the
> > controller? I'm not buying either approach here, just hope we could list
> > out more reasoning for separating the heartbeat RPC from Fetch, pros and
> > cons.
> >
>
> The UpdateMetadata RPC is not used in the post-KIP-500 world.  This is
> mentioned later in KIP-631, where it says that "we will no longer need to
> send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest".
>
> We could combine the heartbeat with the fetch request.  It would basically
> mean moving all the heartbeat fields into the fetch request.  As the KIP
> says, this would be pretty messy.  Another reason why it would be messy is
> because of the timing.  Fetch requests can get delayed when they're
> fetching a lot of data.  If this delays heartbeats then it could cause
> brokers to get fenced unnecessarily.  This is something that we've gone
> back and forth about, but overall I think it's good to at least implement
> the simple thing first.
>
> best,
> Colin
>
> >
> > Boyang
> >
> > On Wed, Jul 15, 2020 at 5:30 PM Colin McCabe <cm...@apache.org> wrote:
> >
> > > On Mon, Jul 13, 2020, at 11:08, Boyang Chen wrote:
> > > > Hey Colin, some quick questions,
> > > >
> > > > > 1. I looked around and didn't find a config for the broker heartbeat
> > > > > interval. Are we piggy-backing on some existing configs?
> > > >
> > >
> > > Good point.  I meant to add this, but I forgot.  I added
> > > registration.heartbeat.interval.ms in the table.
> > >
> > > >
> > > > > 2. We only mentioned that the lease time is 10X the heartbeat
> > > > > interval. Could we also include why we chose this value?
> > > >
> > >
> > > > I will add registration.lease.timeout.ms so that this can be set
> > > > separately from registration.heartbeat.interval.ms.  The choice of
> > > > value is a balance between not timing out brokers too soon, and not
> > > > keeping unavailable brokers around too long.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > > On Mon, Jul 13, 2020 at 10:09 AM Jason Gustafson <jason@confluent.io> wrote:
> > > >
> > > > > Hi Colin,
> > > > >
> > > > > > Thanks for the proposal. A few initial comments/questions below:
> > > > > >
> > > > > > 1. I don't follow why we need a separate configuration for
> > > > > > `controller.listeners`. The current listener configuration already
> > > > > > allows users to specify multiple listeners, which allows them to
> > > > > > define internal endpoints that are not exposed to clients. Can you
> > > > > > explain what the new configuration gives us that we don't already
> > > > > > have?
> > > > > > 2. What is the advantage of creating a separate `controller.id`
> > > > > > instead of just using `broker.id`?
> > > > > > 3. It sounds like you are imagining a stop-the-world approach to
> > > > > > snapshotting, which is why we need the controller micromanaging
> > > > > > snapshots on all followers. Alternatives include fuzzy snapshots,
> > > > > > which can be done concurrently. If this has been rejected, can you
> > > > > > add some detail about why?
> > > > > > 4. More of a nit, but should `DeleteBrokerRecord` be
> > > > > > `ShutdownBrokerRecord`? The broker is just getting removed from
> > > > > > ISRs, but it would still be present in the replica set (I assume).
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > > On Sun, Jul 12, 2020 at 12:24 AM Colin McCabe <cm...@apache.org> wrote:
> > > > >
> > > > > > Hi Unmesh,
> > > > > >
> > > > > > > That's an interesting idea, but I think it would be best to
> > > > > > > strive for single metadata events that are complete in
> > > > > > > themselves, rather than trying to do something transactional or
> > > > > > > EOS-like.  For example, we could have a create event that
> > > > > > > contains all the partitions to be created.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Fri, Jul 10, 2020, at 04:12, Unmesh Joshi wrote:
> > > > > > > > I was thinking that we might need something like the
> > > > > > > > multi-operation record in ZooKeeper
> > > > > > > > <https://issues.apache.org/jira/browse/ZOOKEEPER-965>
> > > > > > > > to atomically create topic and partition records when this
> > > > > > > > multi record is committed.  This way the metadata will always
> > > > > > > > have both the TopicRecord and the PartitionRecord together, and
> > > > > > > > in no situation can we have a TopicRecord without a
> > > > > > > > PartitionRecord. Not sure if there are other situations where
> > > > > > > > multi-operation is needed.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Unmesh
> > > > > > >
> > > > > > > > On Fri, Jul 10, 2020 at 11:32 AM Colin McCabe <cmccabe@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Hi Unmesh,
> > > > > > > > >
> > > > > > > > > Yes, once the last stable offset advanced, we would consider
> > > > > > > > > the topic creation to be done, and then we could return
> > > > > > > > > success to the client.
> > > > > > > >
> > > > > > > > best,
> > > > > > > > Colin
> > > > > > > >
> > > > > > > > On Thu, Jul 9, 2020, at 19:44, Unmesh Joshi wrote:
> > > > > > > > > > It still needs HighWaterMark / LastStableOffset to be
> > > > > > > > > > advanced by two records? Something like the following?
> > > > > > > > > >
> > > > > > > > > >                        |                 |
> > > > > > > > > > <------------------    |-----------------|   HighWaterMark
> > > > > > > > > >    Response            | PartitionRecord |
> > > > > > > > > >                        |                 |
> > > > > > > > > >                        |-----------------|
> > > > > > > > > >                        | TopicRecord     |
> > > > > > > > > >                        |                 |
> > > > > > > > > > ------------------->   |-----------------|   Previous HighWaterMark
> > > > > > > > > >    CreateTopic         |                 |
> > > > > > > > > >                        |                 |
> > > > > > > > > >
> > > > > > > > > > On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe <cmccabe@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote:
> > > > > > > > > > > > I see that, when a new topic is created, two metadata
> > > > > > > > > > > > records are created: a TopicRecord (just the name and id
> > > > > > > > > > > > of the topic) and a PartitionRecord (more like
> > > > > > > > > > > > LeaderAndIsr, with leader id and replica ids for the
> > > > > > > > > > > > partition).
> > > > > > > > > > > > While creating the topic, log entries for both records
> > > > > > > > > > > > need to be committed in the Raft core. Will it need
> > > > > > > > > > > > something like a MultiOperationRecord in ZooKeeper? Then
> > > > > > > > > > > > we can have a single log entry with both records, and
> > > > > > > > > > > > the create topic request can be fulfilled atomically
> > > > > > > > > > > > when both records are committed?
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Hi Unmesh,
> > > > > > > > > >
> > > > > > > > > > > Since the active controller is the only node writing to
> > > > > > > > > > > the log, there is no need for any kind of synchronization
> > > > > > > > > > > or access control at the log level.
> > > > > > > > > >
> > > > > > > > > > best,
> > > > > > > > > > Colin
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Unmesh
> > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino <rndgstn@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > Hi Colin.  Thanks for the KIP.  Here is some feedback
> > > > > > > > > > > > > and various questions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*Controller processes will listen on a separate port
> > > > > > > > > > > > > from brokers.  This will be true even when the broker
> > > > > > > > > > > > > and controller are co-located in the same JVM*". I
> > > > > > > > > > > > > assume it is possible that the port numbers could be
> > > > > > > > > > > > > the same when using separate JVMs (i.e. broker uses
> > > > > > > > > > > > > port 9192 and controller also uses port 9192).  I
> > > > > > > > > > > > > think it would be clearer to state this along these
> > > > > > > > > > > > > lines: "Controller nodes will listen on a port, and
> > > > > > > > > > > > > the controller port must differ from any port that a
> > > > > > > > > > > > > broker in the same JVM is listening on.  In other
> > > > > > > > > > > > > words, a controller and a broker node, when in the
> > > > > > > > > > > > > same JVM, do not share ports"
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think the sentence "*In the realm of ACLs, this
> > > > > > > > > > > > > translates to controllers requiring CLUSTERACTION on
> > > > > > > > > > > > > CLUSTER for all operations*" is confusing.  It feels
> > > > > > > > > > > > > to me that you can just delete it.  Am I missing
> > > > > > > > > > > > > something here?
> > > > > > > > > > > > >
> > > > > > > > > > > > > The KIP states "*The metadata will be stored in memory
> > > > > > > > > > > > > on all the active controllers.*"  Can there be
> > > > > > > > > > > > > multiple active controllers?  Should it instead read
> > > > > > > > > > > > > "The metadata will be stored in memory on all
> > > > > > > > > > > > > potential controllers." (or something like that)?
> > > > > > > > > > > > >
> > > > > > > > > > > > > KIP-595 states "*we have assumed the name
> > > > > > > > > > > > > __cluster_metadata for this topic, but this is not a
> > > > > > > > > > > > > formal part of this proposal*".  This KIP-631 states
> > > > > > > > > > > > > "*Metadata changes need to be persisted to the
> > > > > > > > > > > > > __metadata log before we propagate them to the other
> > > > > > > > > > > > > nodes in the cluster.  This means waiting for the
> > > > > > > > > > > > > metadata log's last stable offset to advance to the
> > > > > > > > > > > > > offset of the change.*"  Are we here formally
> > > > > > > > > > > > > defining "__metadata" as the topic name, and should
> > > > > > > > > > > > > these sentences refer to "__metadata topic" rather
> > > > > > > > > > > > > than "__metadata log"?  What are the "other nodes in
> > > > > > > > > > > > > the cluster" that are referred to?  These are not
> > > > > > > > > > > > > controller nodes but brokers, right?  If so, then
> > > > > > > > > > > > > should we say "before we propagate them to the
> > > > > > > > > > > > > brokers"?  Technically we have a controller cluster
> > > > > > > > > > > > > and a broker cluster -- two separate clusters,
> > > > > > > > > > > > > correct?  (Even though we could potentially share
> > > > > > > > > > > > > JVMs and therefore require no additional processes.)
> > > > > > > > > > > > > If the statement is referring to nodes in both
> > > > > > > > > > > > > clusters then maybe we should state "before we
> > > > > > > > > > > > > propagate them to the other nodes in the controller
> > > > > > > > > > > > > cluster or to brokers."
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*The controller may have several of these uncommitted
> > > > > > > > > > > > > changes in flight at any given time.  In essence, the
> > > > > > > > > > > > > controller's in-memory state is always a little bit
> > > > > > > > > > > > > in the future compared to the current state.  This
> > > > > > > > > > > > > allows the controller to continue doing things while
> > > > > > > > > > > > > it waits for the previous changes to be committed to
> > > > > > > > > > > > > the Raft log.*"  Should the three references above be
> > > > > > > > > > > > > to the active controller rather than just the
> > > > > > > > > > > > > controller?
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*Therefore, the controller must not make this future
> > > > > > > > > > > > > state "visible" to the rest of the cluster until it
> > > > > > > > > > > > > has been made persistent – that is, until it becomes
> > > > > > > > > > > > > current state*". Again I wonder if this should refer
> > > > > > > > > > > > > to "active" controller, and indicate "anyone else" as
> > > > > > > > > > > > > opposed to "the rest of the cluster" since we are
> > > > > > > > > > > > > talking about 2 clusters here?
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*When the active controller decides that it itself
> > > > > > > > > > > > > should create a snapshot, it will first try to give
> > > > > > > > > > > > > up the leadership of the Raft quorum.*"  Why?  Is it
> > > > > > > > > > > > > necessary to state this?  It seems like it might be
> > > > > > > > > > > > > an implementation detail rather than a necessary
> > > > > > > > > > > > > constraint/requirement that we declare publicly and
> > > > > > > > > > > > > would have to abide by.
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*It will reject brokers whose metadata is too
> > > > > > > > > > > > > stale*". Why?  An example might be helpful here.
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*it may lose subsequent conflicts if its broker epoch
> > > > > > > > > > > > > is stale*"  This is the first time a "broker epoch"
> > > > > > > > > > > > > is mentioned.  I am assuming it is the controller
> > > > > > > > > > > > > epoch communicated to it (if any).  It would be good
> > > > > > > > > > > > > to introduce it/explicitly state what it is before
> > > > > > > > > > > > > referring to it.
> > > > > > > > > > > >
> > > > > > > > > > > > Ron
> > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Jul 7, 2020 at 6:48 PM Colin McCabe <cmccabe@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I posted a KIP about how the quorum-based controller
> > > > > > > > > > > > > > envisioned in KIP-500 will work.  Please take a look
> > > > > > > > > > > > > > here:
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/4RV4CQ
> > > > > > > > > > > > >
> > > > > > > > > > > > > best,
> > > > > > > > > > > > > Colin

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

Posted by Colin McCabe <cm...@apache.org>.
On Mon, Aug 3, 2020, at 01:48, Unmesh Joshi wrote:
> Just a quick comment. The description of the heartbeat in this KIP is more
> about maintaining a 'time-bound lease' with broker details, for group
> membership and failure detection of the brokers. In that sense they are more
> than just heartbeats, as they need to carry all the details we need in the
> broker metadata. Would calling them BrokerLease requests instead of broker
> heartbeats make it clearer? If the broker metadata were not included and
> the requests only needed to indicate liveness of the broker, the Fetch
> request would be enough for that. Raft AppendEntries requests do the same.
>
> Two more things to consider.
> 1.  Are heartbeat requests and Fetch requests sent on the same channel?
> In that case heartbeats can still suffer from slow disk reads/writes,
> and even with the separate heartbeat request, care must be taken to
> accommodate slower disk writes. (Unless, as in ZooKeeper, any request
> on a connection touches the session to update the expirationTime even before
> it is put in the processing queue.)
>    (e.g. this issue in HashiCorp Consul:
> https://github.com/hashicorp/raft/commit/1e6849570f4eecd17b68b2ee6f7624b44ff316eb
> )
>    (LogCabin resets the election time to 'avoid punishing the leader for
> our own long disk writes':
> https://github.com/logcabin/logcabin/blob/master/Server/RaftConsensus.cc)
> 

That's a fair point.  I think we definitely want to put fetch requests and heartbeat requests on their own channels, separate from other requests.  The alternative would be getting rid of head-of-line blocking, but that would require some difficult changes to the networking layer.

>
> 2.  Can local process pauses still be an issue on the active controller? (
> https://issues.apache.org/jira/browse/CASSANDRA-9183)
> 

Cassandra has a very different way of handling cluster membership than we do.  This would not be an issue in Kafka because clients don't get their metadata responses from the controller quorum anyway.

best,
Colin

> Thanks,
> Unmesh
> 
> 
> 
> On Thu, Jul 30, 2020 at 6:04 AM Colin McCabe <cm...@apache.org> wrote:
> 
> > On Thu, Jul 23, 2020, at 23:02, Boyang Chen wrote:
> > > Hey Colin,
> > >
> > > some more questions I have about the proposal:
> > >
> > > 1. We mentioned in the networking section that "The only time when
> > clients
> > > should contact a controller node directly is when they are debugging
> > system
> > > issues". But later we didn't talk about how to enable this debug mode,
> > > could you consider getting a section about that?
> > >
> >
> > Hi Boyang,
> >
> > Thanks for the review.
> >
> > There isn't a separate debug mode or anything like that.  The assumption
> > here is that when debugging system issues, we are inside the cluster, or at
> > least have access to the private controller network.  This is pretty
> > similar to how most administrators run ZooKeeper.
> >
> > >
> > > 2. “When the active controller decides that a standby controller should
> > > start a snapshot, it will communicate that information in its response to
> > > the periodic heartbeat sent by that node.“ In the KIP-595, we provide an
> > > RPC called `EndQuorumEpoch` which would transfer the leadership role to a
> > > dedicated successor, do you think we could reuse that method instead of
> > > piggy-backing on the heartbeat RPC?
> > >
> >
> > I thought about this a little bit more, and I think the active controller
> > should not need to give up the leadership in order to snapshot.  So I
> > removed the part about renouncing the controllership.
> >
> > >
> > > 3. The `DeleteBroker` record is listed but not mentioned in details for
> > the
> > > KIP. Are we going to support removing a broker in runtime, or this record
> > > is just for the sake of removing an obsolete broker due to heartbeat
> > > failure?
> > >
> >
> > This record is about removing a broker from the cluster due to heartbeat
> > failure, or because it's shutting down.  I have renamed it to FenceBroker
> > to make that clearer.
> >
> > >
> > > 4. In the rejected alternatives, we mentioned we don't want to combine
> > > heartbeats and fetch and listed out the reason was due to extra
> > complexity.
> > > However, we should also mention some cons caused by this model, for
> > example
> > > we are doing 2X round trips to maintain a liveness, where as a regular
> > > follower it should always send out fetch, for sure. If we are combining
> > the
> > > two, what are the heartbeat request fields we need to populate in the
> > Fetch
> > > protocol to make it work? Could we piggy-back on the UpdateMetadata RPC
> > to
> > > propagate the broker state change for listeners separately to the
> > > controller? I'm not buying either approach here, just hope we could list
> > > out more reasoning for separating the heartbeat RPC from Fetch, pros and
> > > cons.
> > >
> >
> > The UpdateMetadata RPC is not used in the post-KIP-500 world.  This is
> > mentioned later in KIP-631 where it says that "we will no longer need to
> > send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest"
> >
> > We could combine the heartbeat with the fetch request.  It would basically
> > mean moving all the heartbeat fields into the fetch request.  As the KIP
> > says, this would be pretty messy.  Another reason why it would be messy is
> > because of the timing.  Fetch requests can get delayed when they're
> > fetching a lot of data.  If this delays heartbeats then it could cause
> > brokers to get fenced unnecessarily.  This is something that we've gone
> > back and forth about, but overall I think it's good to at least implement
> > the simple thing first.
> >
> > best,
> > Colin
> >
> > >
> > > Boyang
> > >
> > > On Wed, Jul 15, 2020 at 5:30 PM Colin McCabe <cm...@apache.org> wrote:
> > >
> > > > On Mon, Jul 13, 2020, at 11:08, Boyang Chen wrote:
> > > > > Hey Colin, some quick questions,
> > > > >
> > > > > 1. I looked around and didn't find a config for broker heartbeat
> > > > interval,
> > > > > are we piggy-back on some existing configs?
> > > > >
> > > >
> > > > Good point.  I meant to add this, but I forgot.  I added
> > > > registration.heartbeat.interval.ms in the table.
> > > >
> > > > >
> > > > > 2. We only mentioned that the lease time is 10X of the heartbeat
> > > > interval,
> > > > > could we also include why we chose this value?
> > > > >
> > > >
> > > > I will add registration.lease.timeout.ms so that this can be set
> > > > separately from registration.heartbeat.interval.ms.  The choice of
> > value
> > > > is a balance between not timing out brokers too soon, and not keeping
> > > > unavailable brokers around too long.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > On Mon, Jul 13, 2020 at 10:09 AM Jason Gustafson <jason@confluent.io
> > >
> > > > wrote:
> > > > >
> > > > > > Hi Colin,
> > > > > >
> > > > > > Thanks for the proposal. A few initial comments comments/questions
> > > > below:
> > > > > >
> > > > > > 1. I don't follow why we need a separate configuration for
> > > > > > `controller.listeners`. The current listener configuration already
> > > > allows
> > > > > > users to specify multiple listeners, which allows them to define
> > > > internal
> > > > > > endpoints that are not exposed to clients. Can you explain what
> > the new
> > > > > > configuration gives us that we don't already have?
> > > > > > 2. What is the advantage of creating a separate `controller.id`
> > > > instead of
> > > > > > just using `broker.id`?
> > > > > > 3. It sounds like you are imagining a stop-the-world approach to
> > > > > > snapshotting, which is why we need the controller micromanaging
> > > > snapshots
> > > > > > on all followers. Alternatives include fuzzy snapshots which can be
> > > > done
> > > > > > concurrently. If this has been rejected, can you add some detail
> > about
> > > > why?
> > > > > > 4. More of a nit, but should `DeleteBrokerRecord` be
> > > > > > `ShutdownBrokerRecord`? The broker is just getting removed from
> > ISRs,
> > > > but
> > > > > > it would still be present in the replica set (I assume).
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > > On Sun, Jul 12, 2020 at 12:24 AM Colin McCabe <cm...@apache.org>
> > > > wrote:
> > > > > >
> > > > > > > Hi Unmesh,
> > > > > > >
> > > > > > > That's an interesting idea, but I think it would be best to
> > strive
> > > > for
> > > > > > > single metadata events that are complete in themselves, rather
> > than
> > > > > > trying
> > > > > > > to do something transactional or EOS-like.  For example, we could
> > > > have a
> > > > > > > create event that contains all the partitions to be created.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jul 10, 2020, at 04:12, Unmesh Joshi wrote:
> > > > > > > > I was thinking that we might need something like
> > multi-operation
> > > > > > > > <https://issues.apache.org/jira/browse/ZOOKEEPER-965> record
> > in
> > > > > > > zookeeper
> > > > > > > > to atomically create topic and partition records when this
> > multi
> > > > record
> > > > > > > is
> > > > > > > > committed.  This way metadata will have both the TopicRecord
> > and
> > > > > > > > PartitionRecord together always, and in no situation we can
> > have
> > > > > > > > TopicRecord without PartitionRecord. Not sure if there are
> > other
> > > > > > > situations
> > > > > > > > where multi-operation is needed.
> > > > > > > > <https://issues.apache.org/jira/browse/ZOOKEEPER-965>
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Unmesh
> > > > > > > >
> > > > > > > > On Fri, Jul 10, 2020 at 11:32 AM Colin McCabe <
> > cmccabe@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Unmesh,
> > > > > > > > >
> > > > > > > > > Yes, once the last stable offset advanced, we would consider
> > the
> > > > > > topic
> > > > > > > > > creation to be done, and then we could return success to the
> > > > client.
> > > > > > > > >
> > > > > > > > > best,
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > > On Thu, Jul 9, 2020, at 19:44, Unmesh Joshi wrote:
> > > > > > > > > > It still needs HighWaterMark / LastStableOffset to be
> > advanced
> > > > by
> > > > > > two
> > > > > > > > > > records? Something like following?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >                        |                |
> > > > > > > > > > <------------------    |----------------|   HighWaterMark
> > > > > > > > > >    Response            |PartitionRecord |
> > > > > > > > > >                        |                |
> > > > > > > > > >                        -----------------|
> > > > > > > > > >                        | TopicRecord    |
> > > > > > -
> > > > > > > > > >                        |                |
> > > > > > > > > > ------------------->   ------------------   Previous
> > > > HighWaterMark
> > > > > > > > > >    CreateTopic         |                |
> > > > > > > > > >                        |                |
> > > > > > > > > >                        |                |
> > > > > > > > > >
> > > > > > > > > > On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe <cmccabe@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote:
> > > > > > > > > > > > I see that, when a new topic is created, two metadata
> > > > > > > > > > > > records, a TopicRecord (just the name and id of the
> > > > > > > > > > > > topic) and a PartitionRecord (more like LeaderAndIsr,
> > > > > > > > > > > > with leader id and replica ids for the partition), are
> > > > > > > > > > > > created.
> > > > > > > > > > > > While creating the topic, log entries for both records
> > > > > > > > > > > > need to be committed in the Raft core. Will it need
> > > > > > > > > > > > something like a MultiOperationRecord in ZooKeeper?
> > > > > > > > > > > > Then we can have a single log entry with both records,
> > > > > > > > > > > > and the create topic request can be fulfilled
> > > > > > > > > > > > atomically when both records are committed?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Hi Unmesh,
> > > > > > > > > > >
> > > > > > > > > > > Since the active controller is the only node writing to the
> > > > > > > > > > > log, there is no need for any kind of synchronization or
> > > > > > > > > > > access control at the log level.
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Unmesh
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino <rndgstn@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Colin.  Thanks for the KIP.  Here is some feedback and
> > > > > > > > > > > > > various questions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*Controller processes will listen on a separate port
> > > > > > > > > > > > > from brokers. This will be true even when the broker
> > > > > > > > > > > > > and controller are co-located in the same JVM*". I
> > > > > > > > > > > > > assume it is possible that the port numbers could be
> > > > > > > > > > > > > the same when using separate JVMs (i.e. broker uses
> > > > > > > > > > > > > port 9192 and controller also uses port 9192).  I think
> > > > > > > > > > > > > it would be clearer to state this along these lines:
> > > > > > > > > > > > > "Controller nodes will listen on a port, and the
> > > > > > > > > > > > > controller port must differ from any port that a broker
> > > > > > > > > > > > > in the same JVM is listening on.  In other words, a
> > > > > > > > > > > > > controller and a broker node, when in the same JVM, do
> > > > > > > > > > > > > not share ports"
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think the sentence "*In the realm of ACLs, this
> > > > > > > > > > > > > translates to controllers requiring CLUSTERACTION on
> > > > > > > > > > > > > CLUSTER for all operations*" is confusing. It feels to
> > > > > > > > > > > > > me that you can just delete it.  Am I missing something
> > > > > > > > > > > > > here?
> > > > > > > > > > > > >
> > > > > > > > > > > > > The KIP states "*The metadata will be stored in memory
> > > > > > > > > > > > > on all the active controllers.*"  Can there be multiple
> > > > > > > > > > > > > active controllers?  Should it instead read "The
> > > > > > > > > > > > > metadata will be stored in memory on all potential
> > > > > > > > > > > > > controllers." (or something like that)?
> > > > > > > > > > > > >
> > > > > > > > > > > > > KIP-595 states "*we have assumed the name
> > > > > > > > > > > > > __cluster_metadata for this topic, but this is not a
> > > > > > > > > > > > > formal part of this proposal*".  This KIP-631 states
> > > > > > > > > > > > > "*Metadata changes need to be persisted to the
> > > > > > > > > > > > > __metadata log before we propagate them to the other
> > > > > > > > > > > > > nodes in the cluster.  This means waiting for the
> > > > > > > > > > > > > metadata log's last stable offset to advance to the
> > > > > > > > > > > > > offset of the change.*"  Are we here formally defining
> > > > > > > > > > > > > "__metadata" as the topic name, and should these
> > > > > > > > > > > > > sentences refer to "__metadata topic" rather than
> > > > > > > > > > > > > "__metadata log"?  What are the "other nodes in the
> > > > > > > > > > > > > cluster" that are referred to?  These are not
> > > > > > > > > > > > > controller nodes but brokers, right?  If so, then
> > > > > > > > > > > > > should we say "before we propagate them to the
> > > > > > > > > > > > > brokers"?  Technically we have a controller cluster and
> > > > > > > > > > > > > a broker cluster -- two separate clusters, correct?
> > > > > > > > > > > > > (Even though we could potentially share JVMs and
> > > > > > > > > > > > > therefore require no additional processes.)  If the
> > > > > > > > > > > > > statement is referring to nodes in both clusters then
> > > > > > > > > > > > > maybe we should state "before we propagate them to the
> > > > > > > > > > > > > other nodes in the controller cluster or to brokers."
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*The controller may have several of these uncommitted
> > > > > > > > > > > > > changes in flight at any given time.  In essence, the
> > > > > > > > > > > > > controller's in-memory state is always a little bit in
> > > > > > > > > > > > > the future compared to the current state.  This allows
> > > > > > > > > > > > > the controller to continue doing things while it waits
> > > > > > > > > > > > > for the previous changes to be committed to the Raft
> > > > > > > > > > > > > log.*"  Should the three references above be to the
> > > > > > > > > > > > > active controller rather than just the controller?
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*Therefore, the controller must not make this future
> > > > > > > > > > > > > state "visible" to the rest of the cluster until it has
> > > > > > > > > > > > > been made persistent – that is, until it becomes
> > > > > > > > > > > > > current state*". Again I wonder if this should refer to
> > > > > > > > > > > > > "active" controller, and indicate "anyone else" as
> > > > > > > > > > > > > opposed to "the rest of the cluster" since we are
> > > > > > > > > > > > > talking about 2 clusters here?
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*When the active controller decides that it itself
> > > > > > > > > > > > > should create a snapshot, it will first try to give up
> > > > > > > > > > > > > the leadership of the Raft quorum.*"  Why?  Is it
> > > > > > > > > > > > > necessary to state this?  It seems like it might be an
> > > > > > > > > > > > > implementation detail rather than a necessary
> > > > > > > > > > > > > constraint/requirement that we declare publicly and
> > > > > > > > > > > > > would have to abide by.
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*It will reject brokers whose metadata is too stale*".
> > > > > > > > > > > > > Why?  An example might be helpful here.
> > > > > > > > > > > > >
> > > > > > > > > > > > > "*it may lose subsequent conflicts if its broker epoch
> > > > > > > > > > > > > is stale*"  This is the first time a "broker epoch" is
> > > > > > > > > > > > > mentioned.  I am assuming it is the controller epoch
> > > > > > > > > > > > > communicated to it (if any).  It would be good to
> > > > > > > > > > > > > introduce it/explicitly state what it is before
> > > > > > > > > > > > > referring to it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Ron
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Jul 7, 2020 at 6:48 PM Colin McCabe <cmccabe@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I posted a KIP about how the quorum-based controller
> > > > > > > > > > > > > > envisioned in KIP-500 will work.  Please take a look
> > > > > > > > > > > > > > here:
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/4RV4CQ
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
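
For readers following the quoted CreateTopic exchange, here is a rough sketch of the rule discussed there: the active controller appends a TopicRecord and a PartitionRecord, and the request counts as done only once the high watermark (last stable offset) has advanced past both. The class and method names (MetadataLogSketch, append, advanceHighWatermark, isCommitted) are made up for illustration and are not Kafka's actual API. It assumes a single writer, as Colin describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-writer metadata log; NOT Kafka's real classes.
final class MetadataLogSketch {
    private final List<String> records = new ArrayList<>();
    // Every offset strictly below highWatermark is considered committed.
    private long highWatermark = 0;

    // Appends all records of one request and returns the offset just past the last one.
    long append(String... batch) {
        for (String record : batch) {
            records.add(record);
        }
        return records.size();
    }

    // Stand-in for the quorum acknowledging replication up to newHighWatermark.
    void advanceHighWatermark(long newHighWatermark) {
        if (newHighWatermark > records.size()) {
            throw new IllegalArgumentException("high watermark beyond end of log");
        }
        highWatermark = Math.max(highWatermark, newHighWatermark);
    }

    // A request is complete only when the high watermark covers its last record.
    boolean isCommitted(long endOffset) {
        return highWatermark >= endOffset;
    }

    public static void main(String[] args) {
        MetadataLogSketch log = new MetadataLogSketch();
        long end = log.append("TopicRecord(foo)", "PartitionRecord(foo-0)");

        log.advanceHighWatermark(1);                 // only the TopicRecord is covered
        System.out.println(log.isCommitted(end));    // false: do not respond yet

        log.advanceHighWatermark(2);                 // both records are covered
        System.out.println(log.isCommitted(end));    // true: safe to return success
    }
}
```

Because the response waits on the offset of the last record in the request, a client would never be told about a topic whose PartitionRecord is not yet committed, which is the property the thread is asking about.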