You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/05 08:13:41 UTC

[kafka] branch trunk updated: KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads (#11493)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3ef29e  KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads (#11493)
e3ef29e is described below

commit e3ef29ea0300e3ed5edf5c800f914f19273c9851
Author: Tim Patterson <ti...@gmail.com>
AuthorDate: Sat Mar 5 21:11:42 2022 +1300

    KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads (#11493)
    
    Balance standby and active stateful tasks evenly across threads
    
    Reviewer: Luke Chen <sh...@gmail.com>
---
 .../internals/StreamsPartitionAssignor.java        | 134 ++++++++--------
 .../internals/StreamsPartitionAssignorTest.java    | 173 +++++++++++++++++++--
 2 files changed, 236 insertions(+), 71 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 80c506c..03f826a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -60,7 +60,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -804,21 +803,39 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             final ClientMetadata clientMetadata = clientEntry.getValue();
             final ClientState state = clientMetadata.state;
             final SortedSet<String> consumers = clientMetadata.consumers;
+            final Map<String, Integer> threadTaskCounts = new HashMap<>();
 
-            final Map<String, List<TaskId>> activeTaskAssignment = assignTasksToThreads(
+            final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignTasksToThreads(
                 state.statefulActiveTasks(),
-                state.statelessActiveTasks(),
+                true,
                 consumers,
-                state
+                state,
+                threadTaskCounts
             );
 
             final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads(
                 state.standbyTasks(),
-                Collections.emptySet(),
+                true,
+                consumers,
+                state,
+                threadTaskCounts
+            );
+
+            final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignTasksToThreads(
+                state.statelessActiveTasks(),
+                false,
                 consumers,
-                state
+                state,
+                threadTaskCounts
             );
 
+            // Combine activeTaskStatefulAssignment and activeTaskStatelessAssignment together into
+            // activeTaskAssignment (which aliases, and therefore mutates, activeTaskStatefulAssignment)
+            final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;
+            for (final Map.Entry<String, List<TaskId>> threadEntry : activeTaskStatelessAssignment.entrySet()) {
+                activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue());
+            }
+
             // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance,
             // note once we pick the first consumer within the process to trigger probing rebalance, other consumer
             // would not set to trigger any more.
@@ -1048,61 +1065,67 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     }
 
     /**
-     * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
-     * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
-     * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+     * group id to spread subtopologies across threads and further balance the workload.
+     * Stateless tasks are simply spread across threads without taking into account previous ownership.
+     * threadLoad is a map that keeps track of task load per thread across multiple calls so that active and standby
+     * tasks are evenly distributed. It is updated in place with the counts from the returned assignment.
      */
-    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
-                                                          final Collection<TaskId> statelessTasksToAssign,
+    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
+                                                          final boolean isStateful,
                                                           final SortedSet<String> consumers,
-                                                          final ClientState state) {
+                                                          final ClientState state,
+                                                          final Map<String, Integer> threadLoad) {
         final Map<String, List<TaskId>> assignment = new HashMap<>();
         for (final String consumer : consumers) {
             assignment.put(consumer, new ArrayList<>());
         }
 
-        final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
-        Collections.sort(unassignedStatelessTasks);
-
-        final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+        final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);
 
-        final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
-        final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
 
         final Queue<String> consumersToFill = new LinkedList<>();
         // keep track of tasks that we have to skip during the first pass in case we can reassign them later
         // using tree-map to make sure the iteration ordering over keys are preserved
         final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
 
