Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/31 18:14:02 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r959706989


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);

Review Comment:
   We allow values up to `Integer.MAX_VALUE` for the `scheduled.rebalance.max.delay.ms` property. If we allow for jitter with our exponential backoff, the result can exceed the configured maximum, so this cast to `int` may overflow when `scheduled.rebalance.max.delay.ms` is set to a very large value.
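
   A minimal sketch of the overflow concern, assuming a jittered backoff that lands 20% above `Integer.MAX_VALUE`. The class and the `clampToInt` helper are hypothetical and only illustrate one way to narrow the value safely; they are not part of the PR.

   ```java
   // Hypothetical illustration; not code from the PR.
   final class DelayCast {
       // Narrows a backoff value (ms) to int by saturating instead of wrapping.
       static int clampToInt(long backoffMs) {
           return (int) Math.max(0L, Math.min(backoffMs, (long) Integer.MAX_VALUE));
       }

       public static void main(String[] args) {
           // Assumed scenario: the max delay is Integer.MAX_VALUE and jitter scales it up by 20%.
           long jittered = (long) (Integer.MAX_VALUE * 1.2);
           int wrapped = (int) jittered;        // plain narrowing cast wraps to a negative value
           int clamped = clampToInt(jittered);  // saturates at Integer.MAX_VALUE
           System.out.println("jittered=" + jittered + " wrapped=" + wrapped + " clamped=" + clamped);
       }
   }
   ```

   A negative `delay` would then flow into `scheduledRebalance = time.milliseconds() + delay`, scheduling the rebalance in the past.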



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);
+                scheduledRebalance = time.milliseconds() + delay;
+            } else if (!toExplicitlyRevoke.isEmpty()) {
+                // We had a revocation in this round but not in the previous round. Let's store that state.
+                log.debug("Revoking rebalance. Setting the revokedInPrevious flag to true");
+                revokedInPrevious = true;
+            } else if (revokedInPrevious) {
+                // No revocations in this round but the previous round had one. Probably the workers
+                // have converged to a balanced load. We can reset the rebalance clock
+                log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a " +
+                        "balanced load. Resetting the exponential backoff clock");
+                numSuccessiveRevokingRebalances = 0;
+                revokedInPrevious = false;
+            } else {
+                // revokedInPrevious is false and no revocations needed in this round. no-op.
+                log.debug("No revocations in previous and current round.");
+            }
         } else {
-            canRevoke = delay == 0;
+            log.debug("Connector and task to revoke assignments: {}", toRevoke);

Review Comment:
   Why only log this in the `else` branch? Isn't it applicable in both cases?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);
+                scheduledRebalance = time.milliseconds() + delay;
+            } else if (!toExplicitlyRevoke.isEmpty()) {
+                // We had a revocation in this round but not in the previous round. Let's store that state.
+                log.debug("Revoking rebalance. Setting the revokedInPrevious flag to true");

Review Comment:
   The name of the flag is probably not very useful to users who aren't familiar with Java, and there's no guarantee that this log line will remain in sync with changes to the field name or type. This could be clearer with something like:
   ```suggestion
                   log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   This isn't actually true, is it? We'll need to wait `delay` ms to perform the _next_ allocation-balancing revocation (if another one is necessary), but we're performing the revocation for the current round immediately.
   
   This also highlights a larger issue with the approach here: we should only issue an assignment with a delay when that delay is the time until the re-allocation of lost assignments and/or the next round where we will permit allocation-balancing revocations. With this approach, we'll perform allocation-balancing revocations immediately but also issue an assignment with a delay. That delay doesn't really serve much purpose, since workers automatically rejoin the group (triggering a rebalance) immediately after every round that included a revocation (see [here](https://github.com/apache/kafka/blob/6f4778301b1fcac1e2750cc697043d674eaa230d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1567-L1572)).
   
   I think a better approach would be to recompute the potential backoff delay between consecutive allocation-balancing revocations if `delay == 0`, and if it is non-zero, then skip those revocations during the current round.
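
   A rough sketch of that control flow, under stated assumptions: `RevocationGate`, its fields, and the backoff function passed to it are all hypothetical names, and the bookkeeping for an elapsed delay is one possible interpretation rather than what the assignor must do.

   ```java
   import java.util.function.IntUnaryOperator;

   // Hypothetical sketch; none of these names exist in Kafka Connect.
   final class RevocationGate {
       private final IntUnaryOperator backoffMs; // consecutive revocation count -> delay in ms
       private int consecutiveRevocations = 0;
       private long scheduledAtMs = 0;           // 0 means no delayed rebalance is pending

       RevocationGate(IntUnaryOperator backoffMs) {
           this.backoffMs = backoffMs;
       }

       /**
        * Called in a round where load-balancing revocations are needed.
        * Returns the delay (ms) to attach to the assignment; 0 means "revoke in this round".
        */
       int onRevocationNeeded(long nowMs) {
           if (scheduledAtMs > 0) {
               if (nowMs < scheduledAtMs) {
                   return (int) (scheduledAtMs - nowMs); // still backing off: skip revocations
               }
               scheduledAtMs = 0;                        // backoff elapsed: revoke now
               consecutiveRevocations++;
               return 0;
           }
           int backoff = backoffMs.applyAsInt(consecutiveRevocations);
           if (backoff == 0) {
               consecutiveRevocations++;                 // nothing to wait for: revoke now
               return 0;
           }
           scheduledAtMs = nowMs + backoff;              // skip revocations and wait
           return backoff;
       }

       /** Called when a round needs no revocations; resets the backoff. */
       void onBalanced() {
           consecutiveRevocations = 0;
           scheduledAtMs = 0;
       }
   }
   ```

   With this shape, a non-zero delay in an assignment always means "no load-balancing revocations happened in this round, and the next ones are permitted after this long".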



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +119,93 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkerJoinsAfterRevocations() throws InterruptedException {
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        addNewConnector("connector3", 4);
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+        // Flag should not be set
+        assertFalse(assignor.revokedInPrevious);

Review Comment:
   Why are we making assertions against this internal detail? Shouldn't the logic introduced in this PR be visible already by making assertions on connector/task allocations and the delay in each assignment?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -65,26 +67,32 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private final int maxDelay;
     private ConnectorsAndTasks previousAssignment;
     private final ConnectorsAndTasks previousRevocation;
-    private boolean canRevoke;
     // visible for testing
+    boolean revokedInPrevious;
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
     protected int previousGenerationId;
     protected Set<String> previousMembers;
 
+    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
+
+    private int numSuccessiveRevokingRebalances;
+
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
         this.time = time;
         this.maxDelay = maxDelay;
         this.previousAssignment = ConnectorsAndTasks.EMPTY;
         this.previousRevocation = new ConnectorsAndTasks.Builder().build();
-        this.canRevoke = true;
         this.scheduledRebalance = 0;
+        this.revokedInPrevious = false;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
         this.previousMembers = Collections.emptySet();
+        this.numSuccessiveRevokingRebalances = 0;
+        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(10, 20, maxDelay, 0.2);

Review Comment:
   > Should we expose the initial interval parameter so that users can set it?
   
   I don't think so; that would require a KIP to introduce a new user-facing configuration property.
   
   > Also, the exp base(second parameter) I believe I have set it to a slightly high value which goes upto 4 seconds in the second attempt and about a minute in the third attempt and so on.
   
   I think this is alright 👍
   I don't see why we need jitter, though, and I'm hesitant to add it since it can lead to rebalance delays that exceed the `scheduled.rebalance.max.delay.ms` parameter. Is there a reason you opted for jitter here?
   
   Also, it's worth noting that the behavior here is to always use a backoff of 10 ms if the `scheduled.rebalance.max.delay.ms` property is set to 10 or less. I'm not sure this is optimal; people may set the property to zero if they want to disable delayed rebalances altogether, and with this change, we'd be reintroducing delays with no way to disable them. Maybe we should:
   - Set the initial interval to zero
   - Check if the result of invoking `backoff` is zero, and if it is, skip the delay altogether and perform the revocation immediately
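
   To make the numbers concrete, here is a small jitter-free sketch that caps the backoff at the configured maximum and returns zero when the maximum is zero. `CappedBackoff` and `backoffMs` are hypothetical and do not reproduce `ExponentialBackoff`; this is just one way to get the "never exceed the max, zero disables delays" behavior described above.

   ```java
   // Hypothetical illustration; not the ExponentialBackoff class used in the PR.
   final class CappedBackoff {
       // Jitter-free backoff in ms: baseMs * multiplier^attempt, capped at maxDelayMs.
       static int backoffMs(int attempt, int baseMs, int multiplier, int maxDelayMs) {
           if (maxDelayMs <= 0) {
               return 0; // delayed rebalances disabled: caller revokes immediately
           }
           double raw = baseMs * Math.pow(multiplier, attempt);
           return (int) Math.min(raw, (double) maxDelayMs);
       }

       public static void main(String[] args) {
           int maxDelayMs = 300_000; // assumed cap for the example
           for (int attempt = 0; attempt <= 4; attempt++) {
               System.out.println("attempt " + attempt + " -> "
                       + backoffMs(attempt, 10, 20, maxDelayMs) + " ms");
           }
           System.out.println("max delay 0 -> " + backoffMs(3, 10, 20, 0) + " ms");
       }
   }
   ```

   With a base of 10 ms and a multiplier of 20, the uncapped sequence is 10, 200, 4000, 80000 ms, so the cap matters from the fourth step onward in this example.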


