You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/11/09 15:54:56 UTC
(kafka) branch 3.6 updated: KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 6b4ba0eb625 KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
6b4ba0eb625 is described below
commit 6b4ba0eb6252f38e95fd25ac7b678e0a5b3f455f
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Nov 9 10:48:43 2023 -0500
KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647)
Reviewers: Sagar Rao <sa...@gmail.com>, Yash Mayya <ya...@gmail.com>
---
.../IncrementalCooperativeAssignor.java | 23 ++++---
.../runtime/distributed/WorkerCoordinator.java | 6 ++
.../IncrementalCooperativeAssignorTest.java | 77 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 10 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index d48589423dc..0836bf2c4ba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -48,6 +48,7 @@ import java.util.stream.IntStream;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
@@ -337,10 +338,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
// The complete set of connectors and tasks that should be newly-assigned during this round
ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
- .addConnectors(created.connectors())
- .addTasks(created.tasks())
- .addConnectors(lostAssignmentsToReassign.connectors())
- .addTasks(lostAssignmentsToReassign.tasks())
+ .addAll(created)
+ .addAll(lostAssignmentsToReassign)
.build();
assignConnectors(nextWorkerAssignment, toAssign.connectors());
@@ -460,8 +459,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
+ "missing assignments that the leader is detecting are probably due to some "
+ "workers failing to receive the new assignments in the previous rebalance. "
+ "Will reassign missing tasks as new tasks");
- lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
- lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+ lostAssignmentsToReassign.addAll(lostAssignments);
+ return;
+ } else if (maxDelay == 0) {
+ log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+ + "reassigning all lost connectors and tasks immediately",
+ SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
+ );
+ lostAssignmentsToReassign.addAll(lostAssignments);
return;
}
@@ -498,8 +503,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
}
} else {
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
- lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
- lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
+ lostAssignmentsToReassign.addAll(lostAssignments);
}
resetDelay();
// Resetting the flag as now we can permit successive revoking rebalances.
@@ -840,8 +844,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
private static void addAll(Map<String, ConnectorsAndTasks.Builder> base, Map<String, ConnectorsAndTasks> toAdd) {
toAdd.forEach((worker, assignment) -> base
.computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
- .addConnectors(assignment.connectors())
- .addTasks(assignment.tasks())
+ .addAll(assignment)
);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 9ebe71e4692..fab9e0a65e7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -480,6 +480,12 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
return this;
}
+ public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
+ return this
+ .addConnectors(connectorsAndTasks.connectors())
+ .addTasks(connectorsAndTasks.tasks());
+ }
+
public ConnectorsAndTasks build() {
return new ConnectorsAndTasks(withConnectors, withTasks);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index a07391e1152..3edb4d52dc0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -1037,6 +1037,83 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(0, assignor.delay);
}
+ @Test
+ public void testLostAssignmentHandlingWhenScheduledDelayIsDisabled() {
+ // Customize assignor for this test case
+ rebalanceDelay = 0;
+ time = new MockTime();
+ initAssignor();
+
+ assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
+ assertEquals(0, assignor.scheduledRebalance);
+ assertEquals(0, assignor.delay);
+
+ Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
+ configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+ configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
+ configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+
+ // No lost assignments
+ assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
+ new ConnectorsAndTasks.Builder(),
+ new ArrayList<>(configuredAssignment.values()));
+
+ assertEquals("Wrong set of workers for reassignments",
+ Collections.emptySet(),
+ assignor.candidateWorkersForReassignment);
+ assertEquals(0, assignor.scheduledRebalance);
+ assertEquals(0, assignor.delay);
+
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
+
+ String veryFlakyWorker = "worker1";
+ WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
+ ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
+ .with(lostLoad.connectors(), lostLoad.tasks()).build();
+
+ // Lost assignments detected - Immediately reassigned
+ ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
+ assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
+ new ArrayList<>(configuredAssignment.values()));
+
+ assertEquals("Wrong set of workers for reassignments",
+ Collections.emptySet(),
+ assignor.candidateWorkersForReassignment);
+ assertEquals(0, assignor.scheduledRebalance);
+ assertEquals(0, assignor.delay);
+ assertEquals("Wrong assignment of lost connectors",
+ lostAssignments.connectors(), lostAssignmentsToReassign.build().connectors());
+ assertEquals("Wrong assignment of lost tasks",
+ lostAssignments.tasks(), lostAssignmentsToReassign.build().tasks());
+ }
+
+ @Test
+ public void testScheduledDelayIsDisabled() {
+ // Customize assignor for this test case
+ rebalanceDelay = 0;
+ time = new MockTime();
+ initAssignor();
+
+ // First assignment with 2 workers and 2 connectors configured but not yet assigned
+ addNewEmptyWorkers("worker2");
+ performStandardRebalance();
+ assertDelay(0);
+ assertWorkers("worker1", "worker2");
+ assertConnectorAllocations(1, 1);
+ assertTaskAllocations(4, 4);
+ assertBalancedAndCompleteAllocation();
+
+ // Second assignment with only one worker remaining in the group. The worker that left the
+ // group was a follower. Re-assignments take place immediately
+ removeWorkers("worker2");
+ performStandardRebalance();
+ assertDelay(rebalanceDelay);
+ assertWorkers("worker1");
+ assertConnectorAllocations(2);
+ assertTaskAllocations(8);
+ assertBalancedAndCompleteAllocation();
+ }
+
@Test
public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
// First assignment with 1 worker and 2 connectors configured but not yet assigned