Posted to dev@kafka.apache.org by José Armando García Sancio <js...@confluent.io.INVALID> on 2024/03/01 18:50:29 UTC

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Hi Jun,

Thanks for the feedback. See my comments below.

On Tue, Feb 27, 2024 at 11:27 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> 30. Who controls RPCs like Fetch, FetchSnapshot, DescribeQuorum RPC? They
> are shared between voters and observers.

For Fetch and FetchSnapshot, this KIP adds the tagged field
ReplicaUuid to the request. This means that if the sender supports the
latest version it can always add the replica uuid to the request. If
the receiver supports the new tagged field it is included in the
appropriate FetchRequestData and FetchSnapshotRequestData field. If it
doesn't support the new tagged field it will be in the unknown tagged
fields.

For DescribeQuorum, this KIP only changes the response. The KRaft
leader will inspect the RequestHeader::apiVersion to determine what
information to include in the response.

> Does the client use supported ApiKeys or kraft.version feature in
> ApiVersions response for deciding whether to send AddVoter requests?

That's a good point. The Admin client has access to the finalized
kraft.version in the ApiVersions response. I was thinking of only
using the ApiKeys since we don't have a precedent for using the
finalized features in the Admin client. The receiver of the request
still needs to validate the kraft.version for those requests and
return an UNSUPPORTED_VERSION error in those cases.
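
To make the split of responsibilities concrete, here is a minimal sketch
(Python for brevity, not Kafka code). The function names and the "NONE"
success value are hypothetical; the only facts taken from this thread are
that the client looks at the advertised ApiKeys and that the receiver
validates kraft.version and answers UNSUPPORTED_VERSION otherwise.

```python
def client_may_send_add_voter(advertised_api_keys):
    """Client side: decide using only the ApiKeys in the ApiVersions response."""
    return "AddVoter" in advertised_api_keys


def validate_add_voter(finalized_kraft_version):
    """Receiver side: membership changes assume kraft.version has reached 1."""
    if finalized_kraft_version < 1:
        return "UNSUPPORTED_VERSION"
    return "NONE"  # hypothetical "no error" marker
```

With this shape a client can still send AddVoter to a cluster that is on
kraft.version 0; the request is simply rejected by the receiver.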

Do you mind if we define this in a separate KIP?

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

See my comments below.

On Thu, Mar 14, 2024 at 3:38 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 37. Have you updated the wiki? It seems that LeaderIdAndEpoch and
> NodeEpoint are still two separate structs.

It is updated now. Apologies for the delayed wiki updates but I was
dealing with other issues in the past couple of weeks.

> 45. kafka-storage format --cluster-id <cluster-id> --release-version 3.8
> --standalone --config controller.properties
> It seems that --release-version is optional and the default value of this
> initial metadata.version is statically defined by the binary used to run
> the tool. So, in the common case, it seems that we could just omit
> --release-version.

That's correct. I removed it from the user explanation section and
updated the reference explanation section for the kafka-storage tool.

> 46. The KIP says "The leader will learn the range of supported version from
> the UpdateVoter RPC."
> 46.1 Then "KRaft will use the information in the controller registration,
> broker registration and add voter records to determine if the new version
> is compatible." is no longer accurate.

Good catch. I updated it to "KRaft will use the information in the
broker registration, voters records and UpdateVoter RPC to determine
if the new kraft.version is compatible. The new metadata.version will
be handled by the controller (QuorumController) like it is today."

> 46.2 Do we have a bit of a chicken-and-egg problem? A voter can't send
> UpdateVoter RPC until kraft.version is upgraded to 1. But to upgrade
> kraft.version, the leader needs to receive UpdateVoter RPCs.

Yes we do. It is a little subtle but this is why I relaxed UpdateVoter
so that it is sent irrespective of the kraft.version. The voters that
are following the leader will send an UpdateVoter request as long as
the "ApiKey" is in the ApiVersion response. I updated this sentence in
the UpdateVoter RPC section to make this clear:

"The voter will always send the UpdateVoter RPC whenever it starts and
whenever the leader changes irrespective of the kraft.version"

The KIP already has this wording in the handling section:

"5. Append the updated VotersRecord to the log if the finalized
kraft.version is greater than 0."
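
A rough sketch of how the two rules fit together (Python for brevity; the
function and parameter names are hypothetical and this is not the actual
implementation): the follower's decision ignores kraft.version, while the
leader only persists the change once kraft.version is greater than 0.

```python
def should_send_update_voter(advertised_api_keys, started_or_leader_changed):
    """Follower side: send whenever the leader advertises the API key.

    The finalized kraft.version is deliberately not an input here.
    """
    return started_or_leader_changed and "UpdateVoter" in advertised_api_keys


def should_append_voters_record(finalized_kraft_version):
    """Leader side: only write the updated VotersRecord when kraft.version > 0."""
    return finalized_kraft_version > 0
```

This is what breaks the chicken-and-egg cycle: with kraft.version 0 the
leader still receives the UpdateVoter request and learns the supported
versions, it just does not append anything to the log.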

> 46.3 If the leader always learns the range of supported kraft.version from
> UpdateVoter RPC, does the VotersRecord need to include KRaftVersionFeature?

Yes. Storing it in VotersRecord allows the feature (kraft.version) to
be upgraded while K voters are offline where K is the number of
failures tolerated by the voter set. Upgrading from kraft.version 0 to
kraft.version 1 is special. To upgrade from version 0 to version 1, all
of the voters need to have been online at some point while the current
leader has been leader.

> 47. When a voter is started, in what order does it send the UpdateVoter and
> ControllerRegistration RPC?

They are independent of one another and can be sent in any order. Why
do you ask?

> 48. Voters set: "If the partition doesn't contain a VotersRecord control
> record then the value stored in the controller.quorum.voters property will
> be used."
>   Hmm, I thought controller.quorum.voters is only the fallback
> for controller.quorum.bootstrap.servers?

My observation is that the current controller.quorum.voters
configuration plays two roles. 1) It is the voters set for "voter
replicas". 2) It is the bootstrapping endpoint for "observer
replicas".

For 1) the resolution is going to be VotersRecord first,
controller.quorum.voters second.
For 2) the resolution is going to be
controller.quorum.bootstrap.servers first, controller.quorum.voters
second.

This is mainly there for backwards compatibility with configurations
that are valid in 3.7 and before.
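
The two resolution orders can be sketched as follows (Python for brevity;
the helper names and the returned tuples are hypothetical, only the
"id@host:port" format of controller.quorum.voters and the precedence
order come from the existing configuration and this thread).

```python
def parse_static_voters(value):
    """Parse a controller.quorum.voters value such as '1@host1:9093,2@host2:9093'."""
    voters = {}
    if not value:
        return voters
    for entry in value.split(","):
        node_id, endpoint = entry.strip().split("@", 1)
        voters[int(node_id)] = endpoint
    return voters


def resolve_voter_set(voters_record, static_voters):
    """Role 1: VotersRecord from the log or snapshot first, static config second."""
    if voters_record is not None:
        return ("VotersRecord", voters_record)
    return ("controller.quorum.voters", static_voters)


def resolve_bootstrap_endpoints(bootstrap_servers, static_voters):
    """Role 2: controller.quorum.bootstrap.servers first, static voters second."""
    if bootstrap_servers:
        return list(bootstrap_servers)
    return list(static_voters.values())
```

For example, a 3.7-style configuration that only sets
controller.quorum.voters keeps working for both roles because both
functions fall through to the static value.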

> 49. "If the local replica is getting added to the voters set, it will allow
> the transition to prospective candidate when the fetch timer expires."
>   Could you explain why this is needed?

This is just saying that when a replica sees a VotersRecord which
includes itself it means that this local replica has become a voter.
Only voters are allowed to transition to prospective with KIP-996
(pre-vote) or transition to candidate (without KIP-996).

> 50. Quorum state: AppliedOffset will get removed in version 1.
>   This is the original description of AppliedOffset: Reflects the maximum
> offset that has been applied to this quorum state. This is used for log
> recovery. The broker must scan from this point on initialization to detect
> updates to this file. If we remove this field, how do we reason about the
> consistency between the quorum state and the metadata log?

With this KIP, these are the only fields that are present in the
latest quorum data:
leader id, leader epoch, voted id and voted uuid. None of them are
related to the voters set that is stored in the log and snapshot.

For example, it is correct for the leader id to be in the voters set
and it is also correct for the leader id to not be in the voter set.
This second case can happen when the leader is removing itself from
the voters set.

For example, voted id and voted uuid may or may not be in the voters
set. See this wording in the KIP: "Since the set of voters can change
and not all replicas know the latest voters set, handling of Vote
requests needs to be relaxed from what was defined and implemented for
KIP-595. KRaft replicas will accept Vote requests from all replicas.
Candidate replicas don't need to be in the voters' voters set to
receive a vote. This is needed to be able to elect a leader from the
new voters set even though the new voters set hasn't been replicated
to all of its voters."

> 51. AddVoter has the following steps.
> 1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the
> log end offset of the leader.
> 2. Wait until there are no uncommitted VotersRecord. Note that the
> implementation may just return a REQUEST_TIMED_OUT error if there are
> pending operations.
> 3. Wait for the LeaderChangeMessage control record from the current epoch
> to get committed. Note that the implementation may just return a
> REQUEST_TIMED_OUT error if there are pending operations.
> 4. Send an ApiVersions RPC to the first listener to discover the supported
> kraft.version of the new voter.
> It seems that doing the check on supported kraft.version of the new voter
> in step 4 is too late. If the new voter doesn't support kraft.version of 1,
> it can't process the metadata log records and step 1 could fail.

That's true. Good catch. I think the best change is to move the check
in step 1 to after step 4. I updated the KIP.

> 52. Admin client: Could you provide a bit more details on the changes?

Yes. Let me update the KIP. I'll send another email when I have made
these changes.

> 53. A few more typos.
> ...

Thanks for the suggestions. Updated the KIP accordingly.

-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Jun, thanks a lot for your help. I feel that the KIP is much better
after your detailed input.

If there is no more feedback, I'll start a voting thread tomorrow
morning. I'll monitor KIP-1022's discussion thread and update this KIP
with anything that affects the KIP's specification.

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the explanation. Other than depending on KIP-1022 to be
approved, the KIP looks good to me now.

Jun

On Thu, Mar 28, 2024 at 2:56 PM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> See my comments below.
>
> On Thu, Mar 28, 2024 at 11:09 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> > If I am adding a new voter and it takes a long time (because the new voter
> > is catching up), I'd want to know if the request is indeed being processed.
> > I thought that's the usage of uncommitted-voter-change.
>
> They can get related information by using the "kafka-metadata-quorum
> describe --replication" command (or the log-end-offset metric from KIP-595).
> That command (and metric) displays the LEO of all of the replicas
> (voters and observers), according to the leader. They can use that
> output to discover if the observer they are trying to add is lagging
> or is not replicating at all.
>
> When the user runs the command above, they don't know the exact offset
> that the new controller needs to reach but they can do some rough
> estimation of how far behind it is. What do you think? Is this good
> enough?
>
> > Also, I am still not sure about having multiple brokers reporting the same
> > metric. For example, if they don't report the same value (e.g. because one
> > broker is catching up), how does a user know which value is correct?
>
> They are all correct according to the local view. Here are two
> examples of monitors that the user can write:
>
> 1. Is there a voter that I need to remove from the quorum? They can
> create a monitor that fires, if the number-of-offline-voters metric
> has been greater than 0 for the past hour.
> 2. Is there a cluster that doesn't have 3 voters? They can create a
> monitor that fires, if any replica doesn't report three for
> number-of-voters for the past hour.
>
> Is there a specific metric that you have in mind that should only be
> reported by the KRaft leader?
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

See my comments below.

On Thu, Mar 28, 2024 at 11:09 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> If I am adding a new voter and it takes a long time (because the new voter
> is catching up), I'd want to know if the request is indeed being processed.
> I thought that's the usage of uncommitted-voter-change.

They can get related information by using the "kafka-metadata-quorum
describe --replication" command (or the log-end-offset metric from KIP-595).
That command (and metric) displays the LEO of all of the replicas
(voters and observers), according to the leader. They can use that
output to discover if the observer they are trying to add is lagging
or is not replicating at all.

When the user runs the command above, they don't know the exact offset
that the new controller needs to reach but they can do some rough
estimation of how far behind it is. What do you think? Is this good
enough?
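
The rough estimation could look like the sketch below (Python for
brevity). The input shape, a mapping from replica id to the LEO reported
by the leader, and the function names are assumptions made for
illustration; they are not the output format of any Kafka tool.

```python
def estimate_lag(log_end_offsets, leader_id, replica_id):
    """Offsets the replica is behind the leader, according to the leader."""
    return max(0, log_end_offsets[leader_id] - log_end_offsets[replica_id])


def is_making_progress(previous_leo, current_leo):
    """Compare two samples of the same replica's LEO taken some time apart."""
    return current_leo > previous_leo
```

A replica whose lag keeps shrinking between samples is catching up; one
whose LEO never moves while the leader's LEO advances is probably not
replicating at all.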

> Also, I am still not sure about having multiple brokers reporting the same
> metric. For example, if they don't report the same value (e.g. because one
> broker is catching up), how does a user know which value is correct?

They are all correct according to the local view. Here are two
examples of monitors that the user can write:

1. Is there a voter that I need to remove from the quorum? They can
create a monitor that fires, if the number-of-offline-voters metric
has been greater than 0 for the past hour.
2. Is there a cluster that doesn't have 3 voters? They can create a
monitor that fires, if any replica doesn't report three for
number-of-voters for the past hour.
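
As an illustration of those two monitors, here is a sketch in Python. It
assumes the metrics collector hands us samples as (timestamp_seconds,
value) pairs; that shape and the function names are hypothetical and not
tied to any particular monitoring system.

```python
def _window(samples, now, window_seconds):
    """Values of the samples that fall within the last window_seconds."""
    return [v for t, v in samples if now - window_seconds <= t <= now]


def offline_voters_monitor_fires(samples, now, window_seconds=3600):
    """Monitor 1: number-of-offline-voters has been > 0 for the whole window."""
    values = _window(samples, now, window_seconds)
    return bool(values) and all(v > 0 for v in values)


def voter_count_monitor_fires(samples_by_replica, now, expected=3,
                              window_seconds=3600):
    """Monitor 2: some replica never reported `expected` voters in the window."""
    for samples in samples_by_replica.values():
        values = _window(samples, now, window_seconds)
        if values and all(v != expected for v in values):
            return True
    return False
```

Because both monitors require the condition to hold for the whole window,
a replica that is briefly behind while catching up does not trigger them.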

Is there a specific metric that you have in mind that should only be
reported by the KRaft leader?

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

If I am adding a new voter and it takes a long time (because the new voter
is catching up), I'd want to know if the request is indeed being processed.
I thought that's the usage of uncommitted-voter-change.

Also, I am still not sure about having multiple brokers reporting the same
metric. For example, if they don't report the same value (e.g. because one
broker is catching up), how does a user know which value is correct?

Thanks,

Jun

On Thu, Mar 28, 2024 at 10:56 AM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> On Thu, Mar 28, 2024 at 10:35 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> > The following are the steps of AddVoter. The bulk of the time is probably
> > in step 5, but the updated VotersRecord won't be written until step 6. So,
> > ideally, the controller leader should report the pending voter as soon as
> > step 1. The brokers and non-leader controllers can't do that until after
> > step 6. Having multiple brokers report the same metric can be confusing
> > when there is inconsistency.
>
> First, the replicas (leader, following voters and observers) will
> compute the metrics the same way. In other words, in the leader the
> uncommitted-voter-change metric will be true (1) from step 7 to after
> step 8.
>
> I added the metric to indicate to the operator if there are replicas
> that have updated their voters set to an uncommitted value. The leader
> doesn't update its voters set until step 7.
>
> I don't think that we should add metrics to track the state of a
> specific RPC. Or if we do, it should be a separate KIP where we have a
> mechanism for consistently tracking this state across all admin RPCs.
> What do you think?
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

On Thu, Mar 28, 2024 at 10:35 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> The following are the steps of AddVoter. The bulk of the time is probably
> in step 5, but the updated VotersRecord won't be written until step 6. So,
> ideally, the controller leader should report the pending voter as soon as
> step 1. The brokers and non-leader controllers can't do that until after
> step 6. Having multiple brokers report the same metric can be confusing
> when there is inconsistency.

First, the replicas (leader, following voters and observers) will
compute the metrics the same way. In other words, in the leader the
uncommitted-voter-change metric will be true (1) from step 7 to after
step 8.

I added the metric to indicate to the operator if there are replicas
that have updated their voters set to an uncommitted value. The leader
doesn't update its voters set until step 7.

I don't think that we should add metrics to track the state of a
specific RPC. Or if we do, it should be a separate KIP where we have a
mechanism for consistently tracking this state across all admin RPCs.
What do you think?

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

The following are the steps of AddVoter. The bulk of the time is probably
in step 5, but the updated VotersRecord won't be written until step 6. So,
ideally, the controller leader should report the pending voter as soon as
step 1. The brokers and non-leader controllers can't do that until after
step 6. Having multiple brokers report the same metric can be confusing
when there is inconsistency.

1. Wait until there are no uncommitted VotersRecord. Note that the
implementation may just return a REQUEST_TIMED_OUT error if there are
pending operations.
2. Wait for the LeaderChangeMessage control record from the current epoch
to get committed. Note that the implementation may just return a
REQUEST_TIMED_OUT error if there are pending operations.
3. Send an ApiVersions RPC to the first listener to discover the supported
kraft.version of the new voter.
4. Check that the new voter supports the current kraft.version.
5. Wait for the fetch offset of the replica (ID, UUID) to catch up to the
log end offset of the leader.
6. Append the updated VotersRecord to the log.
7. The KRaft internal listener will read this record from the log and add
the voter to the voters set.
8. Wait for the VotersRecord to commit using the majority of new voters
set. Return a REQUEST_TIMED_OUT error if it doesn't succeed in time.
9. Send the AddVoter response to the client.
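
The nine steps above can be sketched as a single function (Python for
brevity; this is a toy model, not the KRaft implementation). The
LeaderState and NewVoter types, the "NONE" success value, and the choice
to return an error immediately instead of waiting are all assumptions.
In particular, the errors returned for steps 4 and 5 are not specified
in the list above.

```python
from dataclasses import dataclass, field


@dataclass
class LeaderState:
    kraft_version: int
    has_uncommitted_voters_record: bool
    leader_change_committed: bool
    log_end_offset: int
    voters: set = field(default_factory=set)
    log: list = field(default_factory=list)


@dataclass
class NewVoter:
    replica_key: tuple               # (ID, UUID)
    supported_kraft_versions: range  # as discovered through ApiVersions
    fetch_offset: int


def add_voter(leader, new_voter, commit_succeeds=True):
    # Steps 1-2: no pending VotersRecord, LeaderChangeMessage committed.
    if leader.has_uncommitted_voters_record or not leader.leader_change_committed:
        return "REQUEST_TIMED_OUT"
    # Steps 3-4: the new voter must support the current kraft.version.
    if leader.kraft_version not in new_voter.supported_kraft_versions:
        return "UNSUPPORTED_VERSION"  # assumed error code
    # Step 5: the new voter must have caught up to the leader's LEO.
    if new_voter.fetch_offset < leader.log_end_offset:
        return "REQUEST_TIMED_OUT"    # assumed error code
    # Step 6: append the updated VotersRecord to the log.
    updated = frozenset(leader.voters | {new_voter.replica_key})
    leader.log.append(("VotersRecord", updated))
    leader.has_uncommitted_voters_record = True
    # Step 7: the internal listener reads the record and updates the voters set.
    leader.voters = set(leader.log[-1][1])
    # Step 8: wait for the record to commit with a majority of the new set.
    if not commit_succeeds:
        return "REQUEST_TIMED_OUT"
    leader.has_uncommitted_voters_record = False
    # Step 9: respond to the client.
    return "NONE"
```

Note that in this model the voters set already contains the new voter
when step 8 times out, which matches the ordering of steps 7 and 8.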

Jun

On Wed, Mar 27, 2024 at 7:52 PM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> On Wed, Mar 27, 2024 at 2:26 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 55.1 How do the broker and non-leader controller know the pending voters?
>
> They are in the log. Pending voter sets are VotersRecords between the
> HWM and the LEO. The leader will make sure that there is at most one
> VotersRecord that is uncommitted (between the HWM and LEO).
>
> Maybe uncommitted-voter-change is a better name. Updated the KIP to
> use this name.
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

On Wed, Mar 27, 2024 at 2:26 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 55.1 How do the broker and non-leader controller know the pending voters?

They are in the log. Pending voter sets are VotersRecords between the
HWM and the LEO. The leader will make sure that there is at most one
VotersRecord that is uncommitted (between the HWM and LEO).
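
Any replica can derive this from its local log, roughly as in the sketch
below (Python for brevity). The log is modeled as a list of (offset,
record_type) pairs, which is a simplification made for illustration. It
assumes the usual convention that the HWM is the first uncommitted offset
and the LEO is the next offset to be written.

```python
def uncommitted_voter_change(log, high_watermark, log_end_offset):
    """Return 1 if a VotersRecord sits in [HWM, LEO) of the local log, else 0."""
    return int(any(
        record_type == "VotersRecord"
        and high_watermark <= offset < log_end_offset
        for offset, record_type in log
    ))
```

Since the computation only needs the local log, HWM and LEO, it gives the
same definition on the leader, following voters and observers, even if
their values differ while a replica is catching up.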

Maybe uncommitted-voter-change is a better name. Updated the KIP to
use this name.

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

55.1 How do the broker and non-leader controller know the pending voters?

Jun

On Wed, Mar 27, 2024 at 1:03 PM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thanks for the feedback. See my comments below.
>
> On Mon, Mar 25, 2024 at 2:21 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 54. Yes, we could include SecurityProtocol in DescribeQuorumResponse. Then,
> > we could include it in the output of kafka-metadata-quorum --describe.
>
> Yes, I updated the DescribeQuorumResponse to include the
> SecurityProtocol and I also updated the example output for
> "kafka-metadata-quorum describe --status".
>
> > 55.1 Could number-of-observers and pending-voter-change be reported by all
> > brokers and controllers? I thought only the controller leader tracks those.
>
> These metrics are reported by all of the KRaft replicas (broker and
> controller). I think this makes it easier to monitor since metrics
> collectors can collect the same metrics from all of the nodes
> irrespective of their role (broker or controller). The main exception
> that Kafka has right now is type=KafkaController vs
> type=broker-metadata-metrics but I would favor a KIP that unified
> these two sets of metrics to something like type=metadata-metrics.
>
> > 55.2 So, IgnoredStaticVoters and IsObserver are Yammer metrics and the rest
> > are KafkaMetric. It would be useful to document the metric names more clearly.
> > For Yammer metrics, we need to specify group, type, name and tags. For
> > KafkaMetrics, we need to specify just name and tags.
>
> Yeah. I always struggle with the MBean specification. I connected
> jconsole to Kafka and updated the KIP to be more accurate. Please take
> a look.
>
> > 57. Could we remove --release-version 3.8 in the upgrade example?
>
> Done. I also removed wording about deprecating --metadata from
> kafka-features.sh. I'll let KIP-1022 and the discussion there make
> that decision.
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

Thanks for the feedback. See my comments below.

On Mon, Mar 25, 2024 at 2:21 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 54. Yes, we could include SecurityProtocol in DescribeQuorumResponse. Then,
> we could include it in the output of kafka-metadata-quorum --describe.

Yes, I updated the DescribeQuorumResponse to include the
SecurityProtocol and I also updated the example output for
"kafka-metadata-quorum describe --status".

> 55.1 Could number-of-observers and pending-voter-change be reported by all
> brokers and controllers? I thought only the controller leader tracks those.

These metrics are reported by all of the KRaft replicas (broker and
controller). I think this makes it easier to monitor since metrics
collectors can collect the same metrics from all of the nodes
irrespective of their role (broker or controller). The main exception
that Kafka has right now is type=KafkaController vs
type=broker-metadata-metrics but I would favor a KIP that unified
these two sets of metrics to something like type=metadata-metrics.

> 55.2 So, IgnoredStaticVoters and IsObserver are Yammer metrics and the rest
> are KafkaMetric. It would be useful to document the metric names more clearly.
> For Yammer metrics, we need to specify group, type, name and tags. For
> KafkaMetrics, we need to specify just name and tags.

Yeah. I always struggle with the MBean specification. I connected
jconsole to Kafka and updated the KIP to be more accurate. Please take
a look.

> 57. Could we remove --release-version 3.8 in the upgrade example?

Done. I also removed wording about deprecating --metadata from
kafka-features.sh. I'll let KIP-1022 and the discussion there make
that decision.

Thanks,
--
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

54. Yes, we could include SecurityProtocol in DescribeQuorumResponse. Then,
we could include it in the output of kafka-metadata-quorum --describe.

55.1 Could number-of-observers and pending-voter-change be reported by all
brokers and controllers? I thought only the controller leader tracks those.

55.2 So, IgnoredStaticVoters and IsObserver are Yammer metrics and the rest
are KafkaMetric. It would be useful to document the metric names more clearly.
For Yammer metrics, we need to specify group, type, name and tags. For
KafkaMetrics, we need to specify just name and tags.

57. Could we remove --release-version 3.8 in the upgrade example?

Jun

On Mon, Mar 25, 2024 at 11:54 AM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> See my comments below.
>
> On Fri, Mar 22, 2024 at 1:30 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 54. Admin.addMetadataVoter: It seems that Endpoint shouldn't include
> > securityProtocol since it's not in DescribeQuorumResponse.
>
> Yeah. I noticed that when I made the Admin changes. We either use a
> different type in the Admin client or add SecurityProtocol to the
> DescribeQuorumResponse. I was originally leaning towards adding
> SecurityProtocol to the DescribeQuorumResponse.
>
> Does the user want to see the security protocol in the response and
> CLI output? Listener name is not very useful unless the user also has
> access to the controller's configuration.
>
> I can go either way, what do you think?
>
> > 55. Metrics:
> > 55.1 It would be useful to be clear whether they are reported by the
> > controller leader, all controllers or all controllers and brokers.
>
> Done. I also noticed that I was missing one metric in the controller
> process role.
>
> > 55.2 IsObserver, type=KafkaController: Should we use the dash convention to
> > be consistent with the rest of the metrics?
>
> I would like to but I had to do this for backward compatibility. The
> existing controller metrics are all scoped under the KafkaController
> type. We had a similar discussion for "KIP-835: Monitor KRaft
> Controller Quorum Health."
>
> > 56. kafka-storage : "If the --release-version flag is not specified, the
> > IBP in the configuration is used."
> >   kafka-storage takes controller.properties as the input parameter and IBP
> > is not a controller property, right?
>
> I was documenting the current code. I suspect that the developer that
> implemented kafka-storage wanted it to work with a configuration that
> had an IBP.
>
> > 57. To be consistent with kafka-storage, should we make the
> > --release-version flag in kafka-features optional too? If this is not
> > specified, the default associated with the tool will be used.
>
> Sounds good. I updated that section to define this behavior for both
> the upgrade and downgrade commands.
>
> > 58. Typo: when the voter ID and UUID doesn't match
> >   doesn't => don't
>
> Fixed.
>
> Thanks, I already updated the KIP to match my comments above and
> include your feedback.
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

See my comments below.

On Fri, Mar 22, 2024 at 1:30 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 54. Admin.addMetadataVoter: It seems that Endpoint shouldn't include
> securityProtocol since it's not in DescribeQuorumResponse.

Yeah. I noticed that when I made the Admin changes. We either use a
different type in the Admin client or add SecurityProtocol to the
DescribeQuorumResponse. I was originally leaning towards adding
SecurityProtocol to the DescribeQuorumResponse.

Does the user want to see the security protocol in the response and
CLI output? Listener name is not very useful unless the user also has
access to the controller's configuration.

I can go either way, what do you think?

> 55. Metrics:
> 55.1 It would be useful to be clear whether they are reported by the
> controller leader, all controllers or all controllers and brokers.

Done. I also noticed that I was missing one metric in the controller
process role.

> 55.2 IsObserver, type=KafkaController: Should we use the dash convention to
> be consistent with the rest of the metrics?

I would like to but I had to do this for backward compatibility. The
existing controller metrics are all scoped under the KafkaController
type. We had a similar discussion for "KIP-835: Monitor KRaft
Controller Quorum Health."

> 56. kafka-storage : "If the --release-version flag is not specified, the
> IBP in the configuration is used."
>   kafka-storage takes controller.properties as the input parameter and IBP
> is not a controller property, right?

I was documenting the current code. I suspect that the developer that
implemented kafka-storage wanted it to work with a configuration that
had an IBP.

> 57. To be consistent with kafka-storage, should we make the
> --release-version flag in kafka-features optional too? If this is not
> specified, the default associated with the tool will be used.

Sounds good. I updated that section to define this behavior for both
the upgrade and downgrade commands.

> 58. Typo: when the voter ID and UUID doesn't match
>   doesn't => don't

Fixed.

Thanks, I already updated the KIP to match my comments above and
include your feedback.
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply. A few more comments.

54. Admin.addMetadataVoter: It seems that Endpoint shouldn't include
securityProtocol since it's not in DescribeQuorumResponse.

55. Metrics:
55.1 It would be useful to be clear whether they are reported by the
controller leader, all controllers or all controllers and brokers.
55.2 IsObserver, type=KafkaController: Should we use the dash convention to
be consistent with the rest of the metrics?

56. kafka-storage : "If the --release-version flag is not specified, the
IBP in the configuration is used."
  kafka-storage takes controller.properties as the input parameter and IBP
is not a controller property, right?

57. To be consistent with kafka-storage, should we make the
--release-version flag in kafka-features optional too? If this is not
specified, the default associated with the tool will be used.

58. Typo: when the voter ID and UUID doesn't match
  doesn't => don't

Jun

On Fri, Mar 22, 2024 at 9:21 AM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Claude,
>
> On Fri, Mar 22, 2024 at 4:36 AM Claude Warren <cl...@apache.org> wrote:
> > Is there test code, or initial POC code for this KIP somewhere?  I would
> > like to help move this forward but need a few pointers to associated
> > resources.  I have read KIP-853 and it is beginning to sink in, but code
> > would be nice.
>
> Thanks for your interest and I would appreciate the help with the
> implementation. I don't have a lot of code to show at the moment. The
> existing KRaft implementation is in the "raft" Java module in the
> apache/kafka repo.
>
> I am planning to create a set of sub-tasks under KAFKA-14094 soon, to
> give a rough outline of what needs implementing.
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Claude,

On Fri, Mar 22, 2024 at 4:36 AM Claude Warren <cl...@apache.org> wrote:
> Is there test code, or initial POC code for this KIP somewhere?  I would like to help move this forward but need a few pointers to associated resources.  I have read KIP-853 and it is beginning to sink in, but code would be nice.

Thanks for your interest and I would appreciate the help with the
implementation. I don't have a lot of code to show at the moment. The
existing KRaft implementation is in the "raft" Java module in the
apache/kafka repo.

I am planning to create a set of sub-tasks under KAFKA-14094 soon, to
give a rough outline of what needs implementing.

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Claude Warren <cl...@apache.org>.
Is there test code, or initial POC code for this KIP somewhere?  I would like to help move this forward but need a few pointers to associated resources.  I have read KIP-853 and it is beginning to sink in, but code would be nice.

Thanks,
Claude

On 2024/03/21 18:41:04 José Armando García Sancio wrote:
> Hi Jun,
> 
> On Thu, Mar 14, 2024 at 3:38 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 52. Admin client: Could you provide a bit more details on the changes?
> 
> I updated the KIP to include the API changes to the Admin client.
> 
> Thanks,
> -- 
> -José
> 

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

On Thu, Mar 14, 2024 at 3:38 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 52. Admin client: Could you provide a bit more details on the changes?

I updated the KIP to include the API changes to the Admin client.

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply. A few more comments.

37. Have you updated the wiki? It seems that LeaderIdAndEpoch and
NodeEndpoint are still two separate structs.

45. kafka-storage format --cluster-id <cluster-id> --release-version 3.8
--standalone --config controller.properties
It seems that --release-version is optional and the default value of this
initial metadata.version is statically defined by the binary used to run
the tool. So, in the common case, it seems that we could just omit
--release-version.

46. The KIP says "The leader will learn the range of supported version from
the UpdateVoter RPC."
46.1 Then "KRaft will use the information in the controller registration,
broker registration and add voter records to determine if the new version
is compatible." is no longer accurate.
46.2 Do we have a bit of a chicken-and-egg problem? A voter can't send
UpdateVoter RPC until kraft.version is upgraded to 1. But to upgrade
kraft.version, the leader needs to receive UpdateVoter RPCs.
46.3 If the leader always learns the range of supported kraft.version from
UpdateVoter RPC, does the VotersRecord need to include KRaftVersionFeature?

47. When a voter is started, in what order does it send the UpdateVoter and
ControllerRegistration RPC?

48. Voters set: "If the partition doesn't contain a VotersRecord control
record then the value stored in the controller.quorum.voters property will
be used."
  Hmm, I thought controller.quorum.voters is only the fallback
for controller.quorum.bootstrap.servers?

49. "If the local replica is getting added to the voters set, it will allow
the transition to prospective candidate when the fetch timer expires."
  Could you explain why this is needed?

50. Quorum state: AppliedOffset will get removed in version 1.
  This is the original description of AppliedOffset: Reflects the maximum
offset that has been applied to this quorum state. This is used for log
recovery. The broker must scan from this point on initialization to detect
updates to this file. If we remove this field, how do we reason about the
consistency between the quorum state and the metadata log?

51. AddVoter has the following steps.
1. Wait for the fetch offset of the replica (ID, UUID) to catch up to the
log end offset of the leader.
2. Wait until there are no uncommitted VotersRecord. Note that the
implementation may just return a REQUEST_TIMED_OUT error if there are
pending operations.
3. Wait for the LeaderChangeMessage control record from the current epoch
to get committed. Note that the implementation may just return a
REQUEST_TIMED_OUT error if there are pending operations.
4. Send an ApiVersions RPC to the first listener to discover the supported
kraft.version of the new voter.
It seems that doing the check on supported kraft.version of the new voter
in step 4 is too late. If the new voter doesn't support kraft.version of 1,
it can't process the metadata log records and step 1 could fail.

52. Admin client: Could you provide a bit more details on the changes?

53. A few more typos.
53.1 be bootstrap => be bootstrapped
53.2 If the new leader supports the current kraft.version,
  new leader => new voter
53.3 Voter are removed
  Voter => Voters
53.4 that are part that are voters
  This part doesn't read well.
53.5 the the current
  extra the
53.6 it will be discover
  discover => discovered
53.7 it would beneficial
  beneficial => be beneficial

Jun

On Mon, Mar 11, 2024 at 10:39 AM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun
>
> Thanks for the feedback. See my comments below.
>
> On Wed, Mar 6, 2024 at 4:47 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 20.1. It seems that the PreferredSuccessors field didn't get fixed. It's
> > still there together with PreferredCandidates.
> > +        { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0",
> > +          "about": "A sorted list of preferred successors to start the election" },
> > +        { "name": "PreferredCandidates", "type": "[]ReplicaInfo", "versions": "1+",
> > +          "about": "A sorted list of preferred candidates to start the election", "fields": [
>
> Notice that the PreferredSuccessors field is only for version 0 while
> the PreferredCandidates field is for version 1 or greater. I had to
> create a new field because arrays of int32 ([]int32) are not
> compatible with arrays of structs because of tagged fields in
> sub-structs.
>
> > 37. If we don't support batching in AddVoterResponse, RemoveVoterResponse
> > and UpdateVoterResponse, should we combine CurrentLeader and NodeEndpoint
> > into a single field?
>
> Yes. I replaced the LeaderIdAndEpoch and NodeEndpoint structs with a
> single struct that contains the leader id, epoch, host and port.
>
> > 42. We include VoterId and VoterUuid for the receiver in Vote and
> > BeginQuorumEpoch requests, but not in EndQuorumEpoch, Fetch and
> > FetchSnapshot. Could you explain how they are used?
>
> For the Vote request and BeginQuorumEpoch request the replica
> (candidate for Vote and leader for BeginQuorumEpoch) sending the
> request needs to make sure that it is sending the request to the
> correct node. This is needed for correctness. The most important case
> that I wanted to make sure that replicas handle correctly is the
> following:
> 1. Voter set is A, B, C and the leader is A. Here the label A stands
> for both the voter id and the voter uuid.
> 2. Assume that A crashes and loses its disk. When it recovers it will
> come back as A'. A' means a replica with the same id but with a
> different replica uuid.
>
> Replica A' will most likely be accepting connections and handling
> requests (e.g. Vote and BeginQuorumEpoch) on the same endpoints as A.
> The state can become inconsistent if, for example, replica B sends a
> Vote request to A' but A' handles it as if it were A. This is the
> reason the sender includes the remote replica's id and uuid (VoterId
> and VoterUuid) in the request. The same analysis applies to
> BeginQuorumEpoch.
>
> For the Fetch and FetchSnapshot request the closest equivalent would
> be leader id and leader epoch. Those RPCs only have leader epochs. You
> can argue that they don't need the leader id because a leader epoch
> can have at most one leader id. In other words, the leader epoch also
> uniquely identifies the leader id if there is one. I am reluctant to
> change the Fetch RPC unless it is strictly required because that RPC
> is also used for regular topic partition replication. I tried to make
> the FetchSnapshot RPC as consistent with the Fetch RPC as possible since
> they have similar access patterns.
>
> EndQuorumEpoch is not needed for correctness. It is there for improved
> availability: it speeds up leadership changes when the nodes are
> cooperating (controlled shutdown and resignation). The sending replica
> (leader) doesn't need to wait for the response or to check that the
> RPC was handled correctly.
>
> I'll reread the KIP and update it to better explain the need for
> VoterId and VoterUuid in the Vote and BeginQuorumEpoch requests.
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun

Thanks for the feedback. See my comments below.

On Wed, Mar 6, 2024 at 4:47 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 20.1. It seems that the PreferredSuccessors field didn't get fixed. It's
> still there together with PreferredCandidates.
> +        { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0",
> +          "about": "A sorted list of preferred successors to start the election" },
> +        { "name": "PreferredCandidates", "type": "[]ReplicaInfo", "versions": "1+",
> +          "about": "A sorted list of preferred candidates to start the election", "fields": [

Notice that the PreferredSuccessors field is only for version 0 while
the PreferredCandidates field is for version 1 or greater. I had to
create a new field because arrays of int32 ([]int32) are not
compatible with arrays of structs because of tagged fields in
sub-structs.
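
The version split can be sketched in Python as follows. This is only an
illustration: the function name, the dict layout and the
ReplicaId/ReplicaUuid keys are assumptions made for the example and are
not the generated Kafka message classes.

```python
from typing import Any


def preferred_field(version: int, replicas: list[tuple[int, str]]) -> dict[str, Any]:
    """Pick the election-preference field that matches the request version.

    replicas is a sorted list of (replica id, replica uuid) pairs.
    """
    if version == 0:
        # Version 0 carries plain int32 ids, so the uuid is dropped.
        return {"PreferredSuccessors": [replica_id for replica_id, _ in replicas]}
    # Version 1+ carries structs (hypothetical keys) in a separate field.
    return {
        "PreferredCandidates": [
            {"ReplicaId": replica_id, "ReplicaUuid": replica_uuid}
            for replica_id, replica_uuid in replicas
        ]
    }
```

Only one of the two fields is ever populated for a given version, which
is why both can coexist in the same schema.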

> 37. If we don't support batching in AddVoterResponse, RemoveVoterResponse
> and UpdateVoterResponse, should we combine CurrentLeader and NodeEndpoint
> into a single field?

Yes. I replaced the LeaderIdAndEpoch and NodeEndpoint structs with a
single struct that contains the leader id, epoch, host and port.

> 42. We include VoterId and VoterUuid for the receiver in Vote and
> BeginQuorumEpoch requests, but not in EndQuorumEpoch, Fetch and
> FetchSnapshot. Could you explain how they are used?

For the Vote request and BeginQuorumEpoch request the replica
(candidate for Vote and leader for BeginQuorumEpoch) sending the
request needs to make sure that it is sending the request to the
correct node. This is needed for correctness. The most important case
that I wanted to make sure that replicas handle correctly is the
following:
1. Voter set is A, B, C and the leader is A. Here the label A stands
for both the voter id and the voter uuid.
2. Assume that A crashes and loses its disk. When it recovers it will
come back as A'. A' means a replica with the same id but with a
different replica uuid.

Replica A' will most likely be accepting connections and handling
requests (e.g. Vote and BeginQuorumEpoch) on the same endpoints as A.
The state can become inconsistent if, for example, replica B sends a
Vote request to A' but A' handles it as if it were A. This is the
reason the sender includes the remote replica's id and uuid (VoterId
and VoterUuid) in the request. The same analysis applies to
BeginQuorumEpoch.
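
A minimal Python sketch of the check on the receiving side, assuming the
receiver simply compares the VoterId and VoterUuid from the request with
its own identity. The ReplicaKey class and the function name are
hypothetical and only serve the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReplicaKey:
    """Identity of a replica: the configured id plus the uuid of its disk."""
    id: int
    uuid: str


def is_addressed_to(local: ReplicaKey, voter_id: int, voter_uuid: str) -> bool:
    """Return True only if the request names this exact replica (id and uuid)."""
    return local == ReplicaKey(voter_id, voter_uuid)


# Replica A lost its disk and came back as A': same id, new uuid.
a_prime = ReplicaKey(id=1, uuid="uuid-of-a-prime")
# B still addresses the old replica A, so A' must not handle the request as A.
stale_request_accepted = is_addressed_to(a_prime, voter_id=1, voter_uuid="uuid-of-a")
```

With an id-only comparison the stale request would be accepted; adding
the uuid lets A' tell that the request was meant for the replica it
replaced.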

For the Fetch and FetchSnapshot request the closest equivalent would
be leader id and leader epoch. Those RPCs only have leader epochs. You
can argue that they don't need the leader id because a leader epoch
can have at most one leader id. In other words, the leader epoch also
uniquely identifies the leader id if there is one. I am reluctant to
change the Fetch RPC unless it is strictly required because that RPC
is also used for regular topic partition replication. I tried to make
the FetchSnapshot RPC as consistent with the Fetch RPC as possible since
they have similar access patterns.

EndQuorumEpoch is not needed for correctness. It is there for improved
availability: it speeds up leadership changes when the nodes are
cooperating (controlled shutdown and resignation). The sending replica
(leader) doesn't need to wait for the response or to check that the
RPC was handled correctly.

I'll reread the KIP and update it to better explain the need for
VoterId and VoterUuid in the Vote and BeginQuorumEpoch requests.

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

20.1. It seems that the PreferredSuccessors field didn't get fixed. It's
still there together with PreferredCandidates.
+        { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0",
+          "about": "A sorted list of preferred successors to start the election" },
+        { "name": "PreferredCandidates", "type": "[]ReplicaInfo", "versions": "1+",
+          "about": "A sorted list of preferred candidates to start the election", "fields": [

37. If we don't support batching in AddVoterResponse, RemoveVoterResponse
and UpdateVoterResponse, should we combine CurrentLeader and NodeEndpoint
into a single field?

42. We include VoterId and VoterUuid for the receiver in Vote and
BeginQuorumEpoch requests, but not in EndQuorumEpoch, Fetch and
FetchSnapshot. Could you explain how they are used?

Jun

On Wed, Mar 6, 2024 at 8:53 AM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> See my comments below.
>
> On Tue, Mar 5, 2024 at 2:57 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 37. We don't batch multiple topic partitions in AddVoter, RemoveVoter and
> > UpdateVoter requests while other requests like Vote and BeginQuorumEpoch
> > support batching. Should we make them consistent?
>
> Originally I had them as batched RPCs but decided to make them only
> operate on one KRaft topic partition. I made this change primarily
> because batching here amounts to false sharing. Topic partitions in KRaft
> are independent; those operations will be handled independently and
> committed independently but because of the batching the kraft node
> will be required to wait on all the batched operations before it can
> send the response.
>
> I know that this is inconsistent with the other RPCs but I am hesitant
> to propagate this incorrect batching to new RPCs.
>
> > 38. BeginQuorumEpochRequest: It seems that we need to replace the name
> > field with a nodeId field in LeaderEndpoints?
>
> The schema looks good to me. This struct has a different meaning from
> the struct in the response of other RPCs. The BeginQuorumEpoch request
> is sent by the leader so the expectation is that the sending
> node/replica is the leader for all of the partitions sent. This also
> means that the endpoints sent in LeaderEndpoints are all for the same
> leader (or replica). The reason that BeginQuorumEpoch sends multiple
> endpoints is because the leader may be binding to multiple listeners.
> The leader sends a tuple (listener name, host name, host port) for
> each of its own advertised controller listeners.
>
> > 39. VoteRequest: Will the Voter ever be different from the Candidate? I
> > thought that in all the VoteRequests, the voter just votes for itself.
>
> Yes. This naming always confuses me at first but makes sense after
> further analysis. The voter (VoterId and VoterUuid) is the
> replica receiving the Vote request and potentially voting for the
> sender (candidate). The candidate (CandidateId and CandidateUuid) is
> the replica sending the Vote request and asking for votes from the
> receivers (voters). I tried to better document it in their respective
> "about" schema fields.
>
> > 40. EndQuorumEpochRequest: Should we add a replicaUUID field to pair with
> > LeaderId?
>
> I don't think so. We can add it for consistency and to help debugging
> but I don't think it is needed for correctness. A leader cannot
> (should not) change replica uuid and remain leader. In theory the only
> way for a replica to change uuid is to lose their disk. If this
> happens the expectation is that they will also lose their
> QuorumStateData.
>
> > 41. Regarding including replica UUID to identify a voter: It adds a bit of
> > complexity. Could you explain whether it is truly needed? Before this KIP,
> > KRaft already supports replacing a disk on the voter node, right?
>
> Yes. This KIP attempts to solve two general problems. 1) How to
> proactively change the voters set by increasing or decreasing the
> replication factor; or replace voters in the voters set. 2) Identify
> disk failures and recover from them safely. This is what I have in the
> Motivation section:
> "Consensus on the cluster metadata partition was achieved by the
> voters (controllers). If the operator of a KRaft cluster wanted to
> make changes to the set of voters, they would have to shutdown all of
> the controllers nodes and manually make changes to the on-disk state
> of the old controllers and new controllers. If the operator wanted to
> replace an existing voter because of a disk failure or general
> hardware failure, they would have to make sure that the new voter node
> has a superset of the previous voter's on-disk state. Both of these
> solutions are manual and error prone."
>
> directory.id (replica uuid) is needed to identify and resolve disk
> failures in a voter. The section "Proposed change / User explanation /
> Common scenarios / Disk failure recovery" documents this use case in
> more detail.
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

See my comments below.

On Tue, Mar 5, 2024 at 2:57 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> 37. We don't batch multiple topic partitions in AddVoter, RemoveVoter and
> UpdateVoter requests while other requests like Vote and BeginQuorumEpoch
> support batching. Should we make them consistent?

Originally I had them as batched RPCs but decided to make them only
operate on one KRaft topic partition. I made this change primarily
because batching here amounts to false sharing. Topic partitions in KRaft
are independent; those operations will be handled independently and
committed independently but because of the batching the kraft node
will be required to wait on all the batched operations before it can
send the response.

I know that this is inconsistent with the other RPCs but I am hesitant
to propagate this incorrect batching to new RPCs.
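
To make the cost concrete, here is a small Python sketch that compares
response times with and without batching. The partition names and commit
times are made up for the example; the only assumption taken from the
explanation above is that a batched response cannot be sent until every
operation in the batch has committed.

```python
def batched_response_ms(commit_ms: dict[str, int]) -> dict[str, int]:
    """Batched RPC: one response, sent only after the slowest operation commits."""
    slowest = max(commit_ms.values())
    return {partition: slowest for partition in commit_ms}


def unbatched_response_ms(commit_ms: dict[str, int]) -> dict[str, int]:
    """One RPC per partition: each response is sent when its own operation commits."""
    return dict(commit_ms)


# Hypothetical commit times in milliseconds for two independent partitions.
commit_times = {"partition-a": 5, "partition-b": 200}
batched = batched_response_ms(commit_times)
unbatched = unbatched_response_ms(commit_times)
```

In the batched case the fast partition inherits the latency of the slow
one even though the two operations are unrelated.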

> 38. BeginQuorumEpochRequest: It seems that we need to replace the name
> field with a nodeId field in LeaderEndpoints?

The schema looks good to me. This struct has a different meaning from
the struct in the response of other RPCs. The BeginQuorumEpoch request
is sent by the leader so the expectation is that the sending
node/replica is the leader for all of the partitions sent. This also
means that the endpoints sent in LeaderEndpoints are all for the same
leader (or replica). The reason that BeginQuorumEpoch sends multiple
endpoints is because the leader may be binding to multiple listeners.
The leader sends a tuple (listener name, host name, host port) for
each of its own advertised controller listeners.
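
As a rough Python sketch, the LeaderEndpoints content can be thought of
as one tuple per advertised controller listener, all describing the same
leader. The class name, the helper and the listener names below are
assumptions for the example, not the KIP schema.

```python
from typing import NamedTuple


class LeaderEndpoint(NamedTuple):
    name: str  # listener name
    host: str
    port: int


def leader_endpoints(advertised: dict[str, tuple[str, int]]) -> list[LeaderEndpoint]:
    """Build one (listener name, host, port) entry per advertised controller listener."""
    return [LeaderEndpoint(name, host, port) for name, (host, port) in advertised.items()]


# Hypothetical leader that binds to two controller listeners.
endpoints = leader_endpoints({
    "CONTROLLER": ("controller-0", 9093),
    "CONTROLLER_SSL": ("controller-0", 9094),
})
```

The entries are keyed by listener name rather than node id because every
entry belongs to the sender of the request.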

> 39. VoteRequest: Will the Voter ever be different from the Candidate? I
> thought that in all the VoteRequests, the voter just votes for itself.

Yes. This naming always confuses me at first but makes sense after
further analysis. The voter (VoterId and VoterUuid) is the
replica receiving the Vote request and potentially voting for the
sender (candidate). The candidate (CandidateId and CandidateUuid) is
the replica sending the Vote request and asking for votes from the
receivers (voters). I tried to better document it in their respective
"about" schema fields.

> 40. EndQuorumEpochRequest: Should we add a replicaUUID field to pair with
> LeaderId?

I don't think so. We can add it for consistency and to help debugging
but I don't think it is needed for correctness. A leader cannot
(should not) change replica uuid and remain leader. In theory the only
way for a replica to change uuid is to lose their disk. If this
happens the expectation is that they will also lose their
QuorumStateData.

> 41. Regarding including replica UUID to identify a voter: It adds a bit of
> complexity. Could you explain whether it is truly needed? Before this KIP,
> KRaft already supports replacing a disk on the voter node, right?

Yes. This KIP attempts to solve two general problems. 1) How to
proactively change the voters set by increasing or decreasing the
replication factor; or replace voters in the voters set. 2) Identify
disk failures and recover from them safely. This is what I have in the
Motivation section:
"Consensus on the cluster metadata partition was achieved by the
voters (controllers). If the operator of a KRaft cluster wanted to
make changes to the set of voters, they would have to shutdown all of
the controllers nodes and manually make changes to the on-disk state
of the old controllers and new controllers. If the operator wanted to
replace an existing voter because of a disk failure or general
hardware failure, they would have to make sure that the new voter node
has a superset of the previous voter's on-disk state. Both of these
solutions are manual and error prone."

directory.id (replica uuid) is needed to identify and resolve disk
failures in a voter. The section "Proposed change / User explanation /
Common scenarios / Disk failure recovery" documents this use case in
more detail.

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

30. So kraft.version controls the version of Fetch among the voters. It
would be useful to document that.

36. Option 1 is fine. Could we document this in the section of
"Bootstrapping with multiple voters"?

37. We don't batch multiple topic partitions in AddVoter, RemoveVoter and
UpdateVoter requests while other requests like Vote and BeginQuorumEpoch
support batching. Should we make them consistent?

38. BeginQuorumEpochRequest: It seems that we need to replace the name
field with a nodeId field in LeaderEndpoints?

39. VoteRequest: Will the Voter ever be different from the Candidate? I
thought that in all the VoteRequests, the voter just votes for itself.

40. EndQuorumEpochRequest: Should we add a replicaUUID field to pair with
LeaderId?

41. Regarding including replica UUID to identify a voter: It adds a bit of
complexity. Could you explain whether it is truly needed? Before this KIP,
KRaft already supports replacing a disk on the voter node, right?

Jun

On Mon, Mar 4, 2024 at 2:55 PM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thanks for the feedback. See my comments below.
>
> On Fri, Mar 1, 2024 at 11:36 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 30. Historically, we used MV to gate the version of Fetch request. Are you
> > saying that voters will ignore MV and only depend on kraft.version when
> > choosing the version of Fetch request?
>
> Between Kafka servers/nodes (brokers and controllers) there are two
> implementations for the Fetch RPC.
>
> The first is the one traditionally used between brokers to replicate
> ISR-based topic partitions. As you point out, Kafka negotiates those
> versions using the IBP for ZK-based clusters and MV for KRaft-based
> clusters. This KIP doesn't change that. There have been offline
> conversations about potentially using the ApiVersions to negotiate those
> RPC versions but that is outside the scope of this KIP.
>
> The second is the KRaft implementation. As of today, only the controller
> listeners (controller.listener.names) implement the request handlers
> for this version of the Fetch RPC. KafkaRaftClient implements the
> client side of this RPC. This version of the Fetch RPC is negotiated
> using ApiVersions.
>
> I hope that clarifies the two implementations. On a similar note,
> Jason and I did have a brief conversation about whether KRaft should
> use a different RPC from Fetch to replicate the log of KRaft topic
> partition. This could be a long term option to make these two
> implementations clearer and allow them to diverge. I am not ready to
> tackle that problem in this KIP.
>
> > 35. Upgrading the controller listeners.
> > 35.1 So, the protocol is that each controller will pick the first listener
> > in controller.listener.names to initiate a connection?
>
> Yes. The downside of this solution is that it requires 3 rolls of
> voters (controllers) and 1 roll of observers (brokers) to replace a
> voter endpoint. In the future, we can have a solution that initiates
> the connection based on the state of the VotersRecord for voters RPCs.
> That solution can replace an endpoint with 2 rolls of voters and 1
> roll of observers.
>
> > 35.2 Should we include the new listeners in the section "Change the
> > controller listener in the brokers"?
>
> Yes. We need to. The observers (brokers) need to know what security
> protocol to use to connect to the endpoint(s) in
> controller.quorum.bootstrap.servers. This is also how connections to
> controller.quorum.voters work today.
>
> > 35.3 For every RPC that returns the controller leader, do we need to
> > return multiple endpoints?
>
> KRaft only needs to return the endpoint associated with the listener
> used to send the RPC request. This is similar to how the Metadata RPC
> works. The Brokers field in the Metadata response only returns the
> endpoints that match the listener used to receive the Metadata
> request.
>
> This is the main reason why KRaft needs to initiate connections using
> a security protocol (listener name) that is supported by all of the
> replicas. All of the clients (voters and observers) need to know
> (security protocol) how to connect to the redirection endpoint. All of
> the voters need to be listening on that listener name so that
> redirection works no matter the leader.
>
> > 35.4 The controller/observer can now get the endpoint from both records and
> > RPCs. Which one takes precedence? For example, suppose that a voter is down
> > for a while. It's started and gets the latest listener for the leader from
> > the initial fetch response. When fetching the records, it could see an
> > outdated listener. If it picks up this listener, it may not be able to
> > connect to the leader.
>
> Yeah. This is where connection and endpoint management gets tricky.
> This is my implementation strategy:
>
> 1. For the RPCs Vote, BeginQuorumEpoch and EndQuorumEpoch the replicas
> (voters) will always initiate connections using the endpoints described
> in the VotersRecord (or controller.quorum.voters for kraft.version 0).
> 2. For the Fetch RPC when the leader is not known, the replicas will
> use the endpoints in controller.quorum.bootstrap.servers (or
> controller.quorum.voters for kraft.version 0). This is how the
> replicas (observers) normally discover the latest leader.
> 3. For the Fetch and FetchSnapshot RPCs when the leader is known, the
> replicas use the endpoint that was discovered through previous RPC
> response(s) or the endpoint in the BeginQuorumEpoch request.
>
> I have been thinking a lot about this and this is the most consistent
> and deterministic algorithm that I can think of. We should be able to
> implement a different algorithm in the future without changing the
> protocol or KIP.
>
> > 36. Bootstrapping with multiple voters: How does a user get the replica
> > uuid? In that case, do we use the specified replica uuid instead of a
> > randomly generated one in the meta.properties file in metadata.log.dir?
>
> There are two options:
> 1. They generate the directory.id for all of the voters using
> something like "kafka-storage random-uuid" and specify those in
> "kafka-storage format --controller-quorum-voters". This is the safest
> option as it can detect disk replacement from bootstrap.
>
> 2. They only specify the replica id, host and port with
> "kafka-storage format --controller-quorum-voters
> 0@controller-0:1234,1@controller-1:1234,2@controller-2:1234" and let
> the KRaft leader discover the directory ids (replica uuid). This is
> not as safe as the first option as KRaft won't be able to detect disk
> failure during this discovery phase.
>
> This is described briefly in the "Automatic endpoint and directory id
> discovery" section:
> "For directory id, the leader will only override it if it was not
> previously set. This behavior is useful when a cluster gets upgraded
> to a kraft.version greater than 0."
>
> I see that this description is missing from the UpdateVoter handling
> section. Let me update that section.
>
> Thanks,
> --
> -José
>

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by José Armando García Sancio <js...@confluent.io.INVALID>.
Hi Jun,

Thanks for the feedback. See my comments below.

On Fri, Mar 1, 2024 at 11:36 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> 30. Historically, we used MV to gate the version of Fetch request. Are you
> saying that voters will ignore MV and only depend on kraft.version when
> choosing the version of Fetch request?

Between Kafka servers/nodes (brokers and controllers) there are two
implementations for the Fetch RPC.

The first is the one traditionally used between brokers to replicate
ISR-based topic partitions. As you point out, Kafka negotiates those
versions using the IBP for ZK-based clusters and MV for KRaft-based
clusters. This KIP doesn't change that. There have been offline
conversations of potentially using the ApiVersions to negotiate those
RPC versions but that is outside the scope of this KIP.

The second is the KRaft implementation. As of today, only the controller
listeners (controller.listener.names) implement the request handlers
for this version of the Fetch RPC. KafkaRaftClient implements the
client side of this RPC. This version of the Fetch RPC is negotiated
using ApiVersions.
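
As a sketch of what negotiating with ApiVersions means, assume each side
advertises a [min, max] range for an RPC and the sender picks the highest
version inside both ranges. The function name and the version numbers
below are illustrative only.

```python
from typing import Optional


def negotiate_version(
    local_min: int, local_max: int, remote_min: int, remote_max: int
) -> Optional[int]:
    """Return the highest version both ranges support, or None if they do not overlap."""
    lowest = max(local_min, remote_min)
    highest = min(local_max, remote_max)
    return highest if lowest <= highest else None


# Hypothetical ranges: the sender supports 0..17, the receiver 4..16.
chosen = negotiate_version(0, 17, 4, 16)
```

No cluster-wide setting is involved in this choice; it depends only on
what the two ends of the connection advertise.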

I hope that clarifies the two implementations. On a similar note,
Jason and I did have a brief conversation about whether KRaft should
use a different RPC from Fetch to replicate the log of KRaft topic
partition. This could be a long term option to make these two
implementations clearer and allow them to diverge. I am not ready to
tackle that problem in this KIP.

> 35. Upgrading the controller listeners.
> 35.1 So, the protocol is that each controller will pick the first listener
> in controller.listener.names to initiate a connection?

Yes. The downside of this solution is that it requires 3 rolls of
voters (controllers) and 1 roll of observers (brokers) to replace a
voter endpoint. In the future, we can have a solution that initiates
the connection based on the state of the VotersRecord for voters RPCs.
That solution can replace an endpoint with 2 rolls of voters and 1
roll of observers.

> 35.2 Should we include the new listeners in the section "Change the
> controller listener in the brokers"?

Yes. We need to. The observers (brokers) need to know what security
protocol to use to connect to the endpoint(s) in
controller.quorum.bootstrap.servers. This is also how connections to
controller.quorum.voters work today.

> 35.3 For every RPC that returns the controller leader, do we need to
> return multiple endpoints?

KRaft only needs to return the endpoint associated with the listener
used to send the RPC request. This is similar to how the Metadata RPC
works. The Brokers field in the Metadata response only returns the
endpoints that match the listener used to receive the Metadata
request.

This is the main reason why KRaft needs to initiate connections using
a security protocol (listener name) that is supported by all of the
replicas. All of the clients (voters and observers) need to know
(security protocol) how to connect to the redirection endpoint. All of
the voters need to be listening on that listener name so that
redirection works no matter the leader.
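
A minimal Python sketch of that lookup, assuming the leader's endpoints
are kept in a map keyed by listener name. The function name, listener
names, hosts and ports are hypothetical.

```python
from typing import Optional


def leader_endpoint_for(
    request_listener: str, leader_listeners: dict[str, tuple[str, int]]
) -> Optional[tuple[str, int]]:
    """Return the leader endpoint for the listener the request arrived on, if any."""
    return leader_listeners.get(request_listener)


# Hypothetical leader that listens on two controller listeners.
leader = {
    "CONTROLLER": ("controller-1", 9093),
    "CONTROLLER_SSL": ("controller-1", 9094),
}
redirect = leader_endpoint_for("CONTROLLER_SSL", leader)
missing = leader_endpoint_for("CONTROLLER_OLD", leader)
```

The None result is the case that the shared listener requirement is
meant to rule out: a client that connected on a listener the leader does
not serve has nowhere to be redirected.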

> 35.4 The controller/observer can now get the endpoint from both records and
> RPCs. Which one takes precedence? For example, suppose that a voter is down
> for a while. It's started and gets the latest listener for the leader from
> the initial fetch response. When fetching the records, it could see an
> outdated listener. If it picks up this listener, it may not be able to
> connect to the leader.

Yeah. This is where connection and endpoint management gets tricky.
This is my implementation strategy:

1. For the RPCs Vote, BeginQuorumEpoch and EndQuorumEpoch the replicas
(voters) will always initiate connections using the endpoints described
in the VotersRecord (or controller.quorum.voters for kraft.version 0).
2. For the Fetch RPC when the leader is not known, the replicas will
use the endpoints in controller.quorum.bootstrap.servers (or
controller.quorum.voters for kraft.version 0). This is how the
replicas (observers) normally discover the latest leader.
3. For the Fetch and FetchSnapshot RPCs when the leader is known, the
replicas use the endpoint that was discovered through previous RPC
response(s) or the endpoint in the BeginQuorumEpoch request.
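
The three items above can be sketched in Python as a single selection
function. This is not KafkaRaftClient code: the function name, the
parameters and the error for the case the list does not cover are
assumptions. For kraft.version 0 the caller would pass the
controller.quorum.voters endpoints as both voters and bootstrap.

```python
from typing import Optional

Endpoint = tuple[str, int]  # (host, port)

VOTER_RPCS = {"Vote", "BeginQuorumEpoch", "EndQuorumEpoch"}


def endpoints_for(
    rpc: str,
    leader: Optional[Endpoint],
    voters: list[Endpoint],
    bootstrap: list[Endpoint],
) -> list[Endpoint]:
    """Choose the endpoints a replica may connect to for the given RPC."""
    if rpc in VOTER_RPCS:
        # First item: voter RPCs always use the voters set endpoints.
        return list(voters)
    if leader is not None and rpc in {"Fetch", "FetchSnapshot"}:
        # Third item: a known leader endpoint takes priority.
        return [leader]
    if rpc == "Fetch":
        # Second item: unknown leader, fall back to the bootstrap servers.
        return list(bootstrap)
    # FetchSnapshot without a known leader is not described in the list above.
    raise ValueError(f"no endpoint rule for {rpc} without a known leader")
```

Because the function only looks at local state and the last known
leader, the same inputs always give the same endpoints.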

I have been thinking a lot about this and this is the most consistent
and deterministic algorithm that I can think of. We should be able to
implement a different algorithm in the future without changing the
protocol or KIP.

> 36. Bootstrapping with multiple voters: How does a user get the replica
> uuid? In that case, do we use the specified replica uuid instead of a
> randomly generated one in the meta.properties file in metadata.log.dir?

There are two options:
1. They generate the directory.id for all of the voters using
something like "kafka-storage random-uuid" and specify those in
"kafka-storage format --controller-quorum-voters". This is the safest
option as it can detect disk replacement from bootstrap.

2. They only specify the replica id, host and port with
"kafka-storage format --controller-quorum-voters
0@controller-0:1234,1@controller-1:1234,2@controller-2:1234" and let
the KRaft leader discover the directory ids (replica uuid). This is
not as safe as the first option as KRaft won't be able to detect disk
failure during this discovery phase.

This is described briefly in the "Automatic endpoint and directory id
discovery" section:
"For directory id, the leader will only override it if it was not
previously set. This behavior is useful when a cluster gets upgraded
to a kraft.version greater than 0."

I see that this description is missing from the UpdateVoter handling
section. Let me update that section.
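
As an illustration of the second option, here is a minimal Python
sketch of parsing the "id@host:port" list and of the "only override
if it was not previously set" rule. The function names parse_voters
and resolve_directory_id are hypothetical, the directory id is
simplified to a string, and this is not how kafka-storage or the KRaft
leader is actually implemented.

```python
from typing import Dict, Optional, Tuple


def parse_voters(spec: str) -> Dict[int, Tuple[str, int]]:
    """Parse "0@host-0:1234,1@host-1:1234" into {replica_id: (host, port)}."""
    voters: Dict[int, Tuple[str, int]] = {}
    for entry in spec.split(","):
        replica_id, at, address = entry.strip().partition("@")
        host, colon, port = address.rpartition(":")
        if not (at and colon and replica_id and host and port):
            raise ValueError(f"malformed voter entry: {entry!r}")
        voters[int(replica_id)] = (host, int(port))
    return voters


def resolve_directory_id(known: Optional[str], reported: str) -> str:
    """Keep a previously set directory id; only fill it in when unset."""
    return reported if known is None else known
```

With this rule a voter whose directory id is already known keeps it
even if a replica later reports a different one, which is what lets
the first option detect a replaced disk.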

Thanks,
-- 
-José

Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Jose,

Thanks for the reply.

30. Historically, we used MV to gate the version of Fetch request. Are you
saying that voters will ignore MV and only depend on kraft.version when
choosing the version of Fetch request?

35. Upgrading the controller listeners.
35.1 So, the protocol is that each controller will pick the first listener
in controller.listener.names to initiate a connection?
35.2 Should we include the new listeners in the section "Change the
controller listener in the brokers"?
35.3 For every RPC that returns the controller leader, do we need to
return multiple endpoints?
35.4 The controller/observer can now get the endpoint from both records and
RPCs. Which one takes precedence? For example, suppose that a voter is down
for a while. It's started and gets the latest listener for the leader from
the initial fetch response. When fetching the records, it could see an
outdated listener. If it picks up this listener, it may not be able to
connect to the leader.

36. Bootstrapping with multiple voters: How does a user get the replica
uuid? In that case, do we use the specified replica uuid instead of a
randomly generated one in the meta.properties file in metadata.log.dir?

Jun


On Fri, Mar 1, 2024 at 10:51 AM José Armando García Sancio
<js...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thanks for the feedback. See my comments below.
>
> On Tue, Feb 27, 2024 at 11:27 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> > 30. Who controls RPCs like Fetch, FetchSnapshot, DescribeQuorum RPC? They
> > are shared between voters and observers.
>
> For Fetch and FetchSnapshot, this KIP adds the tagged field
> ReplicaUuid to the request. This means that if the sender supports the
> latest version it can always add the replica uuid to the request. If
> the receiver supports the new tagged field it is included in the
> appropriate FetchRequestData and FetchSnapshotRequestData field. If it
> doesn't support the new tagged field it will be in the unknown tagged
> fields.
>
> For DescribeQuorum, this KIP only changes the response. The KRaft
> leader will inspect the RequestHeader::apiVersion to determine what
> information to include in the response.
>
> > Does the client use supported ApiKeys or kraft.version feature in
> > ApiVersions response for deciding whether to send AddVoter requests?
>
> That's a good point. The Admin client has access to the finalized
> kraft.version in the ApiVersions response. I was thinking of only
> using the ApiKeys since we don't have a precedence of using the
> finalized features in the Admin client. The receiver of the request
> still needs to validate the kraft.version for those requests and
> return an UNSUPPORTED_VERSION error in those cases.
>
> Do you mind if we define this in a separate KIP?
>
> Thanks,
> --
> -José
>