-        if (!unassignedStatefulTasks.isEmpty()) {
-            // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+        if (!unassignedTasks.isEmpty()) {
+            // If these are stateful, first assign tasks to their previous owner, up to the min expected tasks/thread
             for (final String consumer : consumers) {
                 final List<TaskId> threadAssignment = assignment.get(consumer);
-
-                for (final TaskId task : state.prevTasksByLag(consumer)) {
-                    if (unassignedStatefulTasks.contains(task)) {
-                        if (threadAssignment.size() < minStatefulTasksPerThread) {
-                            threadAssignment.add(task);
-                            unassignedStatefulTasks.remove(task);
-                        } else {
-                            unassignedTaskToPreviousOwner.put(task, consumer);
+                // The number of tasks we have to assign here to hit minTasksPerThread
+                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
+
+                if (isStateful) {
+                    for (final TaskId task : state.prevTasksByLag(consumer)) {
+                        if (unassignedTasks.contains(task)) {
+                            if (threadAssignment.size() < tasksTargetCount) {
+                                threadAssignment.add(task);
+                                unassignedTasks.remove(task);
+                            } else {
+                                unassignedTaskToPreviousOwner.put(task, consumer);
+                            }
                         }
                     }
                 }
 
-                if (threadAssignment.size() < minStatefulTasksPerThread) {
+                if (threadAssignment.size() < tasksTargetCount) {
                     consumersToFill.offer(consumer);
                 }
             }
 
             // Next interleave remaining unassigned tasks amongst unfilled consumers
             while (!consumersToFill.isEmpty()) {
-                final TaskId task = unassignedStatefulTasks.poll();
+                final TaskId task = unassignedTasks.poll();
                 if (task != null) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
-                    if (threadAssignment.size() < minStatefulTasksPerThread) {
+                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+                    if (threadTaskCount < minTasksPerThread) {
                         consumersToFill.offer(consumer);
                     }
                 } else {
@@ -1110,54 +1133,43 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 }
             }
 
-            // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
-            // stateful tasks still remaining that should now be distributed over the consumers
-            if (!unassignedStatefulTasks.isEmpty()) {
-                consumersToFill.addAll(consumers);
+            // At this point all consumers are at the min or min + 1 capacity.
+            // The min + 1 case can occur for standbys when there are fewer standbys than consumers and, after
+            // assigning the active tasks, some consumers already have min + 1 tasks assigned.
+            // The tasks still remaining should now be distributed over the consumers that are still at min capacity
+            if (!unassignedTasks.isEmpty()) {
+                for (final String consumer : consumers) {
+                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+                    if (taskCount == minTasksPerThread) {
+                        consumersToFill.add(consumer);
+                    }
+                }
 
                 // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                 for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                     final TaskId task = taskEntry.getKey();
                     final String consumer = taskEntry.getValue();
-                    if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                         assignment.get(consumer).add(task);
-                        unassignedStatefulTasks.remove(task);
+                        unassignedTasks.remove(task);
                         // Remove this consumer since we know it is now at minCapacity + 1
                         consumersToFill.remove(consumer);
                     }
                 }
 
                 // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
-                for (final TaskId task : unassignedStatefulTasks) {
+                for (final TaskId task : unassignedTasks) {
                     final String consumer = consumersToFill.poll();
                     final List<TaskId> threadAssignment = assignment.get(consumer);
                     threadAssignment.add(task);
                 }
-
-
-                // There must be at least one consumer still at min capacity while all the others are at min
-                // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
-                while (unassignedStatelessTasksIter.hasNext()) {
-                    final String consumer = consumersToFill.poll();
-                    if (consumer != null) {
-                        final TaskId task = unassignedStatelessTasksIter.next();
-                        unassignedStatelessTasksIter.remove();
-                        assignment.get(consumer).add(task);
-                    } else {
-                        break;
-                    }
-                }
             }
         }
-
-        // Now just distribute tasks while circling through all the consumers
-        consumersToFill.addAll(consumers);
-
-        while (unassignedStatelessTasksIter.hasNext()) {
-            final TaskId task = unassignedStatelessTasksIter.next();
-            final String consumer = consumersToFill.poll();
-            assignment.get(consumer).add(task);
-            consumersToFill.offer(consumer);
+        // Update threadLoad
+        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+            final String consumer = taskEntry.getKey();
+            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+            threadLoad.put(consumer, totalCount);
         }
 
         return assignment;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 2bdc638..23e8d20 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -319,9 +319,10 @@ public class StreamsPartitionAssignorTest {
             previousAssignment,
             assignTasksToThreads(
                 allTasks,
-                emptySet(),
+                true,
                 consumers,
-                state
+                state,
+                new HashMap<>()
             )
         );
     }
@@ -353,9 +354,10 @@ public class StreamsPartitionAssignorTest {
         final Map<String, List<TaskId>> newAssignment =
             assignTasksToThreads(
                 allTasks,
-                emptySet(),
+                true,
                 consumers,
-                state
+                state,
+                new HashMap<>()
             );
 
         previousAssignment.get(CONSUMER_2).add(newTask);
@@ -386,9 +388,10 @@ public class StreamsPartitionAssignorTest {
 
         final Map<String, List<TaskId>> assignment = assignTasksToThreads(
             allTasks,
-            emptySet(),
+            true,
             consumers,
-            state
+            state,
+            new HashMap<>()
         );
 
         // Each member should have all of its previous tasks reassigned plus some of consumer 3's tasks
@@ -426,9 +429,10 @@ public class StreamsPartitionAssignorTest {
 
         final Map<String, List<TaskId>> assignment = assignTasksToThreads(
             allTasks,
-            emptySet(),
+            true,
             consumers,
-            state
+            state,
+            new HashMap<>()
         );
 
         // we should move one task each from consumer 1 and consumer 3 to the new member, and none from consumer 2
@@ -466,9 +470,10 @@ public class StreamsPartitionAssignorTest {
         final Map<String, List<TaskId>> interleavedTaskIds =
             assignTasksToThreads(
                 allTasks,
-                emptySet(),
+                true,
                 consumers,
-                state
+                state,
+                new HashMap<>()
             );
 
         assertThat(interleavedTaskIds, equalTo(assignment));
@@ -998,6 +1003,154 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
+    public void testAssignWithStandbyReplicasBalanceSparse() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+        final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+        for (final String consumerId : client1Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+        for (final String consumerId : client2Consumers) {
+            subscriptions.put(consumerId,
+                    new Subscription(
+                            topics,
+                            getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        }
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+        final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+        final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+        // Check each consumer has no more than 1 task
+        assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+        assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+        assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+        assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+        assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+        assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+        assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceDense() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+        final List<String> topics = asList("topic1");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        // Check each consumer has 3 tasks
+        assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size());
+        assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size());
+        // Check that not all the actives are on one node
+        assertTrue(info10.activeTasks().size() < 3);
+        assertTrue(info20.activeTasks().size() < 3);
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state");
+
+        builder.addSource(null, "source2", null, null, null, "topic2");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source2");
+
+        final List<String> topics = asList("topic1", "topic2");
+
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+                singletonList(APPLICATION_ID + "-store1-changelog"),
+                singletonList(3))
+        );
+        configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+        subscriptions.put("consumer10",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer11",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer20",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+        subscriptions.put("consumer21",
+                new Subscription(
+                        topics,
+                        getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+        final Map<String, Assignment> assignments =
+                partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Consumers
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+
+        // 9 tasks spread over 4 consumers, so we should have no more than 3 tasks per consumer
+        assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 3);
+        assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 3);
+        assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 3);
+        assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 3);
+        // No more than 1 standby per consumer.
+        assertTrue(info10.standbyTasks().size() <= 1);
+        assertTrue(info11.standbyTasks().size() <= 1);
+        assertTrue(info20.standbyTasks().size() <= 1);
+        assertTrue(info21.standbyTasks().size() <= 1);
+    }
+
+    @Test
     public void testOnAssignment() {
         taskManager = EasyMock.createStrictMock(TaskManager.class);