You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/26 10:11:37 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request, #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

vamossagar12 opened a new pull request, #12561:
URL: https://github.com/apache/kafka/pull/12561

   Currently, the Incremental rebalance protocol does not allow a subsequent revoking rebalance when a worker joins right after one. This can lead to imbalance in assignments. See [KAFKA-12495](https://issues.apache.org/jira/browse/KAFKA-12495) for more details.
   
   This PR aims to fix the above. Note that there already exists another PR: https://github.com/apache/kafka/pull/10367 to fix this. The main difference between the 2 approaches is that this one introduces an exponential backoff delay between 2 successive revoking rebalances. This is to dis-allow rebalance storms and still not wait for entire scheduled rebalance delay.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r957470478


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,11 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", delay == 0, delay);

Review Comment:
   Makes sense. Would make the change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r959526920


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,11 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", delay == 0, delay);

Review Comment:
   This is done.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -584,9 +606,18 @@ private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks
         Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
                 .filter(wl -> wl.size() > 0)
                 .collect(Collectors.toList());
-        int existingWorkersNum = existingWorkers.size();
+
+        // existingWorkers => workers with load > 0.
+        // if existingWorkers is empty, no workers have any load.
+        int existingWorkersNum = existingWorkers.size(); // count of workers having non-zero load.
         int totalWorkersNum = completeWorkerAssignment.size();
-        int newWorkersNum = totalWorkersNum - existingWorkersNum;
+        int newWorkersNum = totalWorkersNum - existingWorkersNum; // <total workers - workers with non-0 load>
+        // if existingWorkersNum == 0, no workers have any load i.e all workers are new.
+        // newWorkersNum = 0 when all workers have non-zero load.

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r967018912


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   I'm good to revert back to `activeAssignments` and discuss in other PR or JIRA. After all, it's not relevant to this issue. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964643605


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   Yes, make sense to me, to not reseting numSuccessiveRevokingRebalances here. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r975671233


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   👍 Thanks. I wish we could do this without adding an extra boolean flag but preventing regressions seems worth the additional complexity, especially since there are other issues with the allocation algorithm that need to be worked out and, when fixed, will hopefully render the flag unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1305074723

   @showuon Whenever you get the chance, can you also plz review [this](https://github.com/vamossagar12/kafka/pull/1) PR  of Chris. We should be able to get this one through as well. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r1022364377


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java:
##########
@@ -270,7 +270,6 @@ public void testRemovingWorker() throws Exception {
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
-    @Ignore // TODO: To be re-enabled once we can make it less flaky (KAFKA-12495, KAFKA-12283)

Review Comment:
   @vamossagar12 , in L191, there's also a disabled test, I think after this patch, they should be reliable.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java:
##########
@@ -270,7 +270,6 @@ public void testRemovingWorker() throws Exception {
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
-    @Ignore // TODO: To be re-enabled once we can make it less flaky (KAFKA-12495, KAFKA-12283)

Review Comment:
   @vamossagar12 , in L191, there's also a disabled test, I think after this patch, they should be reliable. Please enable it, too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12561:
URL: https://github.com/apache/kafka/pull/12561


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r994977689


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   > Isn't this a regression? We shouldn't be revoking tasks in this round since, without those revocations, we'd have a balanced assignment.
   
   Hello.. I have been playing around with the revokedInPrevious and the test cases. I looked at this case which is a regression which I agree. However, based on the changes made in this PR, what I think is that post the revoking rebalance, there would be a follow up rebalance which would eventually lead to a balanced load. If you look at `IncrementalCooperativeAssignorTest#testTaskAssignmentWhenWorkerBounces` this behaviour  is exhibited. On similar lines, I made some edits to `WorkerCoordinatorIncrementalTest#testTaskAssignmentWhenWorkerBounces` method by causing a follow up rebalance after the revoking rebalance after worker 3 comes back. The assignments with which I am calling the final `onLeaderElected` may not be accurate (as I couldn't get the full assignments using the deserialzeAssignment) but I just chose something which is representative.
   
   I know this is a deviation from what the KIP proposes i.e a bounced member gets its assignments back without any revocations but if what I am stating above sounds ok, do you think it's really bad- coz it seems to be getting to a  balanced load after a a follow up rebalance. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1245436111

   @vamossagar12 there are failing unit tests, can you check those out and ping us after you've taken a look and fixed any issues?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964493272


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   OK, I got your point. The result will be something like this, right?
   1. rebalance: [revokedInPrevious = false, numSuccessive = 0] -> revoke
   2. rebalance: [revokedInPrevious = true, numSuccessive = 1] -> delay n 
   3. after n (clear revokedInPrevious)
   4. rebalance: [revokedInPrevious = false, numSuccessive = 1] -> revoke
   5. rebalance: [revokedInPrevious = true, numSuccessive = 1] -> reset numSuccessive or delay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r982661269


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   hey Chris.. Sorry to bug you again on this one. Since we anyways won't be merging this PR w/o the second one, would it make sense to incorporate the changes to this one? I believe the changes are already there in the other PR of yours?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r995680008


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Thanks @C0urante . I have reverted the test with the follow up rebalance I was basing my thoughts on the fact that KIP-415 allows having more rebalances so adding another one should be ok. Having said that, I am fine with whatever you stated as well.
   
   Regarding a separate PR, yes that makes sense. Thanks for your help on this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r975672702


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Are there any cases where the introduction of the `canRevoke` flag will defeat the purpose of this PR, by causing an imbalanced assignment to be sent out with no guaranteed follow-up rebalances (such as those caused by a scheduled rebalance delay, or workers rejoining the cluster after receiving an assignment with revocations)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1230459432

   > Have a look at the non-test code, left some comments. And I agree with Chris that we should not introduce exponential backoff for 1st revocation, and should reset it ASAP we don't need it. Thank you for working on this.
   
   Thanks @showuon . I removed the change to revoke the first time around. The delay is now introduced only when there are successive revoking rebalances.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315612700

   @C0urante , i did that but looks like my force push cleaned up the PR merge since I didn't pull the latest :( I see you have deleted the branch. Would it be possible to create another PR for this? Sorry about this :( 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. Sure let me revoke the `canRevoke` flag.  Also, Chris, I looked at your PR and it's closed but not merged. I see some changes specially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and remember discussing this on the ticket for this) ? Also, let me know if there's anything I can also contribute to for the other PR (Looking at your comment it appeared to me that you would be rebasing the commits from that closed PR and hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1316320428

   I think 3.4 release is also around the corner(few weeks maybe?). Would it better to have it in 3.4?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1300739363

   @showuon The follow-up PR [here](https://github.com/vamossagar12/kafka/pull/1) is ready for review; this should be the last step necessary before we can merge this PR and fix this long-standing rebalancing bug.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r956681210


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -65,26 +67,32 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private final int maxDelay;
     private ConnectorsAndTasks previousAssignment;
     private final ConnectorsAndTasks previousRevocation;
-    private boolean canRevoke;
     // visible for testing
+    boolean revokedInPrevious;
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
     protected int previousGenerationId;
     protected Set<String> previousMembers;
 
+    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
+
+    private int numSuccessiveRevokingRebalances;
+
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
         this.time = time;
         this.maxDelay = maxDelay;
         this.previousAssignment = ConnectorsAndTasks.EMPTY;
         this.previousRevocation = new ConnectorsAndTasks.Builder().build();
-        this.canRevoke = true;
         this.scheduledRebalance = 0;
+        this.revokedInPrevious = false;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
         this.previousMembers = Collections.emptySet();
+        this.numSuccessiveRevokingRebalances = 0;
+        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(10, 20, maxDelay, 0.2);

Review Comment:
   @C0urante needed your suggestion on these configs. Should we expose the initial interval parameter so that users can set it? Also, the exp base(second parameter) I believe I have set it to a slightly high value which goes upto 4 seconds in the second attempt and about a minute in the third attempt and so on. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964939766


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Switching from `configured` to `activeAssignments` helps in some scenarios, but not others. I found a few cases to demonstrate this while reviewing https://github.com/apache/kafka/pull/10367, documented [here](https://github.com/apache/kafka/pull/10367#discussion_r810393411).
   
   I wonder if we can leave this change out for now and then address the remaining issues with incremental rebalancing in a follow-up PR? There are a lot of edge cases to consider and I don't want to block the primary change here of allowing consecutive revocations (which is super useful on its own!) by tying it to thinking through all the implications for a change that can be made independently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964447367


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We should not perform any revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(1);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Follow up rebalance post revocations. No revocations should have happened
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);

Review Comment:
   I think we should remove it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r965066363


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   @C0urante  When you say `I wonder if we can leave this change out for now` do you mean retaining this or switching back to using `activeAssignments` instead of `configured`? I will also take a look at the scenario you described about where this might fail. If we want to retain this, I can take a look at them in a separate PR or this one, whatever is decided over here or if we want to switch back to `activeAssignments`, then that would need more work probably in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1238247255

   > The changes look good. Left some comments. Also, could you re-enable the test in `RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining`? This fix should resolve the flaky test.
   
   Thanks @showuon . I enabled it and it passed locally. Have pushed the change along with other review comments(and a couple of clarifying questions).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1228315761

   @C0urante , @showuon , @yashmayya I have created this PR as per the discussions on the ticket. Plz review whenever you get the chance. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962289380


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   ``` 
   But I'm just wondering if there's a reason why we were doing revocations only based on existing member assignments earlier.
   ```
   Regarding this I dont know tbh, but my guess is since earlier we were not doing successive revocations, it was not needed. @C0urante can certainly throw some more light on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962289115


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   Thanks @C0urante , @yashmayya I made the change as suggested. I had made missed the fact that since the workers would join immediately post a load-balancing rebalance, introducing delay for that round is needless. Plz review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r965099882


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   I mean switching back to using `activeAssignments`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r1002789963


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Hi @vamossagar12--sorry for the delay. I've filed the follow-up PR here: https://github.com/vamossagar12/kafka/pull/1
   
   I've marked it as a draft since it should not be merged into your KAFKA-12495 branch until it's approved to be merged into trunk here, but it's ready for review now. Can you take a look and see what you think? If it looks alright, we can ask Luke to take a look.
   
   (cc @showuon)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r961591300


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   @C0urante , the log line is definitely incorrect and I would change it. Thanks for pointing it out. I had a question on what you proposed as the better approach though.
   With the approach I tried to use here, I am allowing `this` revoking rebalance to happen and set a delay so that if another worker joins within that delay, it would need to wait for that much of time before we can perform any revocations. This way the approach is slightly optimistic in the sense atleast the first time around we would have a balanced allocation but further allocation-balancing revocations won't allow a barrage of rebalances in succession.
   
   With what you proposed, it seems to me that even the first such worker joining would be made to wait so that in my mind seems slightly pessimistic. IMO since we are already setting the delay for future consecutive allocation-balancing revocations, it might be better to allow the first one go through. WDYT? Have I understood the context correctly or is there anything that I am missing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963789374


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   I did think about it.. Problem is if we set the counter to 0 here every time then the exponential backoff would only go upto initial interval after every successive revoking rebalance. The other reason I didn't reset this is that since max interval is maxDelay, this value can be incremented w/o introducing delays > maxDelay. One thing to consider could be can this value overflow if we let it increase unbounded. Let me know what you think about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315882758

   The single test failure is unrelated. Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315690896

   Thanks @C0urante . i have added back the merge commit and pushed here along with fixing the checkstyle. I ran the tests locally and a few MM related ones failed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1399432804

   hwy @C0urante , I was thinking should the exponential backoff thing that we have introduced as part of this PR should go somewhere in the docs? I am saying this since this is a deviation from how things worked prior to this. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. I have revoked the `canRevoke` flag commit.  Also, Chris, I looked at your PR and it's closed but not merged. I see some changes specially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and remember discussing this on the ticket for this). Also, let me know if there's anything I can also contribute to for the other PR (Looking at your comment it appeared to me that you would be rebasing the commits from that closed PR and hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r977936338


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Yep, I was able to produce a test case that causes an imbalanced assignment to be sent out with no revocations or delay by adding a new worker in the middle of a delayed rebalance:
   
   ```java
   @Test
   public void testWorkerJoiningDuringDelayedRebalance() {
       time = new MockTime();
       initAssignor();
   
       addNewConnector("connector3", 4);
       // First assignment with 1 worker and 3 connectors configured but not yet assigned
       performStandardRebalance();
       assertDelay(0);
       assertWorkers("worker1");
       assertConnectorAllocations(3);
       assertTaskAllocations(12);
       assertBalancedAndCompleteAllocation();
   
       // Second assignment with a second worker joining and all connectors running on previous worker
       // We should revoke.
       addNewEmptyWorkers("worker2");
       performStandardRebalance();
       assertWorkers("worker1", "worker2");
       assertConnectorAllocations(0, 2);
       assertTaskAllocations(0, 6);
   
       // Third assignment immediately after revocations, and a third worker joining.
       // This is a successive revoking rebalance. We should not perform any revocations
       // in this round
       addNewEmptyWorkers("worker3");
       performStandardRebalance();
       assertTrue(assignor.delay > 0);
       assertWorkers("worker1", "worker2", "worker3");
       assertConnectorAllocations(0, 1, 2);
       assertTaskAllocations(3, 3, 6);
   
       // Fourth assignment and a fourth worker joining
       // while delayed rebalance is active. We should not revoke
       time.sleep(assignor.delay / 2);
       addNewEmptyWorkers("worker4");
       performStandardRebalance();
       assertWorkers("worker1", "worker2", "worker3", "worker4");
       assertConnectorAllocations(0, 0, 1, 2);
       assertTaskAllocations(0, 3, 3, 6);
   
       // Fifth assignment and a fifth worker joining
       // after the delay has expired. We should revoke, but we don't
       time.sleep(assignor.delay);
       addNewEmptyWorkers("worker5");
       performStandardRebalance();
       assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
       assertNoRevocations();
       assertDelay(0);
       assertBalancedAndCompleteAllocation();
   }
   ```
   
   It's also worth noting that many of the test cases in this PR need to be updated with this bit at the beginning in order to mock the `Time` instance used by the assignor:
   ```java
   time = new MockTime();
   initAssignor();
   ```
   This should be added in any test that invokes `time::sleep`.
   
   At this point, I think we may want to split this into two separate PRs that get merged together. We can revert the `canRevoke` flag from this one, and then add a downstream PR that fixes how we calculate task-balancing revocations in tricky situations like when lost or newly-created tasks have just been assigned. That should fully address cases like the one tested [here](https://github.com/apache/kafka/blob/cda5da9b65f78b51cdfe5371f712a0d392dbdb4d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java#L427).
   
   @showuon WDYT? I could extract some of the changes from https://github.com/apache/kafka/pull/12019 that would fix the load-balancing revocation logic and rebase them onto this change if it helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1228640137

   In this PR, similar to Luke's PR, I have added the condition to do a revoking rebalance consecutively. The only difference is that whether revocation would happen or not is decided by the time vis-a-vis the next scheduled revoking rebalance as per exponential backoff. I could notice the flip-side of that as I needed to add a delay in some of the tests to get a revoking rebalance if the workers joined before the next scheduled rebalance. 
   
   I think what you are saying makes sense. Let me give it a try. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r959706989


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);

Review Comment:
   We allow values up to `Integer.MAX_VALUE` for the `scheduled.rebalance.max.delay.ms` property. If we allow for jitter with our exponential backoff, this cast to `int` may lead to overflow when `scheduled.rebalance.max.delay.ms` has very large values.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);
+                scheduledRebalance = time.milliseconds() + delay;
+            } else if (!toExplicitlyRevoke.isEmpty()) {
+                // We had a revocation in this round but not in the previous round. Let's store that state.
+                log.debug("Revoking rebalance. Setting the revokedInPrevious flag to true");
+                revokedInPrevious = true;
+            } else if (revokedInPrevious) {
+                // No revocations in this round but the previous round had one. Probably the workers
+                // have converged to a balanced load. We can reset the rebalance clock
+                log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " +
+                        "balanced load. Resetting the exponential backoff clock");
+                numSuccessiveRevokingRebalances = 0;
+                revokedInPrevious = false;
+            } else {
+                // revokedInPrevious is false and no revocations needed in this round. no-op.
+                log.debug("No revocations in previous and current round.");
+            }
         } else {
-            canRevoke = delay == 0;
+            log.debug("Connector and task to revoke assignments: {}", toRevoke);

Review Comment:
   Why only log this in the `else` branch? Isn't it applicable in both cases?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);
+                scheduledRebalance = time.milliseconds() + delay;
+            } else if (!toExplicitlyRevoke.isEmpty()) {
+                // We had a revocation in this round but not in the previous round. Let's store that state.
+                log.debug("Revoking rebalance. Setting the revokedInPrevious flag to true");

Review Comment:
   The name of the flag is probably not very useful to users who aren't familiar with Java, and there's no guarantee that this log line will remain in sync with changes to the field name or type. This could be clearer with something like:
   ```suggestion
                   log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   This isn't actually true, is it? We'll need to wait `delay` ms to perform the _next_ allocation-balancing revocation (if another one is necessary), but we're performing the revocation for the current round immediately.
   
   This also highlights a larger issue with the approach here: we should only issue an assignment with a delay when that delay is the time until the re-allocation of lost assignments and/or the next round where we will permit allocation-balancing revocations. With this approach, we'll perform allocation-balancing revocations immediately, but also issue an assignment with a delay, which doesn't really serve much purpose since workers automatically rejoin the group (triggering a rebalance) immediately after every round that included a revocation (see [here](https://github.com/apache/kafka/blob/6f4778301b1fcac1e2750cc697043d674eaa230d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1567-L1572)).
   
   I think a better approach would be to recompute the potential backoff delay between consecutive allocation-balancing revocations if `delay == 0`, and if it is non-zero, then skip those revocations during the current round.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +119,93 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkerJoinsAfterRevocations() throws InterruptedException {
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        addNewConnector("connector3", 4);
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+        // Flag should not be set
+        assertFalse(assignor.revokedInPrevious);

Review Comment:
   Why are we making assertions against this internal detail? Shouldn't the logic introduced in this PR be visible already by making assertions on connector/task allocations and the delay in each assignment?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -65,26 +67,32 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private final int maxDelay;
     private ConnectorsAndTasks previousAssignment;
     private final ConnectorsAndTasks previousRevocation;
-    private boolean canRevoke;
     // visible for testing
+    boolean revokedInPrevious;
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
     protected int previousGenerationId;
     protected Set<String> previousMembers;
 
+    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
+
+    private int numSuccessiveRevokingRebalances;
+
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
         this.time = time;
         this.maxDelay = maxDelay;
         this.previousAssignment = ConnectorsAndTasks.EMPTY;
         this.previousRevocation = new ConnectorsAndTasks.Builder().build();
-        this.canRevoke = true;
         this.scheduledRebalance = 0;
+        this.revokedInPrevious = false;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
         this.previousMembers = Collections.emptySet();
+        this.numSuccessiveRevokingRebalances = 0;
+        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(10, 20, maxDelay, 0.2);

Review Comment:
   > Should we expose the initial interval parameter so that users can set it?
   
   I don't think so; that would require a KIP to introduce a new user-facing configuration property.
   
   > Also, the exp base(second parameter) I believe I have set it to a slightly high value which goes upto 4 seconds in the second attempt and about a minute in the third attempt and so on.
   
   I think this is alright 👍
   I don't see why we need jitter, though, and I'm hesitant to add it since it can lead to rebalance delays that exceed the `scheduled.rebalance.max.delay.ms` parameter. Is there a reason you opted for jitter here?
   
   Also, it's worth noting that the behavior here is to always use a backoff of 10 ms if the `scheduled.rebalance.max.delay.ms` property is set to 10 or less. I'm not sure this is optimal; people may set the property to zero if they want to disable delayed rebalances altogether, and with this change, we'd be reintroducing delays with no way to disable them. Maybe we should:
   - Set the initial interval to zero
   - Check if the result of invoking `backoff` is zero, and if it is, skip the delay altogether and perform the revocation immediately



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962288729


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Yes that's exactly the point. I got this logic from the original PR from @showuon  here => https://github.com/apache/kafka/pull/10367 . To quote =>
   
   ```
   we also passed the configured assignment into performTaskRevocation instead of activeAssignment, so that we can compute the correct expected max assignment number (totalSize / workerSize), instead of the activeTotalSize / workerSize, because the activeTotalSize doesn't include the newAssignments, which will cause the wrong computation, and cause uneven rebalance, or need more round of revoking rebalance.
   ```
   
   I also noticed that if I don't pass configured, then post allowed + successive revoking rebalance, the assignments were  not balanced. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r961849928


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -65,26 +67,32 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private final int maxDelay;
     private ConnectorsAndTasks previousAssignment;
     private final ConnectorsAndTasks previousRevocation;
-    private boolean canRevoke;
     // visible for testing
+    boolean revokedInPrevious;
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
     protected int previousGenerationId;
     protected Set<String> previousMembers;
 
+    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
+
+    private int numSuccessiveRevokingRebalances;
+
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
         this.time = time;
         this.maxDelay = maxDelay;
         this.previousAssignment = ConnectorsAndTasks.EMPTY;
         this.previousRevocation = new ConnectorsAndTasks.Builder().build();
-        this.canRevoke = true;
         this.scheduledRebalance = 0;
+        this.revokedInPrevious = false;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
         this.previousMembers = Collections.emptySet();
+        this.numSuccessiveRevokingRebalances = 0;
+        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(10, 20, maxDelay, 0.2);

Review Comment:
   Oops, turns out we can't set the initial interval to `0` with the `ExponentialBackoff` class as that causes the backoff to be zero for every attempt. Hmm... this may involve a little bit of special handling on our end to accomplish.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1316289378

   @showuon I was thinking about it--IMO the additional complexity here is a little too risky to backport but I'm almost on the fence here. If you'd like to see it on 3.3 I wouldn't be opposed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1250045922

   > Details
   
   I checked in the changes by fixing the 2 unit tests that were failing. This run, there didn't seem to be connect specific failures. My bad on the oversight the last time around.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r974209237


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Isn't this a regression? We shouldn't be revoking tasks in this round since, without those revocations, we'd have a balanced assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978567165


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Wow, nice catch!
   
   > At this point, I think we may want to split this into two separate PRs that get merged together. We can revert the canRevoke flag from this one, and then add a downstream PR that fixes how we calculate task-balancing revocations in tricky situations like when lost or newly-created tasks have just been assigned. That should fully address cases like the one tested [here](https://github.com/apache/kafka/blob/cda5da9b65f78b51cdfe5371f712a0d392dbdb4d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java#L427).
   
   Agree! Let's revert the canRevoke flag and then deal with tricky cases in a follow-up PR. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r994980800


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Hey @vamossagar12--I'd rather err on the side of not increasing the number of rebalances required for simple cases like the one covered in `WorkerCoordinatorIncrementalTest::testTaskAssignmentWhenWorkerBounces`.
   
   I plan to file the downstream PR sometime next week. For ease of review I'd prefer to keep it separate from this one, which has grown fairly large and complex on its own.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1242211508

   @C0urante I made the switch back as suggested. Plz review whenever you get the chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1245454894

   > @vamossagar12 there are failing unit tests, can you check those out and ping us after you've taken a look and fixed any issues?
   
   Oops.. Sorry about that. I didn't check the test results after the latest changes. Will check/fix those and call for review post that. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r1022991333


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java:
##########
@@ -270,7 +270,6 @@ public void testRemovingWorker() throws Exception {
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
-    @Ignore // TODO: To be re-enabled once we can make it less flaky (KAFKA-12495, KAFKA-12283)

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315647630

   @vamossagar12 It looks like the commit is still present in GitHub: https://github.com/vamossagar12/kafka/tree/422c81fc0a2dd19fee0ed1eb033ef35fc9e27ba1
   
   Can you fetch that commit locally, then add a commit on top of it that removes the `@Ignore` annotation (and the import for it), and push that to this branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1316140226

   One question to @C0urante , do you think we should backport to v3.3 branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r974548386


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Yeah that's true. Revoking tasks at this point leads to imbalance. This was happening since as per the new changes, the moment the delay expires we were allowing revocation. So, at this point, the flag is true, the delay goes to 0 and since revocation is technically possible at this point, the code was doing it. I have added the `canRevoke` flag back to handle this case with which this testcase another test `IncrementalCooperativeAssignorTest#testTaskAssignmentWhenWorkerBounces` which seemed to have a similar issue seem to be fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962288275


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);

Review Comment:
   Removed.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);
