Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/19 16:55:33 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

kkonstantine commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597833380



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,68 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
-
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
+
+        return revoking;
+    }
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+    static void computeRevoked(final Map<String, ConnectorsAndTasks> revoking,
+                               final Collection<WorkerLoad> existingWorkers,
+                               final int numberOfWorkers,
+                               final int numberOfActiveTasks,
+                               final boolean forTask) {
+        final int floor = numberOfActiveTasks / numberOfWorkers;

Review comment:
   Refactoring similar code into common methods is good and improves readability.

   But when moving code around, and especially when we are also making functional changes, let's not apply minor stylistic changes as well.
   We don't usually use `final` for local variables in Connect.
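The shared helper under review can be illustrated with a minimal Java sketch. This is not the PR's code: the new `computeRevoked` body is cut off in this excerpt, so the sketch reuses the pre-change floor/ceiling revocation logic from the removed lines. It applies that logic to a plain `Map<String, List<String>>` of worker IDs to assigned connectors or tasks, which stands in for `WorkerLoad`/`ConnectorsAndTasks`. Where the PR's signature passes a `forTask` flag, the sketch takes a generic item list. Its locals are not `final`, following the convention in the comment. The class name `RevocationSketch` and the string item IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RevocationSketch {

    // Hypothetical, simplified stand-in for the helper: existingWorkers maps a worker id
    // to its assigned items (connector names or task ids rendered as strings).
    // Parameters may be final; locals are left non-final per the Connect convention.
    static Map<String, List<String>> computeRevoked(final Map<String, List<String>> existingWorkers,
                                                    final int numberOfWorkers,
                                                    final int numberOfActiveItems) {
        // New per-worker averages, rounded down and up, computed over all workers.
        int floor = numberOfActiveItems / numberOfWorkers;
        int ceil = floor + ((numberOfActiveItems % numberOfWorkers == 0) ? 0 : 1);

        Map<String, List<String>> revoking = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> existing : existingWorkers.entrySet()) {
            List<String> items = existing.getValue();
            Iterator<String> it = items.iterator();
            // Same loop shape as the removed lines: revoke whatever is above the ceiling.
            int numToRevoke = items.size() - ceil;
            for (int i = items.size(); i > floor && numToRevoke > 0; --i, --numToRevoke) {
                revoking.computeIfAbsent(existing.getKey(), w -> new ArrayList<>()).add(it.next());
            }
        }
        return revoking;
    }

    public static void main(String[] args) {
        Map<String, List<String>> workers = new LinkedHashMap<>();
        workers.put("w1", List.of("t0", "t1", "t2", "t3", "t4", "t5"));
        workers.put("w2", List.of("t6", "t7", "t8", "t9", "t10", "t11"));
        // A third, empty worker joins: 12 tasks over 3 workers gives floor = ceil = 4.
        System.out.println(computeRevoked(workers, 3, 12));
    }
}
```

Running `main` with two workers holding six tasks each, plus a third worker with no load, gives floor = ceil = 4, so each loaded worker gives up its first two tasks and the program prints `{w1=[t0, t1], w2=[t6, t7]}`. Calling the same helper once for connectors and once for tasks is the kind of deduplication the comment welcomes; the `final` keywords on the locals are the part it asks to drop.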




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org