Posted to jira@kafka.apache.org by "A. Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2021/07/08 22:45:00 UTC

[jira] [Updated] (KAFKA-12477) Smart rebalancing with dynamic protocol selection

     [ https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-12477:
-------------------------------------------
    Fix Version/s:     (was: 3.0.0)
                   3.1.0

> Smart rebalancing with dynamic protocol selection
> -------------------------------------------------
>
>                 Key: KAFKA-12477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12477
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 3.1.0
>
>
> Users who want to upgrade their applications and enable the COOPERATIVE rebalancing protocol in their consumer apps are required to follow a double rolling bounce upgrade path. The reason for this is laid out in the [Consumer Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer] section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing protocol in its constructor based on the list of supported partition assignors. The protocol is selected as the highest protocol that is commonly supported by all assignors in the list, and never changes after that.
> This is unfortunate because the consumer may end up using an older protocol even after every member in the group has been updated to support the newer protocol. After the first rolling bounce of the upgrade, all members will have two assignors: "cooperative-sticky" and "range" (or sticky, round-robin, etc.). At this point the EAGER protocol will still be selected due to the presence of the "range" assignor, but it's the "cooperative-sticky" assignor that will ultimately be selected for use in rebalances if that assignor is preferred (i.e., positioned first in the list). The only reason for the second rolling bounce is to strip off the "range" assignor and allow the upgraded members to switch over to COOPERATIVE. We can't allow them to use cooperative rebalancing until everyone has been upgraded, but once they have, it's safe to do so.
> And there is already a way for the client to detect that everyone is on the new bytecode: if the CooperativeStickyAssignor is selected by the group coordinator, then that means it is supported by all consumers in the group and therefore everyone must be upgraded.
> We may be able to save the second rolling bounce by dynamically updating the rebalancing protocol inside the ConsumerCoordinator as "the highest protocol supported by the assignor chosen by the group coordinator". This means we'll still be using EAGER at the first rebalance, since we of course need to wait for this initial rebalance to get the response from the group coordinator. But we should take the hint from the chosen assignor rather than dropping this information on the floor and sticking with the original protocol.
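> To make the difference concrete, here is a minimal sketch of the two selection rules. Everything in it is a stand-in for illustration: the ProtocolSelectionSketch class, its Protocol enum, and the assumed support sets ("cooperative-sticky" supporting both protocols, "range" supporting only EAGER) are not the client's actual classes.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class ProtocolSelectionSketch {
    // Hypothetical stand-in for the rebalance protocol, declared lowest to highest.
    enum Protocol { EAGER, COOPERATIVE }

    // Assumed assignor list after the first rolling bounce, preferred assignor first.
    static Map<String, Set<Protocol>> firstBounceAssignors() {
        Map<String, Set<Protocol>> assignors = new LinkedHashMap<>();
        assignors.put("cooperative-sticky", EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE));
        assignors.put("range", EnumSet.of(Protocol.EAGER));
        return assignors;
    }

    // Returns the highest protocol in the set, by declaration order.
    static Protocol highest(Set<Protocol> protocols) {
        Protocol highest = null;
        for (Protocol p : protocols) {
            if (highest == null || p.compareTo(highest) > 0) {
                highest = p;
            }
        }
        if (highest == null) {
            throw new IllegalArgumentException("no supported protocol");
        }
        return highest;
    }

    // Current behavior: highest protocol supported by every configured assignor.
    static Protocol staticSelection(Map<String, Set<Protocol>> assignors) {
        Set<Protocol> common = EnumSet.allOf(Protocol.class);
        for (Set<Protocol> supported : assignors.values()) {
            common.retainAll(supported);
        }
        return highest(common);
    }

    // Proposed behavior: highest protocol supported by the assignor the group coordinator chose.
    static Protocol dynamicSelection(Map<String, Set<Protocol>> assignors, String chosenAssignor) {
        Set<Protocol> supported = assignors.get(chosenAssignor);
        if (supported == null) {
            throw new IllegalArgumentException("unknown assignor: " + chosenAssignor);
        }
        return highest(supported);
    }

    public static void main(String[] args) {
        Map<String, Set<Protocol>> assignors = firstBounceAssignors();
        System.out.println(staticSelection(assignors));                        // EAGER
        System.out.println(dynamicSelection(assignors, "cooperative-sticky")); // COOPERATIVE
        System.out.println(dynamicSelection(assignors, "range"));              // EAGER
    }
}
```

> With the static rule the "range" entry pins the member to EAGER no matter what the group selects; with the dynamic rule the member follows whichever assignor the group coordinator reports back.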
> Concrete Proposal:
> This assumes we will change the default assignor to ["cooperative-sticky", "range"] in KIP-726. It also acknowledges that users may attempt any kind of upgrade without reading the docs, and so we need to put in safeguards against data corruption rather than assume everyone will follow the safe upgrade path.
> With this proposal:
> 1) New applications on 3.0 will enable cooperative rebalancing by default
> 2) Existing applications which don’t set an assignor can safely upgrade to 3.0 using a single rolling bounce with no extra steps, and will automatically transition to cooperative rebalancing
> 3) Existing applications which do set an assignor that uses EAGER can likewise upgrade their applications to COOPERATIVE with a single rolling bounce
> 4) Once on 3.0, applications can safely go back and forth between EAGER and COOPERATIVE
> 5) Applications can safely downgrade away from 3.0
> The high-level idea for dynamic protocol upgrades is that the group will leverage the assignor selected by the group coordinator to determine when it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the group in case of rare events or user misconfiguration. The group coordinator selects the most preferred assignor that’s supported by all members of the group, so we know that all members will support COOPERATIVE once we receive the “cooperative-sticky” assignor after a rebalance. At this point, each member can upgrade its own protocol to COOPERATIVE. However, an EAGER member may still join the group even after the other members have upgraded to COOPERATIVE. For example, during a rolling upgrade, if the last remaining member on the old bytecode misses a rebalance, the other members will be allowed to upgrade to COOPERATIVE. If the old member then rejoins and is chosen to be the group leader before it’s upgraded to 3.0, it won’t be aware, when computing the assignment, that the other members of the group have not yet revoked their partitions.
> Short Circuit:
> The risk of mixing the cooperative and eager rebalancing protocols is that a partition may be assigned to one member while it has yet to be revoked from its previous owner. The danger is that the new owner may begin processing and committing offsets for this partition while the previous owner is also committing offsets in its #onPartitionsRevoked callback, which is invoked at the end of the rebalance in the cooperative protocol. This can result in these consumers overwriting each other’s offsets and getting a corrupted view of the partition. Note that it’s not possible to commit during a rebalance, so we can protect against offset corruption by blocking further commits after we detect that the group leader may not understand COOPERATIVE, but before we invoke #onPartitionsRevoked. This is the “short-circuit” — if we detect that the group is in an unsafe state, we invoke #onPartitionsLost instead of #onPartitionsRevoked and explicitly prevent offsets from being committed on those revoked partitions.
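> A minimal sketch of the short-circuit, under stated assumptions: ShortCircuitGuard, its Callback enum, and the use of plain strings for partitions are hypothetical, and IllegalStateException stands in for whatever exception the consumer would actually raise. It only shows the two decisions described above: which callback to invoke, and which commits to block.

```java
import java.util.HashSet;
import java.util.Set;

final class ShortCircuitGuard {
    // Hypothetical labels for the two rebalance listener callbacks.
    enum Callback { ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    // Partitions given up while the group was in an unsafe state.
    private final Set<String> blockedPartitions = new HashSet<>();

    // Picks the callback for partitions being given up; in an unsafe group,
    // also marks them so that later commits are rejected.
    Callback onGivingUp(Set<String> partitions, boolean groupUnsafe) {
        if (groupUnsafe) {
            blockedPartitions.addAll(partitions);
            return Callback.ON_PARTITIONS_LOST;
        }
        return Callback.ON_PARTITIONS_REVOKED;
    }

    // Stand-in for an offset commit: rejected for blocked partitions.
    void commit(String partition, long offset) {
        if (blockedPartitions.contains(partition)) {
            throw new IllegalStateException(
                "commit of offset " + offset + " blocked for " + partition);
        }
        // A real consumer would send the offset commit here.
    }

    public static void main(String[] args) {
        ShortCircuitGuard guard = new ShortCircuitGuard();
        System.out.println(guard.onGivingUp(Set.of("orders-0"), false)); // ON_PARTITIONS_REVOKED
        System.out.println(guard.onGivingUp(Set.of("orders-1"), true));  // ON_PARTITIONS_LOST
        try {
            guard.commit("orders-1", 42L);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```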
> Consumer procedure:
> Upon startup, the consumer will initially select the highest commonly-supported protocol across its configured assignors. With ["cooperative-sticky", "range"], the initial protocol will be EAGER when the member first joins the group. Following a rebalance, each member will check the selected assignor. If the chosen assignor supports COOPERATIVE, the member can upgrade its protocol to COOPERATIVE and no further action is required. If the member is already on COOPERATIVE but the selected assignor does NOT support it, then we need to trigger the short-circuit. In this case we will invoke #onPartitionsLost instead of #onPartitionsRevoked, and set a flag to block any attempts at committing those partitions which have been revoked. If a commit is attempted, as may be the case if the user does not implement #onPartitionsLost (see KAFKA-12638), we will throw a CommitFailedException which will be bubbled up through poll() after completing the rebalance. The member will then downgrade its protocol to EAGER for the next rebalance.
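> The per-member transitions in this procedure can be sketched as a small state machine. This is an illustration only: DynamicProtocolMember and its Protocol and Action enums are hypothetical names, and the sketch models just the protocol bookkeeping, not the callbacks or the commit blocking.

```java
final class DynamicProtocolMember {
    // Hypothetical stand-ins for the rebalance protocol and the outcome of a rebalance.
    enum Protocol { EAGER, COOPERATIVE }
    enum Action { NONE, UPGRADED, SHORT_CIRCUIT }

    // Initial protocol with ["cooperative-sticky", "range"] configured.
    private Protocol current = Protocol.EAGER;

    Protocol current() {
        return current;
    }

    // Called after each rebalance with whether the selected assignor supports COOPERATIVE.
    Action onAssignorSelected(boolean selectedSupportsCooperative) {
        if (selectedSupportsCooperative) {
            if (current == Protocol.COOPERATIVE) {
                return Action.NONE; // already upgraded, nothing to do
            }
            current = Protocol.COOPERATIVE;
            return Action.UPGRADED;
        }
        if (current == Protocol.COOPERATIVE) {
            // Unsafe: the group fell back to an assignor without COOPERATIVE support.
            // The caller would invoke #onPartitionsLost and block commits here.
            current = Protocol.EAGER; // downgrade for the next rebalance
            return Action.SHORT_CIRCUIT;
        }
        return Action.NONE; // still EAGER, wait for a later rebalance
    }

    public static void main(String[] args) {
        DynamicProtocolMember member = new DynamicProtocolMember();
        System.out.println(member.onAssignorSelected(false)); // NONE
        System.out.println(member.onAssignorSelected(true));  // UPGRADED
        System.out.println(member.onAssignorSelected(true));  // NONE
        System.out.println(member.onAssignorSelected(false)); // SHORT_CIRCUIT
        System.out.println(member.current());                 // EAGER
    }
}
```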



--
This message was sent by Atlassian Jira
(v8.3.4#803005)