Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/19 07:49:06 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

chia7712 commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597466050



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;
+        int ceil = numberOfBiggers == 0 ? floor : floor + 1;
         for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+            int currentSize = forTask ? existing.tasksSize() : existing.connectorsSize();
+            int expectedSize;
+            if (existingWorkers.size() == 1 || currentSize == 1) expectedSize = ceil;

Review comment:
      That is a special case, because a node with a single task is already balanced. We could move that task to another node, but doing so would not improve the balance; it would only add extra downtime.
   
   I will add a comment explaining that case.
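The special case is easier to see next to the floor/ceil arithmetic in `computeRevoked`. What follows is a minimal standalone sketch, not the code in PR #10077. Only `floor`, `numberOfBiggers`, `ceil` and the "single existing worker or single item" condition come from the diff above. The diff stops right after that `if`, so the class name `RevocationSketch`, the method `countsToRevoke`, and the rule that overloaded workers visited first claim the `ceil` slots are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the code merged in PR #10077: the class name, the method
// name and the "first overloaded workers keep ceil" policy are illustrative assumptions.
class RevocationSketch {

    /**
     * Computes how many items (connectors or tasks) each existing worker should revoke.
     *
     * @param currentLoads items currently held by each existing worker, in iteration order
     * @param totalWorkers existing plus newly joined workers; assumed to be at least 1
     * @param totalActive  total number of active items across the cluster
     * @return worker id mapped to the number of items to revoke; workers revoking nothing are omitted
     */
    static Map<String, Integer> countsToRevoke(Map<String, Integer> currentLoads,
                                               int totalWorkers,
                                               int totalActive) {
        int floor = totalActive / totalWorkers;
        // How many workers may hold one item above the floor ("biggers" in the diff).
        int numberOfBiggers = totalActive % totalWorkers;
        int ceil = numberOfBiggers == 0 ? floor : floor + 1;

        Map<String, Integer> revoking = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : currentLoads.entrySet()) {
            int currentSize = entry.getValue();
            int expectedSize;
            if (currentLoads.size() == 1 || currentSize == 1) {
                // Special case from the review: a lone worker, or a worker holding a single
                // item, keeps up to ceil items; moving a lone item only adds downtime.
                expectedSize = ceil;
            } else if (numberOfBiggers > 0 && currentSize >= ceil) {
                // Assumption: overloaded workers visited first claim the ceil slots.
                expectedSize = ceil;
                numberOfBiggers--;
            } else {
                expectedSize = floor;
            }
            int toRevoke = currentSize - expectedSize;
            if (toRevoke > 0) {
                revoking.put(entry.getKey(), toRevoke);
            }
        }
        return revoking;
    }

    public static void main(String[] args) {
        // Three workers with three tasks each, plus one new empty worker: floor = 2, ceil = 3.
        Map<String, Integer> loads = new LinkedHashMap<>();
        loads.put("w1", 3);
        loads.put("w2", 3);
        loads.put("w3", 3);
        System.out.println(countsToRevoke(loads, 4, 9)); // prints {w2=1, w3=1}
    }
}
```

In the `main` example, the loop removed in the diff computes `existing.tasksSize() - ceilTasks`, which is 0 for every worker. The new worker would therefore receive nothing, which is the uneven distribution described in KAFKA-12309. Tracking `numberOfBiggers` lets w2 and w3 each give up one task, so the final spread is 3/2/2/2. A worker holding a single task is never asked to revoke it, because its load already equals `ceil` or is below it.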




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org