+                scheduledRebalance = time.milliseconds() + delay;
+            } else if (!toExplicitlyRevoke.isEmpty()) {
+                // We had a revocation in this round but not in the previous round. Let's store that state.
+                log.debug("Revoking rebalance. Setting the revokedInPrevious flag to true");
+                revokedInPrevious = true;
+            } else if (revokedInPrevious) {
+                // No revocations in this round but the previous round had one. Probably the workers
+                // have converged to a balanced load. We can reset the rebalance clock
+                log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " +
+                        "balanced load. Resetting the exponential backoff clock");
+                numSuccessiveRevokingRebalances = 0;
+                revokedInPrevious = false;
+            } else {
+                // revokedInPrevious is false and no revocations needed in this round. no-op.
+                log.debug("No revocations in previous and current round.");
+            }
         } else {
-            canRevoke = delay == 0;
+            log.debug("Connector and task to revoke assignments: {}", toRevoke);

Review Comment:
   Added relevant logs..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962289115


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   Thanks @C0urante , @yashmayya I made the change as suggested. I had made missed the fact that since the workers would join immediately post a load-balancing rebalance, introducing delay for that round is needless. I needed to tweak some logic at a few places so a review would certainly be helpful!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963205224


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -286,30 +294,43 @@ ClusterAssignment performTaskAssignment(
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
-
-            toExplicitlyRevoke.forEach(
-                (worker, assignment) -> {
-                    ConnectorsAndTasks existing = toRevoke.computeIfAbsent(
-                        worker,
-                        v -> new ConnectorsAndTasks.Builder().build());
-                    existing.connectors().addAll(assignment.connectors());
-                    existing.tasks().addAll(assignment.tasks());
+                    performTaskRevocation(configured, completeWorkerAssignment);
+
+            // If this round and the previous round involved revocation, we will calculate a delay for
+            // the next round when revoking rebalance would be allowed. Note that delay could be 0, in which
+            // case we would always revoke.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                log.debug("Consecutive revoking rebalances observed. Computing delay and next scheduled rebalance.");
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                if (delay != 0) {
+                    scheduledRebalance = time.milliseconds() + delay;
+                    log.debug("Skipping revocations in the current round with a delay of {}ms. Next scheduled rebalance:{}",
+                            delay, scheduledRebalance);
+                } else {
+                    log.debug("Revoking assignments as scheduled.rebalance.max.delay.ms is set to 0");

Review Comment:
   maybe: `Revoking assignments immediately since scheduled.rebalance.max.delay.ms is set to 0`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   @yashmayya , yes, your analysis is correct, I was trying to take the new added/deleted connectors/tasks into account for the revocation calculation, because I was trying to do consecutive revocation without delay.
   
   > But I'm just wondering if there's a reason why we were doing revocations only based on existing member assignments earlier.
   
   I guess that's because we didn't consider too complicated scenarios. Anyway, I think this change makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. Sure let me revoke the `canRevoke` flag but that would mean `testTaskAssignmentWhenWorkerBounces` would fail on this PR. Also, Chris, I looked at your PR and it's closed but not merged. I see some radical changes specially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and remember discussing this on the ticket for this) ? Also, let me know if there's anything I can also contribute to for the other PR (Looking at your comment it appeared to me that you would be rebasing the commits from that closed PR and hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r977936338


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Yep, I was able to produce a test case that causes an imbalanced assignment to be sent out with no revocations or delay by adding a new worker in the middle of a delayed rebalance:
   
   ```java
   @Test
   public void testWorkerJoiningDuringDelayedRebalance() {
       time = new MockTime();
       initAssignor();
   
       addNewConnector("connector3", 4);
       // First assignment with 1 worker and 3 connectors configured but not yet assigned
       performStandardRebalance();
       assertDelay(0);
       assertWorkers("worker1");
       assertConnectorAllocations(3);
       assertTaskAllocations(12);
       assertBalancedAndCompleteAllocation();
   
       // Second assignment with a second worker joining and all connectors running on previous worker
       // We should revoke.
       addNewEmptyWorkers("worker2");
       performStandardRebalance();
       assertWorkers("worker1", "worker2");
       assertConnectorAllocations(0, 2);
       assertTaskAllocations(0, 6);
   
       // Third assignment immediately after revocations, and a third worker joining.
       // This is a successive revoking rebalance. We should not perform any revocations
       // in this round
       addNewEmptyWorkers("worker3");
       performStandardRebalance();
       assertTrue(assignor.delay > 0);
       assertWorkers("worker1", "worker2", "worker3");
       assertConnectorAllocations(0, 1, 2);
       assertTaskAllocations(3, 3, 6);
   
       // Fourth assignment and a fourth worker joining
       // while delayed rebalance is active. We should not revoke
       time.sleep(assignor.delay / 2);
       addNewEmptyWorkers("worker4");
       performStandardRebalance();
       assertWorkers("worker1", "worker2", "worker3", "worker4");
       assertConnectorAllocations(0, 0, 1, 2);
       assertTaskAllocations(0, 3, 3, 6);
   
       // Fifth assignment and a fifth worker joining
       // after the delay has expired. We should revoke
       time.sleep(assignor.delay);
       addNewEmptyWorkers("worker5");
       performStandardRebalance();
       assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
       assertNoRevocations();
       assertDelay(0);
       assertBalancedAndCompleteAllocation();
   }
   ```
   
   It's also worth noting that many of the test cases in this PR need to be updated with this bit at the beginning in order to mock the `Time` instance used by the assignor:
   ```java
   time = new MockTime();
   initAssignor();
   ```
   This should be added in any test that invokes `time::sleep`.
   
   At this point, I think we may want to split this into two separate PRs that get merged together. We can revert the `canRevoke` flag from this one, and then add a downstream PR that fixes how we calculate task-balancing revocations in tricky situations like when lost or newly-created tasks have just been assigned. That should fully address cases like the one tested [here](https://github.com/apache/kafka/blob/cda5da9b65f78b51cdfe5371f712a0d392dbdb4d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java#L427).
   
   @showuon WDYT? I could extract some of the changes from https://github.com/apache/kafka/pull/12019 that would fix the load-balancing revocation logic and rebase them onto this change if it helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964585809


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   Yeah so point #5, with the approach I have would set numSuccessive = 2 and calculate backoff delay based on that. If I reset numSuccessive to 0 in #3 from above, then at 5, it would be back to `rebalance: [revokedInPrevious = true, numSuccessive = 1] ` and delay would be equal to the delay we had at #2 above. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r961591300


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   @C0urante , the log line is definitely incorrect. Thanks for pointing it out. I had a question on what you proposed as the better approach. 
   With the approach I tried to use here, I am allowing `this` revoking rebalance to happen and set a delay so that if another worker joins within that delay, it would need to wait for that much of time before we can perform any revocations. This way the approach is slightly optimistic in the sense atleast the first time around we would have a balanced allocation but further allocation-balancing revocations won't allow a barrage of rebalances in succession.
   
   With what you proposed, it seems to me that even the first such worker joining would be made to wait so that in my mind seems slightly pessimistic. IMO since we are already setting the delay for future consecutive allocation-balancing revocations, it might be better to allow the first one go through. WDYT? Have I understood the context correctly or is there anything that I am missing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963289740


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -843,12 +1002,6 @@ public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
         performStandardRebalance();
         assertDelay(0);
         assertConnectorAllocations(1, 1);
-        assertTaskAllocations(2, 4);
-
-        // fourth rebalance after revocations

Review Comment:
   The comment in L1008 should change to 
   // `Fourth` rebalance should not change assignments



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   Should we also reset `numSuccessiveRevokingRebalances = 0;`? Otherwise, after another successive revoking, the exponential backoff will increase immediately, right?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We should not perform any revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(1);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Follow up rebalance post revocations. No revocations should have happened
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);

Review Comment:
   Why is there the follow-up rebalance? We don't have revocation happened in previous round, do we? 



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We should not perform any revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned

Review Comment:
   3 connectors?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -877,15 +1030,9 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted()
         assertDelay(0);
         assertWorkers("worker1", "worker2");
         assertConnectorAllocations(0, 1);
-        assertTaskAllocations(0, 4);
-
-        // Third assignment after revocations
-        performStandardRebalance();
-        assertDelay(0);
-        assertConnectorAllocations(0, 1);
         assertTaskAllocations(0, 2);
 
-        // fourth rebalance after revocations
+        // Third assignment after revocations

Review Comment:
   The comment in L1042 should change to 
   // Fourth rebalance should not change assignments



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned

Review Comment:
   3 connectors?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We should not perform any revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned

Review Comment:
   3 connectors?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315723471

   Thanks @vamossagar12. I've run the tests locally and they've succeeded; will wait to see what happens with CI but I think this should be okay.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1402008138

   @C0urante , thanks for your response. Makes sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1232878652

   Also, I have made the changes to use the existing delay/scheduledRebalance mechanism to delay the revocation. cc @showuon , @C0urante 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r961848839


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   By gating the delay behind `revokedInPrevious`, we're always allowing the first round that would require allocation-balancing revocations to actually include those revocations. The proposal I made should only cause us to introduce the delay (and gate revocations behind that delay) after the second consecutive rebalance that would require allocation-balancing revocations.
   
   I'd also like to reiterate this point:
   > we should only issue an assignment with a delay when that delay is the time until the re-allocation of lost assignments and/or the next round where we will permit allocation-balancing revocations.
   
   I agree that if we feel comfortable permitting revocations in a given round, then we should just go ahead and include those revocations in the assignment--but in that case, we don't need to include a delay in the assignment, especially since workers will all automatically rejoin the cluster after they've revoked the appropriate connectors/tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r978581355


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Thanks Chris/Luke. Sure let me revoke the `canRevoke` flag.  Also, Chris, I looked at your PR and it's closed but not merged. I see some changes specially =>
   ```
   Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
   ```
   which sounds interesting (and remember discussing this on the ticket for this). Also, let me know if there's anything I can also contribute to for the other PR (Looking at your comment it appeared to me that you would be rebasing the commits from that closed PR and hence the question).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963791864


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We should not perform any revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(1);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Follow up rebalance post revocations. No revocations should have happened
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);

Review Comment:
   We didn't. I just added it to show even if there's a rebalance it won't cause any revocation. It's unrealistic so to speak but thought would be useful to test. I can remove it if you think it doesn't make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964585809


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   Yeah so point `#5`, with the approach I have would set numSuccessive = 2 and calculate backoff delay based on that. If I reset numSuccessive to 0 in `#3` from above, then at `#5`, it would be back to `rebalance: [revokedInPrevious = true, numSuccessive = 1] ` and delay would be equal to the delay we had at `#2` above. 
   
   Basically every successive revoking rebalance would have the same delay because numSuccessive would always be 1 as it's reset and incremented only once. Let me know if that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964585809


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   Yeah so point `#5`, with the approach I have would set numSuccessive = 2 and calculate backoff delay based on that. If I reset numSuccessive to 0 in `#3` from above, then at `#5`, it would be back to `rebalance: [revokedInPrevious = true, numSuccessive = 1] ` and delay would be equal to the delay we had at `#2` above. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1316428275

   Yes, I have the same thought as Chris. Besides, this bug already exist for a long long time, it's not that urgent to put it into 3.3. So, let's keep it in 3.4 (trunk branch), which is what we currently did. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1316138924

   Yeah!!! Thank you @vamossagar12 @C0urante ! Nice team work!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315589024

   @vamossagar12 the build is failing with a Checkstyle error; can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r967271849


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Thanks @showuon I reverted it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962096234


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -65,26 +67,32 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private final int maxDelay;
     private ConnectorsAndTasks previousAssignment;
     private final ConnectorsAndTasks previousRevocation;
-    private boolean canRevoke;
     // visible for testing
+    boolean revokedInPrevious;
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
     protected int previousGenerationId;
     protected Set<String> previousMembers;
 
+    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
+
+    private int numSuccessiveRevokingRebalances;
+
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
         this.time = time;
         this.maxDelay = maxDelay;
         this.previousAssignment = ConnectorsAndTasks.EMPTY;
         this.previousRevocation = new ConnectorsAndTasks.Builder().build();
-        this.canRevoke = true;
         this.scheduledRebalance = 0;
+        this.revokedInPrevious = false;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
         this.previousMembers = Collections.emptySet();
+        this.numSuccessiveRevokingRebalances = 0;
+        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(10, 20, maxDelay, 0.2);

Review Comment:
   yeah i recall now that even i had tested it with 0 initial interval and it was always returning 0. Let me try to incorporate this . 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962288239


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -65,26 +67,32 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private final int maxDelay;
     private ConnectorsAndTasks previousAssignment;
     private final ConnectorsAndTasks previousRevocation;
-    private boolean canRevoke;
     // visible for testing
+    boolean revokedInPrevious;
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
     protected int previousGenerationId;
     protected Set<String> previousMembers;
 
+    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
+
+    private int numSuccessiveRevokingRebalances;
+
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
         this.time = time;
         this.maxDelay = maxDelay;
         this.previousAssignment = ConnectorsAndTasks.EMPTY;
         this.previousRevocation = new ConnectorsAndTasks.Builder().build();
-        this.canRevoke = true;
         this.scheduledRebalance = 0;
+        this.revokedInPrevious = false;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
         this.previousMembers = Collections.emptySet();
+        this.numSuccessiveRevokingRebalances = 0;
+        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(10, 20, maxDelay, 0.2);

Review Comment:
   I added some logic to set delay to 0 when maxDelay is 0 in which case we would always revoke. I bumped up the multiplier from 20-> 30 as well since initial interval is reduced now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962288306


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +119,93 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkerJoinsAfterRevocations() throws InterruptedException {
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        addNewConnector("connector3", 4);
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+        // Flag should not be set
+        assertFalse(assignor.revokedInPrevious);

Review Comment:
   Removed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963252105


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Thanks @vamossagar12 and @showuon, I do agree that this change makes sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963789374


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   I did think about it.. Problem is if we set the counter to 0 here every time then the exponential backoff would only go upto initial interval after every successive revoking rebalance. The other reason I didn't reset this is that since max interval is maxDelay, this value can be incremented w/o introducing delays > maxDelay. One thing to consider could be can this value overflow if we let it increase. Let me know what you think about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964939766


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Switching from `configured` to `activeAssignments` helps in some scenarios, but not others. I found a few cases to demonstrate this while reviewing https://github.com/apache/kafka/pull/10367, documented [here](https://github.com/apache/kafka/pull/10367#discussion_r810393411).
   
   I wonder if we can leave this change out for now and then address the remaining issues with incremental rebalancing in a follow-up PR? There are a lot of edge cases to consider and I don't want to block the primary change here of allowing consecutive revocations (which is super useful on its own!) by tying it to thinking through all the implications for a change that can be made independently.
   
   @showuon what are your thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964690356


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We should not perform any revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker joining.
+        // This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(1);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Follow up rebalance post revocations. No revocations should have happened
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);

Review Comment:
   Thank you. Removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1400491466

   @vamossagar12 I don't think it's necessary to call this out anywhere unless it's caused unexpected issues with our users. The intention behind the exponential backoff is to avoid rebalance storms but I'm still not sure when those would ever realistically happen, so until we see otherwise, I'd prefer to leave it as an internal implementation detail.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r975980445


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   TBH, even I didn't want to re-introduce the flag back but seemed the easiest way to get around the regression. I guess, as you said it might be easier to work through other issues on the allocation algorithm to finally have the flag redundant.
   
   Regarding the side-effects of the re-introduction of this flag, I had imagined that adding the flag back would break some of the tests but that didn't happen which may or mayn't be a good thing. I did look at the logic again and compared with the original algorithm it seemed to me that this line:
   
   https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L310
   
   is the line that prevented successive revoking rebalances. The other check here: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L312 
   
   gives us a window to set it to true and force a revocation in the next round which kind of made me believe that it should be a safe check. That said, if there are scenarios where we think we need testing, I would be happy to do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r1004454702


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##########
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
                 Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
+                Collections.emptyList(), 1,

Review Comment:
   Thanks @C0urante ! I have added some comments for some understanding and around naming. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1229396505

   @C0urante , i made the changes based on my understanding of your suggestions. Plz review whenever you get the chance. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1315549977

   @C0urante , you want to take a final pass on this one whenever you get the chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r957470125


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +307,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                long processAfter = consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", processAfter);
+                time.sleep(processAfter); // Is it a good idea to sleep?

Review Comment:
   yeah that's what i thought as well. Using `delay/scheduledRebalance` might be a good idea. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r957468730


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -584,9 +606,18 @@ private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks
         Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
                 .filter(wl -> wl.size() > 0)
                 .collect(Collectors.toList());
-        int existingWorkersNum = existingWorkers.size();
+
+        // existingWorkers => workers with load > 0.
+        // if existingWorkers is empty, no workers have any load.
+        int existingWorkersNum = existingWorkers.size(); // count of workers having non-zero load.
         int totalWorkersNum = completeWorkerAssignment.size();
-        int newWorkersNum = totalWorkersNum - existingWorkersNum;
+        int newWorkersNum = totalWorkersNum - existingWorkersNum; // <total workers - workers with non-0 load>
+        // if existingWorkersNum == 0, no workers have any load i.e all workers are new.
+        // newWorkersNum = 0 when all workers have non-zero load.

Review Comment:
   I need to remove the comments. It was only for my understanding 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r959527478


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +307,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                long processAfter = consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", processAfter);
+                time.sleep(processAfter); // Is it a good idea to sleep?

Review Comment:
   I have made the changes to use the existing delay/scheduledRebalance mechanism to delay the revocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r956850067


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -584,9 +606,18 @@ private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks
         Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
                 .filter(wl -> wl.size() > 0)
                 .collect(Collectors.toList());
-        int existingWorkersNum = existingWorkers.size();
+
+        // existingWorkers => workers with load > 0.
+        // if existingWorkers is empty, no workers have any load.
+        int existingWorkersNum = existingWorkers.size(); // count of workers having non-zero load.
         int totalWorkersNum = completeWorkerAssignment.size();
-        int newWorkersNum = totalWorkersNum - existingWorkersNum;
+        int newWorkersNum = totalWorkersNum - existingWorkersNum; // <total workers - workers with non-0 load>
+        // if existingWorkersNum == 0, no workers have any load i.e all workers are new.
+        // newWorkersNum = 0 when all workers have non-zero load.

Review Comment:
   newWorkersNum =[=] 0 ?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,11 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", delay == 0, delay);

Review Comment:
   Since we don't have `canRevoke` now, I think we should rephrase this sentence. ex:
   ```java
   if (delay > 0)  
       log.debug("Delaying {}ms for revoking tasks.");
   ```
   WDYT?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +307,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                long processAfter = consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", processAfter);
+                time.sleep(processAfter); // Is it a good idea to sleep?

Review Comment:
   No, we can't sleep during the rebalance process. If there are some new tasks/connectors needed to distribute to workers, it'll be delayed for `processAfter` ms. I think we should use the existing `delay`/`scheduledRebalance` mechanism to delay the revocation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1228504571

   Thanks @vamossagar12. I haven't looked too closely at the code yet but have a couple high-level thoughts:
   
   1. We should not introduce a backoff delay the first time a new worker joins the cluster. We should only introduce the delay in between consecutive rounds with load-balancing revocation.
   2. We should reset the backoff delay as soon as we're able to complete a round of rebalancing that does not require any load-balancing revocations. IOW, as soon as everyone rejoins the cluster and we don't see any imbalances that need to be rectified by revoking connectors/tasks from one set of workers in one round, then re-allocating them to a different set of workers in a follow-up round. If we've just performed a round of revocation, and everyone has now joined the cluster and we're going to allocate those revoked connectors/tasks to new workers, as long as that allocation would lead to a balanced assignment, we can reset the clock.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r961593073


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);

Review Comment:
   That's a good point and I didn't think about it. I used jitter to add some randomness but as you pointed out, it may not be needed. Would remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1235405233

   Thanks @C0urante . I had one question on the proposed approach regarding delays. Since the other changes are smallish in nature, I would wait for your response on that one before making the rest of the changes. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962288821


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   ```
   Or do we not account for that with the assumption that members will rejoin with a new round of rebalance immediately after a revoking round of rebalance anyway
   ```
   
   I guess we go by this assumption.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962156128


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Why are we changing the first parameter here from `activeAssignments` -> `configured`? `configured` is basically the latest set of tasks and connectors as per the leader's view of the config topic, whereas `activeAssignments` is the assimilation of tasks and connectors from the member assignments of the group in the current round of rebalancing. Is this so that we don't take into account deleted connectors/tasks while doing revocation? This will also take into account new connectors/tasks as well as connectors/tasks lost from the last assignment while calculating the average connector/task load per worker for performing revocations - is this intentional? This seems reasonable to do since if there's no scheduled delayed rebalance, the new as well as lost connector/tasks will be assigned to the workers in this round of rebalance anyway. But I'm just wondering if there's a reason why we were doing revocations only based on existing member assignments earlier.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   Yeah, +1 to @C0urante's point that scheduling a rebalance here doesn't seem to serve any purpose?
   
   > I think a better approach would be to recompute the potential backoff delay between consecutive allocation-balancing revocations if delay == 0, and if it is non-zero, then skip those revocations during the current round.
   
   In this case, we wouldn't know how much time has passed between the previous round of revoking rebalance and the current rebalance right? Or do we not account for that with the assumption that members will rejoin with a new round of rebalance immediately after a revoking round of rebalance anyway?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r966783019


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   @showuon let me know what you think about this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1239339745

   > LGTM! Thanks for the improvement!
   
   Thank you !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org