Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/03 14:18:23 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

yashmayya commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962156128


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Why are we changing the first parameter here from `activeAssignments` to `configured`? `configured` is the latest set of connectors and tasks as per the leader's view of the config topic, whereas `activeAssignments` is the combined set of connectors and tasks from the member assignments of the group in the current round of rebalancing. Is this so that we don't take deleted connectors/tasks into account while doing revocation? Note that this will also take into account new connectors/tasks, as well as connectors/tasks lost from the last assignment, when calculating the average connector/task load per worker for revocations. Is this intentional? It seems reasonable, since if there's no scheduled delayed rebalance, the new and lost connectors/tasks will be assigned to workers in this round of rebalance anyway. But I'm wondering if there's a reason why we were previously doing revocations based only on existing member assignments.
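
To make the question concrete, here is a minimal sketch of how the choice of input changes the per-worker load target. It assumes a simplified model in which revocation caps each worker at the ceiling of the average task count; `RevocationLoadSketch`, `ceilAverage`, and `excessPerWorker` are hypothetical names for illustration, not the actual `performTaskRevocation` logic or the `ConnectorsAndTasks` type.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model: worker name -> task ids currently held.
// This is NOT the real IncrementalCooperativeAssignor code.
final class RevocationLoadSketch {

    // Ceiling of the average number of tasks per worker.
    static int ceilAverage(int totalTasks, int numWorkers) {
        if (numWorkers <= 0) {
            throw new IllegalArgumentException("numWorkers must be positive");
        }
        return (totalTasks + numWorkers - 1) / numWorkers;
    }

    // Number of tasks each worker would give up so that no worker holds more
    // than the ceiling average. totalTasks is the size of whichever view is
    // used as input: the active assignments or the configured set.
    static Map<String, Integer> excessPerWorker(Map<String, List<String>> current,
                                                int totalTasks) {
        int cap = ceilAverage(totalTasks, current.size());
        Map<String, Integer> excess = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : current.entrySet()) {
            int over = entry.getValue().size() - cap;
            if (over > 0) {
                excess.put(entry.getKey(), over);
            }
        }
        return excess;
    }
}
```

With one worker holding 4 tasks and a newly joined worker holding none, a total of 4 (active assignments only) gives a cap of 2, so the first worker would give up 2 tasks. If 4 more tasks were just configured, a total of 8 gives a cap of 4 and nothing is revoked, because the new tasks can go straight to the empty worker.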



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   Yeah, +1 to @C0urante's point that scheduling a rebalance here doesn't seem to serve any purpose.
   
   > I think a better approach would be to recompute the potential backoff delay between consecutive allocation-balancing revocations if delay == 0, and if it is non-zero, then skip those revocations during the current round.
   
   In this case, we wouldn't know how much time has passed between the previous revoking rebalance and the current rebalance, right? Or do we not account for that, on the assumption that members will rejoin with a new round of rebalance immediately after a revoking round anyway?
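
One possible way to account for elapsed time is sketched below. It is not what the PR does: it assumes the leader records a timestamp for its last revoking round, and it uses a plain doubling backoff without jitter. `RevocationBackoffSketch` and all of its members are hypothetical names for illustration.

```java
// Hypothetical sketch: skip load-balancing revocations while a backoff
// window from the previous revoking round is still open.
final class RevocationBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private int consecutiveRevocations = 0;
    private long lastRevocationMs = -1L;

    RevocationBackoffSketch(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    // Backoff after n consecutive revoking rounds: initialMs * 2^(n-1),
    // capped at maxMs; zero when n is not positive.
    long backoffMs(int n) {
        if (n <= 0) {
            return 0L;
        }
        int shift = Math.min(n - 1, 30); // bound the shift to avoid overflow for modest initialMs
        return Math.min(maxMs, initialMs << shift);
    }

    // Time still to wait at nowMs before another revocation is allowed.
    long remainingMs(long nowMs) {
        if (consecutiveRevocations == 0) {
            return 0L;
        }
        long elapsed = nowMs - lastRevocationMs;
        return Math.max(0L, backoffMs(consecutiveRevocations) - elapsed);
    }

    // Returns true if revocations may be performed in this round and, if so,
    // records the round. A round that needs no revocation resets the counter.
    boolean tryRevoke(long nowMs, boolean wantsToRevoke) {
        if (!wantsToRevoke) {
            consecutiveRevocations = 0;
            return false;
        }
        if (remainingMs(nowMs) > 0) {
            return false; // backoff still active: skip revocations this round
        }
        consecutiveRevocations++;
        lastRevocationMs = nowMs;
        return true;
    }
}
```

If members really do rejoin immediately after a revoking round, the elapsed time is close to zero and the timestamp adds little. It only matters when the next rebalance arrives late, in which case part or all of the backoff has already been served.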


