Posted to commits@kafka.apache.org by kk...@apache.org on 2021/02/03 22:06:29 UTC

[kafka] branch 2.5 updated: KAFKA-10413: Allow for even distribution of lost/new tasks when multiple Connect workers join at the same time (#9319)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new a645c25  KAFKA-10413: Allow for even distribution of lost/new tasks when multiple Connect workers join at the same time (#9319)
a645c25 is described below

commit a645c25669b66ed1812f63846227d8adf8966b82
Author: Ramesh Krishnan M <ra...@gmail.com>
AuthorDate: Wed Feb 3 01:34:06 2021 +0530

    KAFKA-10413: Allow for even distribution of lost/new tasks when multiple Connect workers join at the same time (#9319)
    
    First issue: When more than one worker joins the Connect group, the incremental cooperative assignor revokes and reassigns at most the average number of tasks per worker.
    Side-effect: The additional workers that joined the group stay idle, and more rebalances are needed later to reach an even distribution of tasks.
    Fix: When calculating task assignments after a deployment, compute the reassignment by revoking all tasks above the rounded-up (ceil) average number of tasks per worker, as sketched below.
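
    A minimal sketch of the ceiling bound (illustrative only; tasksToRevoke and its
    parameters are hypothetical names, not the assignor's actual fields):

        // How many tasks a worker should give up so that no worker keeps more than
        // ceil(totalTasks / totalWorkers); 0 if it is already at or below the ceiling.
        static int tasksToRevoke(int totalTasks, int totalWorkers, int currentLoad) {
            int floorTasks = totalTasks / totalWorkers;
            int ceilTasks = floorTasks + ((totalTasks % totalWorkers == 0) ? 0 : 1);
            return Math.max(0, currentLoad - ceilTasks);
        }

    For example, 8 tasks on 3 workers gives floor 2 and ceil 3, so a worker holding
    5 tasks revokes 2, while a worker holding 3 or fewer revokes nothing.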
    
    Second issue: When more than one worker is lost and then rejoins the group, at most one worker is reassigned the lost tasks from all the workers that left the group.
    Side-effect: In scenarios where several workers are lost and rejoin the group, only one of them is assigned all the tasks that were lost; the other rejoining workers get no tasks until a future rebalance.
    Fix: During lost-task reassignment, consider all the workers that have rejoined the group as candidates and assign the lost connectors and tasks to them in a round-robin fashion, as sketched below.
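
    A minimal sketch of the round-robin distribution (illustrative only; it uses plain
    java.util collections, with a Map from worker name to its task list, instead of the
    assignor's WorkerLoad type):

        // Spread the lost tasks across all candidate workers, wrapping around the
        // candidate list as many times as needed so every candidate gets a share.
        // Assumes at least one candidate, mirroring the assignor's isEmpty() guard.
        static void assignLostTasks(List<String> lostTasks, Map<String, List<String>> byWorker) {
            Iterator<List<String>> workers = byWorker.values().iterator();
            for (String task : lostTasks) {
                if (!workers.hasNext()) {
                    workers = byWorker.values().iterator(); // start over at the first candidate
                }
                workers.next().add(task);                   // assign this task to the next worker
            }
        }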
    
    Testing strategy:
    * System testing in a Kubernetes environment completed
    * New integration tests to test for balanced tasks
    * Updated unit tests.
    
    Co-authored-by: Rameshkrishnan Muthusamy <ra...@apple.com>
    Co-authored-by: Randall Hauch <rh...@gmail.com>
    Co-authored-by: Konstantine Karantasis <ko...@confluent.io>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <k....@gmail.com>
---
 .../IncrementalCooperativeAssignor.java            | 54 +++++++++-----
 .../RebalanceSourceConnectorsIntegrationTest.java  | 84 ++++++++++++++++++++--
 .../IncrementalCooperativeAssignorTest.java        | 24 ++++---
 .../WorkerCoordinatorIncrementalTest.java          | 19 ++---
 4 files changed, 140 insertions(+), 41 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index 744a099..c3f2f4b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -34,7 +34,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Function;
@@ -445,16 +444,34 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
 
-            if (candidateWorkerLoad.isPresent()) {
-                WorkerLoad workerLoad = candidateWorkerLoad.get();
-                log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker());
-                lostAssignments.connectors().forEach(workerLoad::assign);
-                lostAssignments.tasks().forEach(workerLoad::assign);
+            if (!candidateWorkerLoad.isEmpty()) {
+                log.debug("Assigning lost tasks to {} candidate workers: {}", 
+                        candidateWorkerLoad.size(),
+                        candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+                Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
+                for (String connector : lostAssignments.connectors()) {
+                    // Loop over the candidate workers as many times as it takes
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning connector id {} to member {}", connector, worker.worker());
+                    worker.assign(connector);
+                }
+                candidateWorkerIterator = candidateWorkerLoad.iterator();
+                for (ConnectorTaskId task : lostAssignments.tasks()) {
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning task id {} to member {}", task, worker.worker());
+                    worker.assign(task);
+                }
             } else {
                 log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
                 newSubmissions.connectors().addAll(lostAssignments.connectors());
@@ -498,13 +515,13 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 .collect(Collectors.toSet());
     }
 
-    private Optional<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
+    private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
         Map<String, WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
                 .collect(Collectors.toMap(WorkerLoad::worker, Function.identity()));
         return candidateWorkersForReassignment.stream()
                 .map(activeWorkers::get)
                 .filter(Objects::nonNull)
-                .findFirst();
+                .collect(Collectors.toList());
     }
 
     /**
@@ -554,38 +571,37 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
+        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
+
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
         int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
+        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
+        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
+        int numToRevoke;
 
-        int numToRevoke = floorConnectors;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<String> connectors = existing.connectors().iterator();
+            numToRevoke = existing.connectorsSize() - ceilConnectors;
             for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
                 ConnectorsAndTasks resources = revoking.computeIfAbsent(
                     existing.worker(),
                     w -> new ConnectorsAndTasks.Builder().build());
                 resources.connectors().add(connectors.next());
             }
-            if (numToRevoke == 0) {
-                break;
-            }
         }
 
-        numToRevoke = floorTasks;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            numToRevoke = existing.tasksSize() - ceilTasks;
+            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
             for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
                 ConnectorsAndTasks resources = revoking.computeIfAbsent(
                     existing.worker(),
                     w -> new ConnectorsAndTasks.Builder().build());
                 resources.tasks().add(tasks.next());
             }
-            if (numToRevoke == 0) {
-                break;
-            }
         }
 
         return revoking;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index be8ce61..f98a300 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -226,7 +227,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
                 "Connector tasks did not stop in time.");
 
-        waitForCondition(this::assertConnectorAndTasksAreUnique,
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
@@ -262,7 +263,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        waitForCondition(this::assertConnectorAndTasksAreUnique,
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
@@ -295,11 +296,70 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
                 "Connect workers did not start in time.");
 
-        waitForCondition(this::assertConnectorAndTasksAreUnique,
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
-    private boolean assertConnectorAndTasksAreUnique() {
+    @Test
+    public void testMultipleWorkersRejoining() throws Exception {
+        // create test topic
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
+
+        // start a source connector
+        IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.removeWorker();
+        connect.removeWorker();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 2,
+                "Connect workers did not stop in time.");
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.addWorker();
+        connect.addWorker();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        for (int i = 0; i < 4; ++i) {
+            connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + i, NUM_TASKS, "Connector tasks did not start in time.");
+        }
+
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
+    }
+
+    private Map<String, String> defaultSourceConnectorProps(String topic) {
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPIC_CONFIG, topic);
+        props.put("throughput", String.valueOf(10));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        return props;
+    }
+
+    private boolean assertConnectorAndTasksAreUniqueAndBalanced() {
         try {
             Map<String, Collection<String>> connectors = new HashMap<>();
             Map<String, Collection<String>> tasks = new HashMap<>();
@@ -313,7 +373,12 @@ public class RebalanceSourceConnectorsIntegrationTest {
             }
 
             int maxConnectors = connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
+            int minConnectors = connectors.values().stream().mapToInt(Collection::size).min().orElse(0);
             int maxTasks = tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
+            int minTasks = tasks.values().stream().mapToInt(Collection::size).min().orElse(0);
+
+            log.debug("Connector balance: {}", formatAssignment(connectors));
+            log.debug("Task balance: {}", formatAssignment(tasks));
 
             assertNotEquals("Found no connectors running!", maxConnectors, 0);
             assertNotEquals("Found no tasks running!", maxTasks, 0);
@@ -323,6 +388,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
             assertEquals("Task assignments are not unique: " + tasks,
                     tasks.values().size(),
                     tasks.values().stream().distinct().collect(Collectors.toList()).size());
+            assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2);
+            assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), maxTasks - minTasks < 2);
             return true;
         } catch (Exception e) {
             log.error("Could not check connector state info.", e);
@@ -330,4 +397,13 @@ public class RebalanceSourceConnectorsIntegrationTest {
         }
     }
 
+    private static String formatAssignment(Map<String, Collection<String>> assignment) {
+        StringBuilder result = new StringBuilder();
+        for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) {
+            result.append(String.format("\n%s=%s", worker, assignment.getOrDefault(worker,
+                    Collections.emptyList())));
+        }
+        return result.toString();
+    }
+
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 684da46..0fe1531 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -989,18 +989,24 @@ public class IncrementalCooperativeAssignorTest {
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
                 new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
-        // newWorker joined first, so should be picked up first as a candidate for reassignment
+        // both new workers should be considered for reassignment of connectors and tasks
+        List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
+        listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
+            .connectors());
+        listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
+            .connectors());
+        List<ConnectorTaskId> listOfTasksInLast2Workers = new ArrayList<>();
+        listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
+            .tasks());
+        listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
+            .tasks());
         assertTrue("Wrong assignment of lost connectors",
-                configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
-                        .connectors()
-                        .containsAll(lostAssignments.connectors()));
+            listOfConnectorsInLast2Workers.containsAll(lostAssignments.connectors()));
         assertTrue("Wrong assignment of lost tasks",
-                configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
-                        .tasks()
-                        .containsAll(lostAssignments.tasks()));
+            listOfTasksInLast2Workers.containsAll(lostAssignments.tasks()));
         assertThat("Wrong set of workers for reassignments",
-                Collections.emptySet(),
-                is(assignor.candidateWorkersForReassignment));
+            Collections.emptySet(),
+            is(assignor.candidateWorkersForReassignment));
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index 04c764c..ee4bf6a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -302,23 +302,24 @@ public class WorkerCoordinatorIncrementalTest {
 
         result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
 
+        // Equally distribute tasks across members
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
-                Collections.emptyList(), 0,
-                Collections.emptyList(), 2,
-                leaderAssignment);
+            Collections.emptyList(), 0,
+            Collections.emptyList(), 1,
+            leaderAssignment);
 
         memberAssignment = deserializeAssignment(result, memberId);
         assertAssignment(leaderId, offset,
-                Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
-                memberAssignment);
+            Collections.emptyList(), 0,
+            Collections.emptyList(), 1,
+            memberAssignment);
 
         ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
         assertAssignment(leaderId, offset,
-                Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
-                anotherMemberAssignment);
+            Collections.emptyList(), 0,
+            Collections.emptyList(), 0,
+            anotherMemberAssignment);
 
         verify(configStorage, times(configStorageCalls)).snapshot();
     }