You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Manuel Garcia Cabrera (Jira)" <ji...@apache.org> on 2022/05/11 17:03:00 UTC

[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

    [ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17535020#comment-17535020 ] 

Manuel Garcia Cabrera commented on KAFKA-12495:
-----------------------------------------------

I see two different issues when doing deployments with 2.6.2. In my deployments I add 5 Workers at once and then remove 5 other Workers at once (starting with 11, so it goes from 11 to 16 and then back to 11). I say at once but they don't really get started and removed at once, there maybe could be a 30 seconds different between the first one being ready and the last one being ready. Problems I see are:
 # What's mentioned in this ticket when Workers are being added. Basically, there are more workers in the second round than they were in the first round, which leads to unbalanced assignments.
 # When Workers are going away, we sometimes end up with one Worker with ALL the assignments that were in the 5 Workers that went away. The Worker that gets all this assignments is the last one that got started. When this happens, I see that in one generation that Worker had no assignments, and in the next one it has but it also shows the delay that comes from waiting on the Workers that left ({{{}scheduled.rebalance.max.delay.ms{}}}). After that delay expires, all assignments from the 5 Workers that went away go into that one Worker. My theory here by looking at the code is that it may have become the only one in {{candidateWorkersForReassignment}} at the time were it had no assignments, and then remained that way even when we started waiting on the Workers that went away even though this Worker had assignments by now. Either way, I don't really get this `candidateWorkersForReassignment`, because such workers would get all the assignments of the ones that went away, right? What if more Workers went away than the ones that don't have assignments?

As a note, I also looked at what happens during my deployments when everything ends up being balanced, and I don't think it's working as it should even if the end result is balanced. I noticed that doing this same deploying of adding 5 and then removing 5, it never waited on the 5 that left as it should according to the configured {{{}scheduled.rebalance.max.delay.ms{}}}. Looking at the logs, it's always shown as 0 in this case (as opposed to the previous case where it actually shows up as expected). I'm thinking that there are race conditions here as well.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Luke Chen
>            Priority: Critical
>         Attachments: image-2021-03-18-15-04-57-854.png, image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, we have a bad assumption in the algorithm implementation, which is: after revoking rebalance completed, the member(worker) count will be the same as the previous round of reblance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 added after 1st rebalance completed and before 2nd rebalance started? Let's see what will happened? Let's see this example: (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, but we didn't revoke any more C/T in this round, which cause unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W2(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we didn't allow to do consecutive revoke in two consecutive rebalances (under the same leader), we will have this uneven distribution under this situation. We should allow consecutive rebalance to have another round of revocation to revoke the C/T to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, **and also revoke some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, BT4]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader 
> W1 computes and sends assignments:
> // (final) We assigned all the previous revoked Connectors/Tasks to the members
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: []) 
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) 
> W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) 
> W2(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
> {code}
> Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round.
>  
> Note2: this issue makes KAFKA-12283 test flaky.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)