You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Igor Soarez (Jira)" <ji...@apache.org> on 2022/05/07 14:39:00 UTC

[jira] [Comment Edited] (KAFKA-9837) New RPC for notifying controller of failed replica

    [ https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533287#comment-17533287 ] 

Igor Soarez edited comment on KAFKA-9837 at 5/7/22 2:38 PM:
------------------------------------------------------------

I'm trying to figure out what's the best way forward here.

It is nice that the controller does not decide which log directory gets to host each replica. This can allow each broker to make better allocation decisions, e.g. based on disk usage information. This benefit was also discussed as part of KIP-589 so I think we should try to keep that property if possible.

It would be useful to clarify our concerns here a bit further.

In ZooKeeper mode the broker notifies the controller via a z-node update, which then triggers a full LeaderAndIsr request back to the broker, and relies on per partition errors to identify the failed replicas. The approach in [~dengziming] 's PR improves this with a single request that includes all the failed replicas. So this is already an improvement, as we go from:

{{Broker updates log dir failure z-node without specific replica information -> ZK updates the z-node -> z-node watch triggered in Controller -> Controller fetch notification -> ZK replies with notifications -> Controller sends LeaderAndIsr -> Broker answers -> Controller identifies failed replicas}}

To:

{{Broker sends full list of failed replicas -> Controller identifies failed replicas}}

However, if there are a lot of replicas in the same log directory that can mean both that:
 # The RPC from the broker to the controller indicating failed replicas in the log directory can be a very large request including O ( n ) replicas.
 # The metadata records generated in the KRaft controller are O ( n ).

Which of these are we aiming to tackle [~cmccabe] ? I may be missing some other concern here, please let me know.

I also have a couple of questions regarding each of these concerns:

Regarding 1., if we use Topic IDs instead of topic names the request it is likely to be smaller and have a more predictable size. e.g. offlining one million partitions should be about a 20 MB request (1M * (128 bits topic ID + 32 bits partition ID)), is this unreasonable?

Regarding 2., currently, when a broker is fenced, metadata records are generated for every changed ISR set. That'll include all log directories, so this same concern should apply also for fenced brokers. Do we want to update this mechanism as well to have a single record identify all failed partitions?

One way to prevent O ( n ) requests and updates is to replace the ALTER_REPLICA_STATE RPC proposed in KIP-589 with a different set of RPCs:
 * ASSIGN_REPLICAS_TO_FAIL_GROUP - Each broker sends this RPC to the controller after assigning one or more
  replicas to any log directory, before activating the replica. It associates each replica to its assigned log directory. It introduces some extra delay in assigning new replicas to brokers but to the extra roundtrip and batching but that might be okay.
 * FAIL_GROUP - When a log directory fails, a single small request with the
  broker ID, and failure group indicates that all previously associated replicas are now offline.

The metadata log would include equivalent records to these RPCs. A metadata delta would be able to identify all the fail replicas from the group.

We can also use the same concept - the conceptual grouping replicas into "failure groups/domains" to minimize the metadata updates when a broker is fenced.

WDYT? If this sounds like it may be the general right direction I can work on a KIP.

 

 


was (Author: soarez):
I'm trying to figure out what's the best way forward here.

It is nice that the controller does not decide which log directory gets to host each replica. This can allow each broker to make better allocation decisions, e.g. based on disk usage information. This benefit was also discussed as part of KIP-589 so I think we should try to keep that property if possible.

It would be useful to clarify our concerns here a bit further.

In ZooKeeper mode the broker notifies the controller via a z-node update, which then triggers a full LeaderAndIsr request back to the broker, and relies on per partition errors to identify the failed replicas. The approach in [~dengziming] 's PR improves this with a single request that includes all the failed replicas. So this is already an improvement, as we go from:

{{Broker updates log dir failure z-node without specific replica information -> ZK updates the z-node -> z-node watch triggered in Controller -> Controller fetch notification -> ZK replies with notifications -> Controller sends LeaderAndIsr -> Broker answers -> Controller identifies failed replicas}}

To:

{{Broker sends full list of failed replicas -> Controller identifies failed replicas}}

However, if there are a lot of replicas in the same log directory that can mean both that:
 # The RPC from the broker to the controller indicating failed replicas in the log directory can be a very large request including O ( n ) replicas.
 # The metadata records generated in the KRaft controller are O ( n ).

Which of these are we aiming to tackle [~cmccabe] ? I may be missing some other concern here, please let me know.

I also have a couple of questions regarding each of these concerns:

Regarding 1., if we use Topic IDs instead of topic names the request it is likely to be smaller and have a more predictable size. e.g. offlining one million partitions should be about a 20 MB request (1M * (128 bits topic ID + 32 bits partition ID)), is this unreasonable?

Regarding 2., currently, when a broker is fenced, metadata records are generated for every changed ISR set. That'll include all log directories, so this same concern should apply also for fenced brokers. Do we want to update this mechanism as well to have a single record identify all failed partitions?

One way to prevent O(n) requests and updates is to replace the ALTER_REPLICA_STATE RPC proposed in KIP-589 with a different set of RPCs:
 * ASSIGN_REPLICAS_TO_FAIL_GROUP - Each broker sends this RPC to the controller after assigning one or more
  replicas to any log directory, before activating the replica. It associates each replica to its assigned log directory. It introduces some extra delay in assigning new replicas to brokers but to the extra roundtrip and batching but that might be okay.
 * FAIL_GROUP - When a log directory fails, a single small request with the
  broker ID, and failure group indicates that all previously associated replicas are now offline.

The metadata log would include equivalent records to these RPCs. A metadata delta would be able to identify all the fail replicas from the group.

We can also use the same concept - the conceptual grouping replicas into "failure groups/domains" to minimize the metadata updates when a broker is fenced.

WDYT? If this sounds like it may be the general right direction I can work on a KIP.

 

 

> New RPC for notifying controller of failed replica
> --------------------------------------------------
>
>                 Key: KAFKA-9837
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9837
>             Project: Kafka
>          Issue Type: New Feature
>          Components: controller, core
>            Reporter: David Arthur
>            Assignee: dengziming
>            Priority: Major
>              Labels: kip-500
>
> This is the tracking ticket for [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller]. For the bridge release, brokers should no longer use ZooKeeper to notify the controller that a log dir has failed. It should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)