Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/07 16:27:16 UTC

[GitHub] [kafka] chia7712 opened a new pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

chia7712 opened a new pull request #10077:
URL: https://github.com/apache/kafka/pull/10077


   issue: https://issues.apache.org/jira/browse/KAFKA-12309
   
   ### Assignments:
   "W0" -> 8 connectors/tasks
   "W1" -> 8 connectors/tasks
   (New) "W2" -> 0 connectors/tasks
   
   ### Revoked connectors/tasks (trunk)
   "W0" -> 2 connectors/tasks
   "W1" -> 2 connectors/tasks
   
   ### Revoked connectors/tasks (patch)
   "W0" -> 2 connectors/tasks
   "W1" -> 3 connectors/tasks
   
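The numbers above can be reproduced with a minimal Java sketch. This is not the code from this pull request: the class `RevocationSketch` and the methods `revokeToCeiling` and `revokeToQuota` are hypothetical names, and the second method only assumes that at most `total % workers` workers may keep the rounded-up share, which is consistent with the example but may differ from the actual patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not the IncrementalCooperativeAssignor code.
class RevocationSketch {

    // Sum of all items currently held by the existing workers.
    static int sum(Map<String, Integer> load) {
        int s = 0;
        for (int n : load.values()) s += n;
        return s;
    }

    // Trunk-style rule: every existing worker is trimmed only down to the ceiling.
    static Map<String, Integer> revokeToCeiling(Map<String, Integer> load, int totalWorkers) {
        int total = sum(load);
        int floor = total / totalWorkers;
        int ceil = floor + (total % totalWorkers == 0 ? 0 : 1);
        Map<String, Integer> revoked = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : load.entrySet()) {
            revoked.put(e.getKey(), Math.max(0, e.getValue() - ceil));
        }
        return revoked;
    }

    // Assumed patch-style rule: only `remainder` workers keep the ceiling,
    // all other workers are trimmed down to the floor.
    static Map<String, Integer> revokeToQuota(Map<String, Integer> load, int totalWorkers) {
        int total = sum(load);
        int floor = total / totalWorkers;
        int remainder = total % totalWorkers;
        Map<String, Integer> revoked = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : load.entrySet()) {
            int expected = floor;
            if (remainder > 0 && e.getValue() > floor) {
                expected = floor + 1; // this worker takes one of the ceiling slots
                remainder--;
            }
            revoked.put(e.getKey(), Math.max(0, e.getValue() - expected));
        }
        return revoked;
    }

    public static void main(String[] args) {
        Map<String, Integer> load = new LinkedHashMap<>();
        load.put("W0", 8);
        load.put("W1", 8);
        // W2 has just joined with no load, so there are 3 workers in total.
        System.out.println(revokeToCeiling(load, 3)); // {W0=2, W1=2}
        System.out.println(revokeToQuota(load, 3));   // {W0=2, W1=3}
    }
}
```

With 16 items and 3 workers the floor is 5 and the ceiling is 6. Trimming both workers to the ceiling frees only 4 items, so the result is 6/6/4. Letting a single worker keep the ceiling frees 5 items, which gives 6/5/5.
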
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597454163



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;
+        int ceil = numberOfBiggers == 0 ? floor : floor + 1;
         for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+            int currentSize = forTask ? existing.tasksSize() : existing.connectorsSize();
+            int expectedSize;
+            if (existingWorkers.size() == 1 || currentSize == 1) expectedSize = ceil;

Review comment:
       Why do we need this condition? And why, in this case, don't we need `numberOfBiggers--`? After all, we assigned one ceiling slot to this worker, didn't we?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,

Review comment:
       And they can be `final`

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,

Review comment:
       Could we rename this parameter to `numberOfActiveTasks` to make it clearer?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;

Review comment:
       I believe you're trying to name it `numberOfRemainder` (remainder: 餘數 XD)







[GitHub] [kafka] chia7712 commented on pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#issuecomment-803772106


   @kkonstantine thanks for your feedback.
   
   > I'd like to return for a detailed review after we exclude stylistic changes that are not necessary. Also, it'd be good to enhance the description besides the example and call out the corner cases for which logic has been added.
   
   Sorry that I did not follow the code style of the Connect module. I have reverted all the incorrect style changes and updated the description.





[GitHub] [kafka] chia7712 commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597466050



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;
+        int ceil = numberOfBiggers == 0 ? floor : floor + 1;
         for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+            int currentSize = forTask ? existing.tasksSize() : existing.connectorsSize();
+            int expectedSize;
+            if (existingWorkers.size() == 1 || currentSize == 1) expectedSize = ceil;

Review comment:
       That is a special case: a node that has a single task is already balanced. We could move the task to another node, but that would not improve the balance and would only add downtime.
   
   I will add comments for that case.
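
The argument above can be checked with a small Java sketch. The class `SingleTaskMoveSketch` and its helpers `spread` and `move` are hypothetical and are not part of the patch. The sketch only measures the gap between the most and least loaded worker before and after moving one task.

```java
// Hypothetical sketch; not the IncrementalCooperativeAssignor code.
class SingleTaskMoveSketch {

    // Gap between the most loaded and the least loaded worker.
    static int spread(int[] loads) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int n : loads) {
            min = Math.min(min, n);
            max = Math.max(max, n);
        }
        return max - min;
    }

    // Returns a copy of the loads with one task moved between two workers.
    static int[] move(int[] loads, int from, int to) {
        int[] copy = loads.clone();
        copy[from]--;
        copy[to]++;
        return copy;
    }

    public static void main(String[] args) {
        int[] single = {1, 0};
        // Moving the only task leaves the gap at 1, so nothing is gained.
        System.out.println(spread(single) + " -> " + spread(move(single, 0, 1)));
        int[] heavy = {8, 0};
        // Moving a task off a heavily loaded worker does narrow the gap.
        System.out.println(spread(heavy) + " -> " + spread(move(heavy, 0, 1)));
    }
}
```

In the first case the revocation would only restart the task elsewhere, which is the extra downtime mentioned above.
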







[GitHub] [kafka] showuon commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597461198



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;

Review comment:
       Oh, I see what you mean. Maybe `numberOfCeilingMembers` would be clearer?







[GitHub] [kafka] chia7712 commented on pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#issuecomment-802076787


   @rhauch Could you take a look? This uneven distribution is easy to reproduce, so it would be nice to fix it ASAP.





[GitHub] [kafka] kkonstantine commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597833380



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,68 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
-
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
+
+        return revoking;
+    }
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+    static void computeRevoked(final Map<String, ConnectorsAndTasks> revoking,
+                               final Collection<WorkerLoad> existingWorkers,
+                               final int numberOfWorkers,
+                               final int numberOfActiveTasks,
+                               final boolean forTask) {
+        final int floor = numberOfActiveTasks / numberOfWorkers;

Review comment:
       Refactoring similar code into common methods is good and improves readability.
   
   But when shuffling code around, and especially when adding functional changes, let's not also apply minor stylistic changes.
   We don't usually use `final` for local variables in Connect.







[GitHub] [kafka] showuon commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597459388



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;

Review comment:
       I believe the name you want here is `numberOfRemainder`, since this value is the remainder of the division.
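
To make the floor/remainder split concrete, here is a minimal sketch using the numbers from the PR description (W0 and W1 hold 8 items each, W2 joins empty). It is not the code from the patch: the class name `RevocationSketch`, the count-per-worker map, and the rule that the first overloaded workers encountered keep the extra item are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the implementation from the pull request.
class RevocationSketch {

    // Returns how many items each loaded worker gives up so that, once the
    // revoked items are reassigned, worker loads differ by at most one.
    static Map<String, Integer> computeRevoked(Map<String, Integer> currentLoad, int totalWorkers) {
        int totalActive = currentLoad.values().stream().mapToInt(Integer::intValue).sum();
        int floor = totalActive / totalWorkers;
        // Number of workers that may keep floor + 1 items.
        int remainder = totalActive % totalWorkers;

        Map<String, Integer> revoked = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : currentLoad.entrySet()) {
            int target = floor;
            // Assumption: the first overloaded workers in iteration order keep the extra item.
            if (remainder > 0 && entry.getValue() > floor) {
                target = floor + 1;
                remainder--;
            }
            int toRevoke = entry.getValue() - target;
            if (toRevoke > 0) {
                revoked.put(entry.getKey(), toRevoke);
            }
        }
        return revoked;
    }

    public static void main(String[] args) {
        Map<String, Integer> load = new LinkedHashMap<>();
        load.put("W0", 8);
        load.put("W1", 8);
        // W2 has just joined with no load, so there are three workers in total.
        System.out.println(computeRevoked(load, 3)); // prints {W0=2, W1=3}
    }
}
```

With 16 items and 3 workers the floor is 5 and the remainder is 1, so one worker keeps 6 and the other keeps 5, which frees 5 items for W2. Revoking against the ceiling alone (6) would take only 2 from each worker and leave W2 with 4.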







[GitHub] [kafka] chia7712 commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r571642581



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
##########
@@ -359,7 +359,9 @@ private boolean assertConnectorAndTasksAreUniqueAndBalanced() {
                     tasks.values().size(),
                     tasks.values().stream().distinct().collect(Collectors.toList()).size());
             assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2);
+            if (minConnectors > 1) assertEquals("Some workers have no connectors", connectors.size(), connect.workers().size());

Review comment:
       This makes sure there is no idle worker.
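
A small sketch of why the extra assertion can matter, assuming the assignment map only contains workers that own at least one connector (that assumption, the class name `BalanceCheckSketch`, and both helper names are illustrative and not taken from the test):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the code from the integration test.
class BalanceCheckSketch {

    // Mirrors the "max - min < 2" check; the map must not be empty.
    static boolean spreadIsBalanced(Map<String, Integer> countsByWorker) {
        int max = Collections.max(countsByWorker.values());
        int min = Collections.min(countsByWorker.values());
        return max - min < 2;
    }

    // A worker with no assignment is absent from the map, so sizes must match.
    static boolean noIdleWorker(Map<String, Integer> countsByWorker, Collection<String> allWorkers) {
        return countsByWorker.size() == allWorkers.size();
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("W0", "W1", "W2");
        Map<String, Integer> counts = new HashMap<>();
        counts.put("W0", 2);
        counts.put("W1", 2);
        // W2 owns nothing, so it never shows up in the map.
        System.out.println(spreadIsBalanced(counts));      // prints true
        System.out.println(noIdleWorker(counts, workers)); // prints false
    }
}
```

Under that assumption the spread check alone passes even though W2 is idle, because W2 never contributes a zero to the minimum. Comparing the map size with the number of workers catches the gap.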







[GitHub] [kafka] chia7712 commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597466256



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,
+                               boolean forTask) {
+        int floor = numberOfActives / numberOfTotalWorks;
+        int numberOfBiggers = numberOfActives % numberOfTotalWorks;

Review comment:
       OK. Will do.







[GitHub] [kafka] kkonstantine commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597836689



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,

Review comment:
       Let's not mark local variables and arguments as `final` when that style isn't already in place. Sorry for the back and forth.







[GitHub] [kafka] chia7712 commented on a change in pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#discussion_r597466185



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -562,50 +561,63 @@ private void resetDelay() {
                     existingWorkersNum, newWorkersNum, totalWorkersNum);
             // This is intentionally empty but mutable, because the map is used to include deleted
             // connectors and tasks as well
-            return revoking;
+            return Collections.emptyMap();
         }
 
         log.debug("Task revocation is required; workers with existing load: {} workers with "
                 + "no load {} total workers {}",
                 existingWorkersNum, newWorkersNum, totalWorkersNum);
 
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveConnectorsNum,
+            false
+        );
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
+        computeRevoked(
+            revoking,
+            existingWorkers,
+            totalWorkersNum,
+            totalActiveTasksNum,
+            true
+        );
 
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                    existing.worker(),
-                    w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
-        }
+        return revoking;
+    }
 
+    static void computeRevoked(Map<String, ConnectorsAndTasks> revoking,
+                               Collection<WorkerLoad> existingWorkers,
+                               int numberOfTotalWorks,
+                               int numberOfActives,

Review comment:
       Nice catch. Will do.



