Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/17 09:54:21 UTC

[GitHub] [kafka] tim-patterson commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r808822957



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1140,6 +1196,13 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
             consumersToFill.offer(consumer);
         }
 
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
+

Review comment:
       Agreed 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,

Review comment:
       It took me a while to figure this out again.
   Imagine the case where there are 4 threads,
   5 active tasks and 2 standby tasks.
   
   After assigning the actives, 1 of the threads already has 2 tasks.
   So when it comes to assigning the standbys, minTasksPerThread is calculated again as 1 (floor of 7 / 4), so before we've even assigned any standbys we already have some threads at min+1.
   
   Thanks for raising this, I'll add a comment :)
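
The arithmetic in the scenario above can be checked with a small standalone sketch. It is not code from the PR: the class name `ThreadLoadExample`, the helper `threadsAboveMin` and the thread names are made up for illustration, and it assumes `threadLoad` holds an entry for every thread, whereas the diff divides by `consumers.size()` and reads loads with `getOrDefault`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ThreadLoadExample {

    // Mirrors the formula in the diff: floor((tasksToAssign + existing load) / threads).
    // Assumption: threadLoad has one entry per thread, so its size is the thread count.
    static int minTasksPerThread(final int tasksToAssign, final Map<String, Integer> threadLoad) {
        int totalTasks = tasksToAssign;
        for (final int load : threadLoad.values()) {
            totalTasks += load;
        }
        // Integer division floors for non-negative operands.
        return totalTasks / threadLoad.size();
    }

    // Hypothetical helper: threads that already hold more than the minimum.
    static List<String> threadsAboveMin(final Map<String, Integer> threadLoad, final int min) {
        final List<String> result = new ArrayList<>();
        for (final Map.Entry<String, Integer> entry : threadLoad.entrySet()) {
            if (entry.getValue() > min) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Load after the 5 actives were spread over 4 threads: one thread holds 2.
        final Map<String, Integer> threadLoad = new LinkedHashMap<>();
        threadLoad.put("thread-1", 2);
        threadLoad.put("thread-2", 1);
        threadLoad.put("thread-3", 1);
        threadLoad.put("thread-4", 1);

        // Second pass: 2 standbys still to assign, so floor((2 + 5) / 4) = 1.
        final int min = minTasksPerThread(2, threadLoad);
        System.out.println("minTasksPerThread = " + min);
        System.out.println("already above min = " + threadsAboveMin(threadLoad, min));
    }
}
```

With these inputs the minimum stays at 1 while `thread-1` already holds 2 tasks, which is why the final pass in the diff only refills consumers whose count equals `minTasksPerThread`.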

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);

Review comment:
       As commented above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
+            }
+        }
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
 
+        return assignment;
+    }
 
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
+    static Map<String, List<TaskId>> assignStatelessTasksToThreads(final Collection<TaskId> statelessTasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final Map<String, Integer> threadLoad) {
+        final List<TaskId> tasksToAssign = new ArrayList<>(statelessTasksToAssign);
+        Collections.sort(tasksToAssign);
+        final Map<String, List<TaskId>> assignment = new HashMap<>();
+        for (final String consumer : consumers) {
+            assignment.put(consumer, new ArrayList<>());
+        }
+
+        int maxThreadLoad = 0;
+        for (final int load : threadLoad.values()) {
+            maxThreadLoad = Integer.max(maxThreadLoad, load);

Review comment:
       Yeah, we can certainly try something like that.
   Instead of a boolean, the method could take `statefulTasks` and `statelessTasks`; for the first call we could pass `statelessTasks = Collections.emptySet()`.
   I'll get the other stuff fixed first and then push it up as a separate commit.
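
A rough sketch of what such a combined signature might look like follows. Everything in it is hypothetical: the class `CombinedAssignSketch`, the use of plain `String` task ids in place of `TaskId`, and the placement rule (each task goes to the currently least-loaded thread), which stands in for the stickiness logic in the diff and is not the assignor's algorithm. It is only meant to show the two-call pattern with `Collections.emptySet()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class CombinedAssignSketch {

    // Hypothetical combined entry point taking both task collections.
    // threadLoad is updated in place so that later calls see earlier assignments.
    static Map<String, List<String>> assignTasksToThreads(final Collection<String> statefulTasks,
                                                          final Collection<String> statelessTasks,
                                                          final SortedSet<String> consumers,
                                                          final Map<String, Integer> threadLoad) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        final Map<String, List<String>> assignment = new HashMap<>();
        for (final String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }

        // Simplification: stateful tasks first, then stateless, each to the least-loaded thread.
        final List<String> tasks = new ArrayList<>(statefulTasks);
        tasks.addAll(statelessTasks);
        for (final String task : tasks) {
            String target = null;
            for (final String consumer : consumers) {
                if (target == null
                        || threadLoad.getOrDefault(consumer, 0) < threadLoad.getOrDefault(target, 0)) {
                    target = consumer;
                }
            }
            assignment.get(target).add(task);
            threadLoad.merge(target, 1, Integer::sum);
        }
        return assignment;
    }

    public static void main(final String[] args) {
        final SortedSet<String> consumers = new TreeSet<>(Arrays.asList("t1", "t2"));
        final Map<String, Integer> threadLoad = new HashMap<>();

        // First call: stateful tasks only, empty set for the stateless ones.
        System.out.println(assignTasksToThreads(
            Arrays.asList("0_0", "0_1", "0_2"), Collections.emptySet(), consumers, threadLoad));

        // Later call: stateless tasks fill in around the load recorded so far.
        System.out.println(assignTasksToThreads(
            Collections.emptySet(), Arrays.asList("1_0"), consumers, threadLoad));
    }
}
```

Passing an empty collection keeps a single code path for both calls, at the cost of a slightly wider signature than a boolean flag.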



