Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/18 01:03:40 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

ableegoldman opened a new pull request #10345:
URL: https://github.com/apache/kafka/pull/10345


   To allow users to avoid the second rolling bounce when upgrading to cooperative rebalancing, we can dynamically upgrade the rebalancing protocol based on the assignor that's selected during a rebalance. The group coordinator chooses this assignor as the most-preferred assignor that is supported by all members of the group. An assignor's preference is determined by its position in the `partition.assignment.strategy` config.
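
For illustration, the selection rule described above can be sketched as follows. This is a simplified stand-in, not the group coordinator's actual code: the class and method names are hypothetical, and it resolves differing preference orders by simply following the first member's order, which the real coordinator may not do.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: not the broker's real selection logic.
final class AssignorSelectionSketch {

    /**
     * Returns the first assignor, in the first member's preference order,
     * that every member of the group also lists.
     */
    static String selectAssignor(List<List<String>> memberPreferences) {
        if (memberPreferences.isEmpty())
            throw new IllegalArgumentException("Group has no members");

        // Start from the first member's ordered list and keep only the
        // assignors that every other member supports as well.
        Set<String> common = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> preferences : memberPreferences)
            common.retainAll(preferences);

        if (common.isEmpty())
            throw new IllegalStateException("Members share no common assignor");
        return common.iterator().next();
    }
}
```

Under this sketch, a group whose members all list `["cooperative-sticky", "range"]` selects "cooperative-sticky", while a group containing even one member that lists only "range" falls back to "range".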
   
   With this in place, a user can upgrade to cooperative rebalancing with a single rolling bounce. During the upgrade, they must add the "cooperative-sticky" assignor to the front of the `partition.assignment.strategy` list. If the application hasn't set this config, it will need to be set to `["cooperative-sticky", "range"]`. After the rolling bounce is complete, all of the members will upgrade their protocol to COOPERATIVE.
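
As a rough sketch of the config change during the rolling bounce: the consumer config takes assignor class names, and the two classes below are the ones whose protocol names are "cooperative-sticky" and "range". The class `CooperativeUpgradeConfigSketch`, the bootstrap address, and the group id are placeholders assumed for this example.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties for the upgrade.
final class CooperativeUpgradeConfigSketch {

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Placeholder connection settings, assumed for this example only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // Cooperative-sticky comes first so it is the most-preferred assignor;
        // range stays in the list so not-yet-bounced members still share one.
        props.setProperty("partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```

These properties would then be passed to the consumer constructor along with the usual deserializer settings.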
   
   To avoid unsafe downgrades to versions which don't support cooperative rebalancing, we throw an `IllegalStateException` if we discover that the highest protocol supported by the selected assignor is lower than the one currently being used by the consumer.
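
The downgrade guard might look roughly like the sketch below. It is self-contained, so the `RebalanceProtocol` enum here is a local stand-in for the client's own enum, and `nextProtocol` is a hypothetical name rather than a method from the PR.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the downgrade check described above.
final class ProtocolDowngradeCheckSketch {

    // Local stand-in; declaration order makes EAGER compare lower than COOPERATIVE.
    enum RebalanceProtocol { EAGER, COOPERATIVE }

    /**
     * Returns the protocol to use after a rebalance, or throws if the selected
     * assignor's highest supported protocol is lower than the current one.
     */
    static RebalanceProtocol nextProtocol(RebalanceProtocol current,
                                          List<RebalanceProtocol> assignorSupported) {
        // Collections.max reads the list without sorting or mutating it.
        RebalanceProtocol highest = Collections.max(assignorSupported);
        if (highest.compareTo(current) < 0)
            throw new IllegalStateException("Selected assignor supports at most "
                + highest + " but the consumer is already using " + current);
        return highest;
    }
}
```

With this sketch, a consumer on EAGER moves to COOPERATIVE once an assignor supporting both is selected, and a consumer already on COOPERATIVE fails fast if an EAGER-only assignor is selected.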


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-809585517


   Hey @C0urante & @guozhangwang, sorry for the delayed response; it's been a hectic few weeks. I think I came up with a solution to the concerns you raised that will allow us to fail fast on downgrade, without also crashing the group if a user ever brings up a new member with only a non-cooperative assignor, whether accidentally or on purpose. You can check out my latest reply on the KIP-726 [DISCUSS] thread for more details. Let me know what you think (I haven't thought through all the details yet but I'm optimistic).
   
   I'll try to get back to this PR sometime this week.





[GitHub] [kafka] ableegoldman commented on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-801539881


   cc @C0urante 






[GitHub] [kafka] C0urante commented on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-801583332


   Thanks for the ping @ableegoldman! If I'm reading this correctly, the entire consumer group dies whenever a new member joins with an outdated protocol, except for that new member, which doesn't know that anything is wrong since it hasn't detected the downgrade in protocol.
   
   If that's correct, I'm wondering if there's a way to soften the blow here... would it be viable for a consumer to permit and handle the downgrade in protocol by revoking all previously-held partitions and then claiming only the ones provided in the new assignment? Given the choice between a revert to eager rebalancing and having most of the group die, the former seems like it might be more user-friendly.
   
   I'm optimistic that if we do that in `onJoinComplete` we can avoid having multiple consumers process the same topic partition(s) at once, but haven't been able to confirm this just yet.
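
To make the proposed fallback concrete, here is a small sketch contrasting what a member gives up under each protocol. The class and method names are hypothetical, and partitions are represented as plain strings to keep the example self-contained.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch contrasting eager and cooperative revocation.
final class RevocationSketch {

    // Eager: every owned partition is revoked before the new assignment is taken.
    static Set<String> revokedEagerly(Set<String> owned) {
        return new TreeSet<>(owned);
    }

    // Cooperative: only partitions missing from the new assignment are revoked.
    static Set<String> revokedCooperatively(Set<String> owned, Set<String> assigned) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(assigned);
        return revoked;
    }
}
```

Handling a downgrade as suggested above would amount to a cooperative member applying the eager rule once: revoke everything it owns, then claim only what the new assignment provides.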





[GitHub] [kafka] C0urante edited a comment on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
C0urante edited a comment on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-801583332


   Thanks for the ping @ableegoldman! If I'm reading this correctly, the entire consumer group will throw an exception from `poll` whenever a new member joins with an outdated protocol, except for that new member, which doesn't know that anything is wrong since it hasn't detected the downgrade in protocol.
   
   If that's correct, I'm wondering if there's a way to soften the blow here... would it be viable for a consumer to permit and handle the downgrade in protocol by revoking all previously-held partitions and then claiming only the ones provided in the new assignment? Given the choice between a revert to eager rebalancing and having most of the group die, the former seems like it might be more user-friendly.
   
   I'm optimistic that if we do that in `onJoinComplete` we can avoid having multiple consumers process the same topic partition(s) at once, but haven't been able to confirm this just yet.
   
   I'm also curious--would throwing an exception from `onJoinComplete` be enough to prevent users from re-invoking `poll` a second time with different behavior? Or would we need to cache the failed state here and do something with it on re-invocations of `poll`?





[GitHub] [kafka] guozhangwang commented on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-810431786


   Sounds great, will follow-up on KIP-726.





[GitHub] [kafka] ableegoldman edited a comment on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
ableegoldman edited a comment on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-809585517


   Hey @C0urante & @guozhangwang, sorry for the delayed response; it's been a hectic few weeks. I think I came up with a solution to the concerns you raised that will allow us to fail fast on downgrade, without also crashing the group if a user ever brings up a new member with only a non-cooperative assignor, whether accidentally or on purpose. You can check out my latest reply on the KIP-726 [DISCUSS] thread for more details. Let me know what you think (I haven't thought through all the details yet so it's possible I missed something).
   
   I'll try to get back to this PR sometime this week.





[GitHub] [kafka] C0urante edited a comment on pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
C0urante edited a comment on pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#issuecomment-801583332


   Thanks for the ping @ableegoldman! If I'm reading this correctly, the entire consumer group will throw an exception from `poll` whenever a new member joins with an outdated protocol, except for that new member, which doesn't know that anything is wrong since it hasn't detected the downgrade in protocol.
   
   If that's correct, I'm wondering if there's a way to soften the blow here... would it be viable for a consumer to permit and handle the downgrade in protocol by revoking all previously-held partitions and then claiming only the ones provided in the new assignment? Given the choice between a revert to eager rebalancing and having most of the group die, the former seems like it might be more user-friendly.
   
   ~I'm optimistic that if we do that in `onJoinComplete` we can avoid having multiple consumers process the same topic partition(s) at once, but haven't been able to confirm this just yet.~ Actually, thinking this through more--there's still a risk of duplicate partition processing if partitions are reassigned from a cooperative consumer to an eager one, as the cooperative consumer may not revoke those partitions before the eager consumer begins processing them.
   
   I've also toyed with the idea that the leader could detect the downgrade in protocol and respond by sending out an empty assignment to all members until everyone that joins the group sends an empty assignment in their metadata. Then, the leader could perform a normal assignment and everyone could resume processing data using the eager protocol. Unfortunately, this is pretty brittle; if an eager member becomes the leader before everyone's finished revoking their partitions, it might claim some partitions for itself that haven't been revoked yet.
   
   I'm also curious--would throwing an exception from `onJoinComplete` be enough to prevent users from re-invoking `poll` a second time with different behavior? Or would we need to cache the failed state here and do something with it on re-invocations of `poll`?
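
One way to picture the "cache the failed state" option raised in the last question is the sketch below. It is purely illustrative: `StickyFailureSketch` and its `poll(Runnable)` signature are hypothetical and do not reflect how the consumer actually behaves.

```java
// Hypothetical sketch: remember a fatal error and rethrow it on every later poll.
final class StickyFailureSketch {

    // Null until a fatal error has been recorded.
    private IllegalStateException fatalError;

    /** Runs one join step unless a previous call already failed fatally. */
    void poll(Runnable joinStep) {
        // Once failed, never run the join step again; surface the same error.
        if (fatalError != null)
            throw fatalError;
        try {
            joinStep.run();
        } catch (IllegalStateException e) {
            fatalError = e;
            throw e;
        }
    }
}
```

With this pattern, a caller that swallows the first exception and calls `poll` again sees the same failure instead of different behavior on the second invocation.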





[GitHub] [kafka] guozhangwang commented on a change in pull request #10345: KAFKA-12477: dynamically upgrade rebalancing protocol based on selected assignor

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10345:
URL: https://github.com/apache/kafka/pull/10345#discussion_r598954254



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -198,6 +198,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
             Collections.sort(supportedProtocols);
 
             protocol = supportedProtocols.get(supportedProtocols.size() - 1);
+            log.debug("Using rebalance protocol {}", protocol);

Review comment:
       nit: how about "Initializing rebalance protocol to {}"?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -356,6 +357,23 @@ protected void onJoinComplete(int generation,
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
+        log.debug("{} assignor selected by the group coordinator", assignor);
+
+        List<RebalanceProtocol> assignorSupportedProtocols = assignor.supportedProtocols();
+        Collections.sort(assignorSupportedProtocols);
+        RebalanceProtocol newProtocol = assignorSupportedProtocols.get(assignorSupportedProtocols.size() - 1);
+        if (newProtocol.id() < protocol.id()) {
+            log.error("Latest commonly supported rebalance protocol is {} which is lower than the current commonly supported rebalance protocol {}. "

Review comment:
       I think a more common error scenario is a new member joining the group with ONLY the old assignor in its configs, while all the existing members in the current generation have both the old and new assignors. In this case the old assignor would have to be selected.
   
   So in the error message, we should also clarify this case, e.g. by saying something like "once a new protocol is selected, it cannot be downgraded anymore, even though everyone still supports the old protocol and even if a new member with only the old protocol joins the group".



