You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/19 07:36:11 UTC

[GitHub] [kafka] showuon commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

showuon commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597454163



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;
+        int ceil = numberOfBiggers == 0 ? floor : floor + 1;
         for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+            int currentSize = forTask ? existing.tasksSize() : existing.connectorsSize();
+            int expectedSize;
+            if (existingWorkers.size() == 1 || currentSize == 1) expectedSize = ceil;

Review comment:
       Why do we need this condition? And why, in this case, don't we need `numberOfBiggers--`? After all, we assigned the ceiling value to this worker, didn't we?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,

Review comment:
       Also, these method parameters can be declared `final`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,

Review comment:
       Could we rename this parameter to `numberOfActiveTasks` to make it clearer?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;

Review comment:
       I believe you meant to name it `numberOfRemainder`, since it holds the remainder of the division (remainder: 餘數, Chinese for "remainder" XD).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org