You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/11/09 15:54:56 UTC

(kafka) branch 3.6 updated: KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 6b4ba0eb625 KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
6b4ba0eb625 is described below

commit 6b4ba0eb6252f38e95fd25ac7b678e0a5b3f455f
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Nov 9 10:48:43 2023 -0500

    KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
    
    Reviewers: Sagar Rao <sa...@gmail.com>, Yash Mayya <ya...@gmail.com>
---
 .../IncrementalCooperativeAssignor.java            | 23 ++++---
 .../runtime/distributed/WorkerCoordinator.java     |  6 ++
 .../IncrementalCooperativeAssignorTest.java        | 77 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index d48589423dc..0836bf2c4ba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
 import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
@@ -337,10 +338,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         // The complete set of connectors and tasks that should be newly-assigned during this round
         ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
-                .addConnectors(created.connectors())
-                .addTasks(created.tasks())
-                .addConnectors(lostAssignmentsToReassign.connectors())
-                .addTasks(lostAssignmentsToReassign.tasks())
+                .addAll(created)
+                .addAll(lostAssignmentsToReassign)
                 .build();
 
         assignConnectors(nextWorkerAssignment, toAssign.connectors());
@@ -460,8 +459,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                     + "missing assignments that the leader is detecting are probably due to some "
                     + "workers failing to receive the new assignments in the previous rebalance. "
                     + "Will reassign missing tasks as new tasks");
-            lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-            lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+            lostAssignmentsToReassign.addAll(lostAssignments);
+            return;
+        } else if (maxDelay == 0) {
+            log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+                    + "reassigning all lost connectors and tasks immediately",
+                    SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
+            );
+            lostAssignmentsToReassign.addAll(lostAssignments);
             return;
         }
 
@@ -498,8 +503,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 }
             } else {
                 log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
-                lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
-                lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+                lostAssignmentsToReassign.addAll(lostAssignments);
             }
             resetDelay();
             // Resetting the flag as now we can permit successive revoking rebalances.
@@ -840,8 +844,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     private static void addAll(Map<String, ConnectorsAndTasks.Builder> base, Map<String, ConnectorsAndTasks> toAdd) {
         toAdd.forEach((worker, assignment) -> base
                 .computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
-                .addConnectors(assignment.connectors())
-                .addTasks(assignment.tasks())
+                .addAll(assignment)
         );
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 9ebe71e4692..fab9e0a65e7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -480,6 +480,12 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
                 return this;
             }
 
+            public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
+                return this
+                        .addConnectors(connectorsAndTasks.connectors())
+                        .addTasks(connectorsAndTasks.tasks());
+            }
+
             public ConnectorsAndTasks build() {
                 return new ConnectorsAndTasks(withConnectors, withTasks);
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index a07391e1152..3edb4d52dc0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -1037,6 +1037,83 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.delay);
     }
 
+    @Test
+    public void testLostAssignmentHandlingWhenScheduledDelayIsDisabled() {
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        time = new MockTime();
+        initAssignor();
+
+        assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
+        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
+        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+
+        // No lost assignments
+        assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
+                new ConnectorsAndTasks.Builder(),
+                new ArrayList<>(configuredAssignment.values()));
+
+        assertEquals("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                assignor.candidateWorkersForReassignment);
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
+
+        String veryFlakyWorker = "worker1";
+        WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
+        ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
+                .with(lostLoad.connectors(), lostLoad.tasks()).build();
+
+        // Lost assignments detected - Immediately reassigned
+        ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
+        assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
+                new ArrayList<>(configuredAssignment.values()));
+
+        assertEquals("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                assignor.candidateWorkersForReassignment);
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+        assertEquals("Wrong assignment of lost connectors",
+                lostAssignments.connectors(), lostAssignmentsToReassign.build().connectors());
+        assertEquals("Wrong assignment of lost tasks",
+                lostAssignments.tasks(), lostAssignmentsToReassign.build().tasks());
+    }
+
+    @Test
+    public void testScheduledDelayIsDisabled() {
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        time = new MockTime();
+        initAssignor();
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(1, 1);
+        assertTaskAllocations(4, 4);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with only one worker remaining in the group. The worker that left the
+        // group was a follower. Re-assignments take place immediately
+        removeWorkers("worker2");
+        performStandardRebalance();
+        assertDelay(rebalanceDelay);
+        assertWorkers("worker1");
+        assertConnectorAllocations(2);
+        assertTaskAllocations(8);
+        assertBalancedAndCompleteAllocation();
+    }
+
     @Test
     public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
         // First assignment with 1 worker and 2 connectors configured but not yet assigned