Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/12 20:43:30 UTC

[GitHub] [kafka] tim-patterson opened a new pull request #11493: KAFKA-12959: Prioritize assigning standby tasks to threads without any active tasks

tim-patterson opened a new pull request #11493:
URL: https://github.com/apache/kafka/pull/11493


   Kafka Streams - Currently, while distributing standby tasks, Streams does not check whether there are threads without any tasks or with fewer tasks. This can lead to a few threads being assigned both active and standby tasks while there are threads within the same instance without any tasks assigned.
   
   This PR takes active task assignment into account when assigning standbys to threads, to achieve a better balance of tasks across threads.
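   To illustrate the idea, here is a minimal standalone sketch (not the actual Kafka code: tasks are plain strings, and stickiness/lag is ignored). The key point is the shared per-thread load map threaded through both assignment passes, so the standby pass sees the active-task counts and fills the idle threads first:
   ```
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.PriorityQueue;
   import java.util.SortedSet;
   import java.util.TreeSet;

   public class ThreadLoadBalanceSketch {

       // Assign each task to the currently least-loaded thread; threadLoad is
       // shared across calls, so a second (standby) pass sees the active counts.
       static Map<String, List<String>> assignToThreads(final PriorityQueue<String> tasks,
                                                        final SortedSet<String> threads,
                                                        final Map<String, Integer> threadLoad) {
           final Map<String, List<String>> assignment = new HashMap<>();
           threads.forEach(t -> assignment.put(t, new ArrayList<>()));
           while (!tasks.isEmpty()) {
               String leastLoaded = null;
               for (final String t : threads) {
                   if (leastLoaded == null
                           || threadLoad.getOrDefault(t, 0) < threadLoad.getOrDefault(leastLoaded, 0)) {
                       leastLoaded = t;
                   }
               }
               assignment.get(leastLoaded).add(tasks.poll());
               threadLoad.merge(leastLoaded, 1, Integer::sum);
           }
           return assignment;
       }

       public static void main(final String[] args) {
           final SortedSet<String> threads = new TreeSet<>(List.of("thread-1", "thread-2", "thread-3"));
           final Map<String, Integer> threadLoad = new HashMap<>();
           // Active pass fills thread-1 and thread-2; the standby pass then
           // prefers the still-idle thread-3 instead of doubling up.
           System.out.println(assignToThreads(new PriorityQueue<>(List.of("0_0", "0_1")), threads, threadLoad));
           System.out.println(assignToThreads(new PriorityQueue<>(List.of("standby_0_0", "standby_0_1")), threads, threadLoad));
       }
   }
   ```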





[GitHub] [kafka] showuon merged pull request #11493: KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads

Posted by GitBox <gi...@apache.org>.
showuon merged pull request #11493:
URL: https://github.com/apache/kafka/pull/11493


   





[GitHub] [kafka] tim-patterson commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810985840



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1028,116 +1045,113 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
     }
 
     /**
-     * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
+    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                          final boolean isStateful,
                                                           final SortedSet<String> consumers,
-                                                          final ClientState state) {
+                                                          final ClientState state,
+                                                          final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
-
-                for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
-                            threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
-                        } else {
-                            unassignedTaskToPreviousOwner.put(task, consumer);
+                // The number of tasks we have to assign here to hit minTasksPerThread
+                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
+
+                if (isStateful) {
+                    for (final TaskId task : state.prevTasksByLag(consumer)) {
+                        if (unassignedTasks.contains(task)) {
+                            if (threadAssignment.size() < tasksTargetCount) {
+                                threadAssignment.add(task);
+                                unassignedTasks.remove(task);
+                            } else {
+                                unassignedTaskToPreviousOwner.put(task, consumer);
+                            }
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                if (threadAssignment.size() < tasksTargetCount) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity.
+            // The min + 1 case can occur for standbys when there are fewer standbys than consumers and, after assigning
+            // the active tasks, some consumers already have min + 1 tasks assigned.
+            // The tasks still remaining should now be distributed over the consumers that are still at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
-
-
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
             }
         }
-
-        // Now just distribute tasks while circling through all the consumers
-        consumersToFill.addAll(consumers);
-
-        while (unassignedStatelessTasksIter.hasNext()) {
-            final TaskId task = unassignedStatelessTasksIter.next();
-            final String consumer = consumersToFill.poll();
-            assignment.get(consumer).add(task);
-            consumersToFill.offer(consumer);
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {

Review comment:
       My only worry with that is that, to get a proper balance, it would rely on the caller always assigning stateless tasks last, which might not be obvious to the caller.
   Happy to stick an if statement around it if you think it's worth it, though.
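   For illustration, such a guard could look something like this (a hypothetical fragment, not the committed code):
   ```
   // Only fold this pass's assignment into the running per-thread load when
   // assigning stateful tasks; stateless tasks are assigned last anyway.
   if (isStateful) {
       for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
           threadLoad.merge(taskEntry.getKey(), taskEntry.getValue().size(), Integer::sum);
       }
   }
   ```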

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalanceSparse() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+        final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+        for (final String consumerId : client1Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+        for (final String consumerId : client2Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+        final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+        final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+        // Check each consumer has no more than 1 task
+        assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+        assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+        assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+        assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+        assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+        assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+        assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceDense() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        // Check each consumer has 3 tasks
+        assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size());
+        assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size());
+        // Check that not all the actives are on one node
+        assertTrue(info10.activeTasks().size() < 3);
+        assertTrue(info20.activeTasks().size() < 3);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer11",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer21",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();

Review comment:
       Good catch







[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1047469153


   @tim-patterson , could we rename the PR title? It seems that we don't just assign standby tasks to `threads without active tasks`. Please help update it. Thanks.





[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1030773151


   @tim-patterson , thanks for the PR. I'll review the PR next week. Thanks for the improvement.





[GitHub] [kafka] showuon edited a comment on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1047469644


   Failed tests are flaky tests that also failed in trunk branch:
   ```
   [Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_17_and_Scala_2_13___testThreadPoolResize___2/)
       [Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_17_and_Scala_2_13___testThreadPoolResize__/)
       [Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_11_and_Scala_2_13___testThreadPoolResize__/)
       [Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_11_and_Scala_2_13___testThreadPoolResize___2/)
   ```





[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r805556147



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +997,77 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalance() {

Review comment:
       Could we add some tests to cover the case where the consumers are fewer than the total tasks (active + standby), but we can still distribute them evenly (this test case should be more common for users), e.g.:
   tasks: TASK_0_0, TASK_0_1, TASK_0_2
   consumers: 2 in thread1, 3 in thread2
   We would expect each to have 3 tasks, distributed evenly.







[GitHub] [kafka] tim-patterson commented on pull request #11493: KAFKA-12959: Prioritize assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-967520551


   This contribution is my original work and I license the work to the project under the Apache version 2 licence





[GitHub] [kafka] tim-patterson commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1042767690


   Thanks for the review @showuon.
   I've pushed up some of the minor stuff, I'll finish off the unit tests and try merging `assignStatefulTasksToThreads` and `assignStatelessTasksToThreads` together tomorrow morning.
   Thanks again
   Tim





[GitHub] [kafka] showuon edited a comment on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1047469153


   @tim-patterson , could we rename the PR title? It seems that we don't just assign standby tasks to `threads without active tasks`. Please help update it. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r805534017



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -784,21 +784,33 @@ private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition
             final ClientMetadata clientMetadata = clientEntry.getValue();
             final ClientState state = clientMetadata.state;
             final SortedSet<String> consumers = clientMetadata.consumers;
+            final Map<String, Integer> threadTaskCounts = new HashMap<>();
 
-            final Map<String, List<TaskId>> activeTaskAssignment = assignTasksToThreads(
+            final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignStatefulTasksToThreads(
                 state.statefulActiveTasks(),
-                state.statelessActiveTasks(),
                 consumers,
-                state
+                state,
+                threadTaskCounts
             );
 
-            final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads(
+            final Map<String, List<TaskId>> standbyTaskAssignment = assignStatefulTasksToThreads(
                 state.standbyTasks(),
-                Collections.emptySet(),
                 consumers,
-                state
+                state,
+                threadTaskCounts
+            );
+
+            final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignStatelessTasksToThreads(
+                state.statelessActiveTasks(),
+                consumers,
+                threadTaskCounts
             );
 
+            final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;

Review comment:
       Could we add a comment here to mention that this combines the active stateful assignment with the active stateless assignment...?
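   For reference, the combining step such a comment would describe presumably looks something like this (a sketch; the diff above is truncated before the merge loop):
   ```
   // Reuse the stateful map as the overall active assignment and fold the
   // stateless assignment into it per consumer.
   final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;
   for (final Map.Entry<String, List<TaskId>> entry : activeTaskStatelessAssignment.entrySet()) {
       activeTaskAssignment.get(entry.getKey()).addAll(entry.getValue());
   }
   ```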

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);

Review comment:
       `threadLoad` is not changed before the end of the `assignTasksToThreads` method. Could we get the value `threadLoad.getOrDefault(consumer, 0)` once, at the beginning of the consumers loop?
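   A sketch of that suggestion (`previousLoad` is a hypothetical name):
   ```
   for (final String consumer : consumers) {
       // threadLoad is read-only inside this loop, so look it up once per consumer
       final int previousLoad = threadLoad.getOrDefault(consumer, 0);
       final List<TaskId> threadAssignment = assignment.get(consumer);
       for (final TaskId task : state.prevTasksByLag(consumer)) {
           // compare threadAssignment.size() + previousLoad against minTasksPerThread
           // instead of recomputing the lookup for every task
       }
   }
   ```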

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);

Review comment:
       I think the original algorithm assumes that when reaching this step, all consumers are at the min capacity, so it could just do this to set `consumersToFill`:
   ```java
   if (!unassignedStatefulTasks.isEmpty())
          consumersToFill.addAll(consumers);
   ```
   
    But I didn't see where we changed this algorithm. Please let me know what I missed. Thanks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);

Review comment:
       Also, to compute the `threadTaskCount`, we always need to add `threadLoad.getOrDefault(consumer, 0)` each time. Could we just make `minTasksPerThread = minTasksPerThread - threadLoad.getOrDefault(consumer, 0)` and add a comment to make it clear, so that we don't need to add `threadLoad.getOrDefault(consumer, 0)` each time? WDYT?
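   This is roughly what the final version quoted at the top of this thread ends up doing, via a per-consumer target:
   ```
   // The number of tasks we have to assign here to hit minTasksPerThread
   final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
   if (threadAssignment.size() < tasksTargetCount) {
       threadAssignment.add(task);
       unassignedTasks.remove(task);
   }
   ```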

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1140,6 +1196,13 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
             consumersToFill.offer(consumer);
         }
 
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
+

Review comment:
       Should we keep the `threadLoad` update at the end of `assignStatelessTasksToThreads`? We don't need it after `assignStatelessTasksToThreads`, right?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +997,77 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalance() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        final Set<TaskId> prevTasks00 = mkSet(TASK_0_0);
+        final Set<TaskId> standbyTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+
+        createMockTaskManager(prevTasks00, standbyTasks);

Review comment:
       The `prevTasks00` and `standbyTasks` are meaningless in this test. If we just want to create a mock taskManager, could we use `createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);` instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
+            }
+        }
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
 
+        return assignment;
+    }
 
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
+    static Map<String, List<TaskId>> assignStatelessTasksToThreads(final Collection<TaskId> statelessTasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final Map<String, Integer> threadLoad) {
+        final List<TaskId> tasksToAssign = new ArrayList<>(statelessTasksToAssign);
+        Collections.sort(tasksToAssign);
+        final Map<String, List<TaskId>> assignment = new HashMap<>();
+        for (final String consumer : consumers) {
+            assignment.put(consumer, new ArrayList<>());
+        }
+
+        int maxThreadLoad = 0;
+        for (final int load : threadLoad.values()) {
+            maxThreadLoad = Integer.max(maxThreadLoad, load);

Review comment:
       I was thinking we don't need these loops to get `maxThreadLoad` and `consumersToFill`. Do you think we could put `assignStatelessTasksToThreads` at the end of `assignStatefulTasksToThreads`, so that we'd have `maxThreadLoad` and `consumersToFill` directly? In the `assignStatefulTasksToThreads` method signature, we would then have one more parameter, like `boolean shouldAssignStatelessTasks` or `isActiveTasksAssignment`... something like that. WDYT?
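   For reference, this is essentially the shape the PR landed on (see the final diff quoted at the top of this thread): the two methods were merged behind a flag:
   ```
   static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
                                                         final boolean isStateful,
                                                         final SortedSet<String> consumers,
                                                         final ClientState state,
                                                         final Map<String, Integer> threadLoad)
   ```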

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +997,77 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalance() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        final Set<TaskId> prevTasks00 = mkSet(TASK_0_0);
+        final Set<TaskId> standbyTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+
+        createMockTaskManager(prevTasks00, standbyTasks);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer11",
+                new Subscription(
+                        emptyList(),
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer12",
+                new Subscription(
+                        emptyList(),
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer13",
+                new Subscription(
+                        emptyList(),
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode()));
+        subscriptions.put("consumer21",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode()));
+        subscriptions.put("consumer22",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode()));
+
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+        final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+        final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+        // Check each consumer has no more than 1 task
+        // (client 1 has more consumers than needed so consumer13 won't get a task)
+        assertEquals(1, info10.activeTasks().size() + info10.standbyTasks().size());
+        assertEquals(1, info11.activeTasks().size() + info11.standbyTasks().size());
+        assertEquals(1, info12.activeTasks().size() + info12.standbyTasks().size());
+        assertEquals(0, info13.activeTasks().size() + info13.standbyTasks().size());
+        assertEquals(1, info20.activeTasks().size() + info20.standbyTasks().size());
+        assertEquals(1, info21.activeTasks().size() + info21.standbyTasks().size());
+        assertEquals(1, info22.activeTasks().size() + info22.standbyTasks().size());

Review comment:
       I'm thinking we can just verify the assignment size is `< 2`. We don't care about the exact number, right?
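
   For example, something like this (a sketch reusing the `info*` variables decoded in the test above):

       for (final AssignmentInfo info : asList(info10, info11, info12, info13, info20, info21, info22)) {
           // each consumer should end up with at most one task overall
           assertTrue(info.activeTasks().size() + info.standbyTasks().size() < 2);
       }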

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +997,77 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalance() {

Review comment:
       Could we add some tests to cover the case where there are fewer consumers than total tasks (active + standby), but we can still distribute them evenly, ex:
   tasks: TASK_0_0, TASK_0_1, TASK_0_2
   consumers: 2 in client1, 3 in client2
   We expect each client to get 3 tasks (actives plus standbys), distributed evenly across its threads.
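
   Spelling out the arithmetic behind that expectation (assuming 1 standby replica, so 3 active + 3 standby = 6 tasks):

       6 tasks over 2 clients          -> 3 tasks per client
       client1 (2 consumers), 3 tasks  -> minTasksPerThread = floor(3 / 2) = 1, threads get 2 and 1
       client2 (3 consumers), 3 tasks  -> minTasksPerThread = floor(3 / 3) = 1, one task per thread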

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,

Review comment:
       Where does the `min + 1` case come from? Could you elaborate more? 







[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1059718476


   @tim-patterson , thanks for the contribution!





[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810748411



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalanceSparse() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+        final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+        for (final String consumerId : client1Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+        for (final String consumerId : client2Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+        final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+        final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+        // Check each consumer has no more than 1 task
+        assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+        assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+        assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+        assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+        assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+        assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+        assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceDense() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        // Check each consumer has 3 tasks
+        assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size());
+        assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size());
+        // Check that not all the actives are on one node
+        assertTrue(info10.activeTasks().size() < 3);
+        assertTrue(info20.activeTasks().size() < 3);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer11",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer21",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();

Review comment:
       I ran it and found there are no stateless tasks included in the assignment. Could you check again?
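
   One way to get genuinely stateless tasks could be a second, disconnected subtopology, since a processor hanging off the same source as the store stays in the same (stateful) subtopology. A hypothetical sketch (the extra topic and node names are invented):

       builder.addSource(null, "source2", null, null, null, "topic2");
       builder.addProcessor("statelessProcessor", new MockApiProcessorSupplier<>(), "source2");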
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1028,116 +1045,113 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
     }
 
     /**
-     * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
+    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                          final boolean isStateful,
                                                           final SortedSet<String> consumers,
-                                                          final ClientState state) {
+                                                          final ClientState state,
+                                                          final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }

Review comment:
       nit: we can use stream here:
   `int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1028,116 +1045,113 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
     }
 
     /**
-     * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed

Review comment:
       Should we mention here that for stateless tasks, we just distribute them evenly to all consumers?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalanceSparse() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+        final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+        for (final String consumerId : client1Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+        for (final String consumerId : client2Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+        final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+        final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+        // Check each consumer has no more than 1 task
+        assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+        assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+        assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+        assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+        assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+        assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+        assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceDense() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        // Check each consumer has 3 tasks
+        assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size());
+        assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size());
+        // Check that not all the actives are on one node
+        assertTrue(info10.activeTasks().size() < 3);
+        assertTrue(info20.activeTasks().size() < 3);

Review comment:
       nice test!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1028,116 +1045,113 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
     }
 
     /**
-     * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
+    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                          final boolean isStateful,
                                                           final SortedSet<String> consumers,
-                                                          final ClientState state) {
+                                                          final ClientState state,
+                                                          final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
-
-                for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
-                            threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
-                        } else {
-                            unassignedTaskToPreviousOwner.put(task, consumer);
+                // The number of tasks we have to assign here to hit minTasksPerThread
+                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
+
+                if (isStateful) {
+                    for (final TaskId task : state.prevTasksByLag(consumer)) {
+                        if (unassignedTasks.contains(task)) {
+                            if (threadAssignment.size() < tasksTargetCount) {
+                                threadAssignment.add(task);
+                                unassignedTasks.remove(task);
+                            } else {
+                                unassignedTaskToPreviousOwner.put(task, consumer);
+                            }
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                if (threadAssignment.size() < tasksTargetCount) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity.
+            // The min + 1 case can occur for standbys where there are fewer standbys than consumers and after assigning
+            // the active tasks some consumers already have min + 1 tasks assigned.
+            // The tasks still remaining should now be distributed over the consumers that are still at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
-
-
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
             }
         }
-
-        // Now just distribute tasks while circling through all the consumers
-        consumersToFill.addAll(consumers);
-
-        while (unassignedStatelessTasksIter.hasNext()) {
-            final TaskId task = unassignedStatelessTasksIter.next();
-            final String consumer = consumersToFill.poll();
-            assignment.get(consumer).add(task);
-            consumersToFill.offer(consumer);
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {

Review comment:
       Could we skip this threadLoad update for stateless tasks?
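
   For example, gated on the `isStateful` flag already in scope (a sketch of the suggestion, not the merged code):

       if (isStateful) {
           // only stateful assignments should count toward the per-thread load
           for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
               threadLoad.merge(taskEntry.getKey(), taskEntry.getValue().size(), Integer::sum);
           }
       }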







[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r811606664



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1028,116 +1045,113 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
     }
 
     /**
-     * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
+    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                          final boolean isStateful,
                                                           final SortedSet<String> consumers,
-                                                          final ClientState state) {
+                                                          final ClientState state,
+                                                          final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
-
-                for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
-                            threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
-                        } else {
-                            unassignedTaskToPreviousOwner.put(task, consumer);
+                // The number of tasks we have to assign here to hit minTasksPerThread
+                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
+
+                if (isStateful) {
+                    for (final TaskId task : state.prevTasksByLag(consumer)) {
+                        if (unassignedTasks.contains(task)) {
+                            if (threadAssignment.size() < tasksTargetCount) {
+                                threadAssignment.add(task);
+                                unassignedTasks.remove(task);
+                            } else {
+                                unassignedTaskToPreviousOwner.put(task, consumer);
+                            }
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                if (threadAssignment.size() < tasksTargetCount) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity.
+            // The min + 1 case can occur for standbys where there are fewer standbys than consumers and after assigning
+            // the active tasks some consumers already have min + 1 tasks assigned.
+            // The tasks still remaining should now be distributed over the consumers that are still at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
-
-
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
             }
         }
-
-        // Now just distribute tasks while circling through all the consumers
-        consumersToFill.addAll(consumers);
-
-        while (unassignedStatelessTasksIter.hasNext()) {
-            final TaskId task = unassignedStatelessTasksIter.next();
-            final String consumer = consumersToFill.poll();
-            assignment.get(consumer).add(task);
-            consumersToFill.offer(consumer);
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {

Review comment:
       Makes sense to me. We can keep it as is. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r805557480



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +997,77 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalance() {

Review comment:
       Also, could we add some tests to make sure the `statelessTasks` also take the thread load into account? Thanks.
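
   Such a test could perhaps exercise the helper directly with a pre-loaded `threadLoad`; a sketch, assuming the `assignStatelessTasksToThreads` signature from the diff and that the balancing sends both tasks to the idle thread:

       final Map<String, Integer> threadLoad = new HashMap<>();
       threadLoad.put("consumer1", 2);  // already carries two stateful tasks
       threadLoad.put("consumer2", 0);
       final Map<String, List<TaskId>> assignment = assignStatelessTasksToThreads(
           mkSet(TASK_0_0, TASK_0_1),
           new TreeSet<>(asList("consumer1", "consumer2")),
           threadLoad);
       // expectation: both stateless tasks land on the lightly loaded consumer2
       assertEquals(2, assignment.get("consumer2").size());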







[GitHub] [kafka] tim-patterson commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r808822957



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1140,6 +1196,13 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
             consumersToFill.offer(consumer);
         }
 
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
+

Review comment:
       Agreed 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,

Review comment:
       It took me a while to figure this out again.
   Imagine the case where there are 4 threads,
   5 active tasks and 2 standby tasks.
   
   After assigning the actives, 1 of the threads already has 2 tasks.
   So when it comes to assigning the standbys, `minTasksPerThread` is again calculated as 1, so before we've even assigned any standbys we already have some threads at min + 1.
   
   Thanks for raising this, I'll add a comment :)
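   
   To make the arithmetic concrete, here is a minimal self-contained sketch of the capacity calculation described above (illustrative numbers only; this is not the assignor code itself):
   
   ```java
   public class CapacityExample {
       public static void main(final String[] args) {
           final int threads = 4;
           final int activeTasks = 5;
           final int standbyTasks = 2;
   
           // Pass 1: actives only. floor(5 / 4) = 1, so every thread gets at least
           // one active task and one thread ends up with two (min + 1).
           final int minActivesPerThread = activeTasks / threads;
           System.out.println("min actives per thread: " + minActivesPerThread); // 1
   
           // Pass 2: standbys. The total now includes the already-assigned actives,
           // so floor((5 + 2) / 4) = 1 again. The thread holding two actives is
           // already at min + 1 before any standby has been assigned.
           final int minTasksPerThread = (activeTasks + standbyTasks) / threads;
           System.out.println("min tasks per thread: " + minTasksPerThread); // 1
       }
   }
   ```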

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);

Review comment:
       As commented above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
+            }
+        }
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
 
+        return assignment;
+    }
 
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
+    static Map<String, List<TaskId>> assignStatelessTasksToThreads(final Collection<TaskId> statelessTasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final Map<String, Integer> threadLoad) {
+        final List<TaskId> tasksToAssign = new ArrayList<>(statelessTasksToAssign);
+        Collections.sort(tasksToAssign);
+        final Map<String, List<TaskId>> assignment = new HashMap<>();
+        for (final String consumer : consumers) {
+            assignment.put(consumer, new ArrayList<>());
+        }
+
+        int maxThreadLoad = 0;
+        for (final int load : threadLoad.values()) {
+            maxThreadLoad = Integer.max(maxThreadLoad, load);

Review comment:
       Yeah, we can certainly try something like that.
   Instead of a boolean, the method could take `statefulTasks` and `statelessTasks`; for the first call we could call it with `statelessTasks = Collections.emptySet()`.
   I'll get the other fixes in first and then push it up as a separate commit.
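   
   For concreteness, the suggested call pattern might look something like this (a hypothetical sketch only — this shape was never merged, and the five-parameter signature is an assumption; variable names are taken from the surrounding diff):
   
   ```java
   // Stateful pass first, with no stateless tasks yet (hypothetical signature):
   assignTasksToThreads(statefulTasksToAssign, Collections.emptySet(), consumers, state, threadLoad);
   ```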




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tim-patterson commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810597642



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1041,148 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                        if (threadTaskCount < minTasksPerThread) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                if (threadTaskCount < minTasksPerThread) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity,
+            // the tasks still remaining that should now be distributed over the consumers that are still
+            // at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
+            }
+        }
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
+        }
 
+        return assignment;
+    }
 
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
+    static Map<String, List<TaskId>> assignStatelessTasksToThreads(final Collection<TaskId> statelessTasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final Map<String, Integer> threadLoad) {
+        final List<TaskId> tasksToAssign = new ArrayList<>(statelessTasksToAssign);
+        Collections.sort(tasksToAssign);
+        final Map<String, List<TaskId>> assignment = new HashMap<>();
+        for (final String consumer : consumers) {
+            assignment.put(consumer, new ArrayList<>());
+        }
+
+        int maxThreadLoad = 0;
+        for (final int load : threadLoad.values()) {
+            maxThreadLoad = Integer.max(maxThreadLoad, load);

Review comment:
       Let me know what you think of the latest commit.
   Instead of gluing the methods together I just got rid of `assignStatelessTasksToThreads`; the only real difference between the two methods was that the stateful one tries to be sticky.
   I just passed in a boolean to control the part of the method that does the stickiness logic.
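   
   Roughly, the resulting shape looks like the following condensed sketch (assuming the Kafka Streams internal types `TaskId` and `ClientState` plus the usual `java.util` imports; the real method also tracks `threadLoad` and the min-capacity checks, which are omitted here):
   
   ```java
   static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
                                                         final boolean isStateful,
                                                         final SortedSet<String> consumers,
                                                         final ClientState state) {
       final Map<String, List<TaskId>> assignment = new HashMap<>();
       for (final String consumer : consumers) {
           assignment.put(consumer, new ArrayList<>());
       }
       final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
   
       if (isStateful) {
           // Only the stateful pass tries to hand a task back to its previous owner.
           for (final String consumer : consumers) {
               for (final TaskId task : state.prevTasksByLag(consumer)) {
                   if (unassignedTasks.remove(task)) {
                       assignment.get(consumer).add(task);
                   }
               }
           }
       }
   
       // Remaining tasks are interleaved across consumers (capacity checks omitted).
       final List<String> consumerList = new ArrayList<>(consumers);
       int i = 0;
       while (!unassignedTasks.isEmpty()) {
           assignment.get(consumerList.get(i++ % consumerList.size())).add(unassignedTasks.poll());
       }
       return assignment;
   }
   ```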




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1046440232


   @cadonna, do you want to have a second pair of eyes look at the PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1047469644


   Failed tests are flaky tests:
   ```
   [Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_17_and_Scala_2_13___testThreadPoolResize___2/)
   [Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_17_and_Scala_2_13___testThreadPoolResize__/)
   [Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_11_and_Scala_2_13___testThreadPoolResize__/)
   [Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11493/9/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_11_and_Scala_2_13___testThreadPoolResize___2/)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810488636



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1043,151 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
+                // The number of tasks we have to assign here to hit minTasksPerThread
+                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size();
+                        if (threadTaskCount < tasksTargetCount) {

Review comment:
       nit: we can use `threadAssignment.size()` in place of the `threadTaskCount` variable. Same as below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1029,104 +1043,151 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
 
     /**
      * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys
+     * are evenly distributed
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
-                                                          final SortedSet<String> consumers,
-                                                          final ClientState state) {
+    static Map<String, List<TaskId>> assignStatefulTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                                  final SortedSet<String> consumers,
+                                                                  final ClientState state,
+                                                                  final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        int totalTasks = tasksToAssign.size();
+        for (final Integer threadTaskCount : threadLoad.values()) {
+            totalTasks += threadTaskCount;
+        }
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // First assign tasks to previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
+                // The number of tasks we have to assign here to hit minTasksPerThread
+                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
 
                 for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    if (unassignedTasks.contains(task)) {
+                        final int threadTaskCount = threadAssignment.size();
+                        if (threadTaskCount < tasksTargetCount) {
                             threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
+                            unassignedTasks.remove(task);
                         } else {
                             unassignedTaskToPreviousOwner.put(task, consumer);
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                final int threadTaskCount = threadAssignment.size();
+                if (threadTaskCount < tasksTargetCount) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
                     throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity.
+            // The min + 1 case can occur for standbys where there's fewer standbys than consumers and after assigning
+            // the active tasks some consumers already have min + 1 one tasks assigned.
+            // The tasks still remaining should now be distributed over the consumers that are still at min capacity

Review comment:
       Thanks for your explanation. I've got it now: the total task count now includes the already-assigned active tasks, not just the tasks being assigned in this call. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810748411



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
     }
 
+    @Test
+    public void testAssignWithStandbyReplicasBalanceSparse() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+        final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+        for (final String consumerId : client1Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+        for (final String consumerId : client2Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+        final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+        final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+        // Check each consumer has no more than 1 task
+        assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+        assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+        assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+        assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+        assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+        assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+        assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceDense() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        // Check each consumer has 3 tasks
+        assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size());
+        assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size());
+        // Check that not all the actives are on one node
+        assertTrue(info10.activeTasks().size() < 3);
+        assertTrue(info20.activeTasks().size() < 3);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer11",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer21",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();

Review comment:
       I ran it and found there are no stateless tasks included in the assignment. Could you check again? 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tim-patterson commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

Posted by GitBox <gi...@apache.org>.
tim-patterson commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1015895827


   Hey @ableegoldman, I see you've got a few commits on this file. Would you be able to review, or point us in the direction of one of your co-committers who might be able to help us here?
   (We've been running this in prod for a couple of months now but would prefer not to maintain a fork if we don't have to!)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org