Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 06:18:15 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

kkonstantine commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r568344980



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -554,15 +570,19 @@ private void resetDelay() {
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+        int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum);

Review comment:
       Since these are non-negative integers, we can compute the ceiling with integer arithmetic and avoid the `float` cast and `Math.ceil`.
   
   ```suggestion
           int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
   ```
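
For illustration, here is a minimal standalone sketch of the integer-only ceiling from the suggestion, next to the float-based form from the diff. The class and method names (`CeilDivSketch`, `ceilDiv`, `ceilViaFloat`) are hypothetical and not part of the PR. It assumes a non-negative dividend and a positive divisor, as in the assignor. The last two calls show one reason to prefer the integer form: a `float` cannot represent every `int` exactly, so the cast can lose the remainder for large values.

```java
class CeilDivSketch {
    // Ceiling of a / b using only integer arithmetic.
    // Assumes a >= 0 and b > 0 (hypothetical helper, not from the PR).
    static int ceilDiv(int a, int b) {
        int floor = a / b;
        return floor + ((a % b == 0) ? 0 : 1);
    }

    // The float-based form from the diff, kept only for comparison.
    static int ceilViaFloat(int a, int b) {
        return (int) Math.ceil((float) a / b);
    }

    public static void main(String[] args) {
        System.out.println(ceilDiv(7, 3)); // 3
        System.out.println(ceilDiv(6, 3)); // 2
        System.out.println(ceilDiv(0, 3)); // 0

        // 16777217 = 2^24 + 1 is not exactly representable as a float,
        // so the cast rounds it to 16777216 before the division.
        System.out.println(ceilViaFloat(16_777_217, 1)); // 16777216
        System.out.println(ceilDiv(16_777_217, 1));      // 16777217
    }
}
```

Connector and task counts are unlikely to reach that range in practice, so the main benefit here is probably readability rather than correctness.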

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -554,15 +570,19 @@ private void resetDelay() {
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+        int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum);
+        log.debug("New rounded down (floor) average number of connectors per worker floor connectors {} ciel connectors ", floorConnectors, ceilConnectors);
+
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
         int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
+        int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum);

Review comment:
       Since these are non-negative integers, we can compute the ceiling with integer arithmetic and avoid the `float` cast and `Math.ceil`.
   
   ```suggestion
           int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -260,7 +259,6 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Do not revoke resources for re-assignment while a delayed rebalance is active
         // Also we do not revoke in two consecutive rebalances by the same leader
         canRevoke = delay == 0 && canRevoke;
-

Review comment:
       nit: unneeded change

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -554,15 +570,19 @@ private void resetDelay() {
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+        int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum);
+        log.debug("New rounded down (floor) average number of connectors per worker floor connectors {} ciel connectors ", floorConnectors, ceilConnectors);

Review comment:
       ```suggestion
           log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -554,15 +570,19 @@ private void resetDelay() {
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+        int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum);
+        log.debug("New rounded down (floor) average number of connectors per worker floor connectors {} ciel connectors ", floorConnectors, ceilConnectors);
+
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
         int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
+        int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum);
+        log.debug("New average number of tasks per worker: floor= {}, ceiling= {}", floorTasks, ceilTasks);

Review comment:
       ```suggestion
           log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -554,15 +570,19 @@ private void resetDelay() {
         // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+        int ceilConnectors = (int) Math.ceil((float) totalActiveConnectorsNum / totalWorkersNum);
+        log.debug("New rounded down (floor) average number of connectors per worker floor connectors {} ciel connectors ", floorConnectors, ceilConnectors);
+
 
         log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
         int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
+        int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum);
+        log.debug("New average number of tasks per worker: floor= {}, ceiling= {}", floorTasks, ceilTasks);
+        int numToRevoke;
 
-        int numToRevoke = floorConnectors;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<String> connectors = existing.connectors().iterator();
+            numToRevoke = existing.connectorsSize() - ceilConnectors;

Review comment:
       We should remove the `if` branch on `numToRevoke` here too, right?
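
To make the per-worker computation concrete, here is a rough sketch of the idea in the diff: each existing worker gives up whatever it holds above the new ceiling. This is not the assignor's actual code. `RevokeSketch`, `toRevoke`, and the map-based representation of worker load are hypothetical stand-ins for `WorkerLoad` and the surrounding method. Because `numToRevoke` is recomputed for every worker and the inner loop simply does not run when it is zero or negative, no separate `if` on `numToRevoke` is needed in this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RevokeSketch {
    // Hypothetical helper: maps each existing worker to the connectors it
    // should give up so that it holds at most the ceiling of the new average.
    static Map<String, List<String>> toRevoke(Map<String, List<String>> existing,
                                              int totalWorkersNum) {
        int totalActive = 0;
        for (List<String> held : existing.values()) {
            totalActive += held.size();
        }
        int floor = totalActive / totalWorkersNum;
        int ceil = floor + ((totalActive % totalWorkersNum == 0) ? 0 : 1);

        Map<String, List<String>> revoked = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : existing.entrySet()) {
            List<String> held = entry.getValue();
            // Recomputed per worker; zero or negative means nothing to revoke.
            int numToRevoke = held.size() - ceil;
            List<String> giveUp = new ArrayList<>();
            for (int i = 0; i < numToRevoke; i++) {
                giveUp.add(held.get(i));
            }
            if (!giveUp.isEmpty()) {
                revoked.put(entry.getKey(), giveUp);
            }
        }
        return revoked;
    }

    public static void main(String[] args) {
        Map<String, List<String>> existing = new LinkedHashMap<>();
        existing.put("w1", List.of("c1", "c2", "c3", "c4", "c5"));
        existing.put("w2", List.of("c6", "c7", "c8", "c9"));
        // A third worker joins: 9 connectors over 3 workers, ceiling is 3.
        System.out.println(toRevoke(existing, 3));
    }
}
```

In this example `w1` gives up two connectors and `w2` gives up one, leaving three for the new worker.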



