You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/05 08:13:41 UTC
[kafka] branch trunk updated: KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads (#11493)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3ef29e KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads (#11493)
e3ef29e is described below
commit e3ef29ea0300e3ed5edf5c800f914f19273c9851
Author: Tim Patterson <ti...@gmail.com>
AuthorDate: Sat Mar 5 21:11:42 2022 +1300
KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads (#11493)
Balance standby and active stateful tasks evenly across threads
Reviewer: Luke Chen <sh...@gmail.com>
---
.../internals/StreamsPartitionAssignor.java | 134 ++++++++--------
.../internals/StreamsPartitionAssignorTest.java | 173 +++++++++++++++++++--
2 files changed, 236 insertions(+), 71 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 80c506c..03f826a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -60,7 +60,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -804,21 +803,39 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final ClientMetadata clientMetadata = clientEntry.getValue();
final ClientState state = clientMetadata.state;
final SortedSet<String> consumers = clientMetadata.consumers;
+ final Map<String, Integer> threadTaskCounts = new HashMap<>();
- final Map<String, List<TaskId>> activeTaskAssignment = assignTasksToThreads(
+ final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignTasksToThreads(
state.statefulActiveTasks(),
- state.statelessActiveTasks(),
+ true,
consumers,
- state
+ state,
+ threadTaskCounts
);
final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads(
state.standbyTasks(),
- Collections.emptySet(),
+ true,
+ consumers,
+ state,
+ threadTaskCounts
+ );
+
+ final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignTasksToThreads(
+ state.statelessActiveTasks(),
+ false,
consumers,
- state
+ state,
+ threadTaskCounts
);
+ // Combine activeTaskStatefulAssignment and activeTaskStatelessAssignment together into
+ // activeTaskAssignment (which reuses the activeTaskStatefulAssignment map)
+ final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;
+ for (final Map.Entry<String, List<TaskId>> threadEntry : activeTaskStatelessAssignment.entrySet()) {
+ activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue());
+ }
+
// Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance,
// note once we pick the first consumer within the process to trigger probing rebalance, other consumer
// would not set to trigger any more.
@@ -1048,61 +1065,67 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
/**
- * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating
- * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners
- * will be interleaved by group id to spread subtopologies across threads and further balance the workload.
+ * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
+ * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
+ * group id to spread subtopologies across threads and further balance the workload.
+ * Stateless tasks are simply spread across threads without taking into account previous ownership.
+ * threadLoad is a map that keeps track of task load per thread across multiple calls so active and standby
+ * tasks are evenly distributed
*/
- static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign,
- final Collection<TaskId> statelessTasksToAssign,
+ static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
+ final boolean isStateful,
final SortedSet<String> consumers,
- final ClientState state) {
+ final ClientState state,
+ final Map<String, Integer> threadLoad) {
final Map<String, List<TaskId>> assignment = new HashMap<>();
for (final String consumer : consumers) {
assignment.put(consumer, new ArrayList<>());
}
- final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
- Collections.sort(unassignedStatelessTasks);
-
- final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();
+ final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);
- final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size());
- final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
+ final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
+ final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);
final Queue<String> consumersToFill = new LinkedList<>();
// keep track of tasks that we have to skip during the first pass in case we can reassign them later
// using tree-map to make sure the iteration ordering over keys are preserved
final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();
- if (!unassignedStatefulTasks.isEmpty()) {
- // First assign stateful tasks to previous owner, up to the min expected tasks/thread
+ if (!unassignedTasks.isEmpty()) {
+ // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful
for (final String consumer : consumers) {
final List<TaskId> threadAssignment = assignment.get(consumer);
-
- for (final TaskId task : state.prevTasksByLag(consumer)) {
- if (unassignedStatefulTasks.contains(task)) {
- if (threadAssignment.size() < minStatefulTasksPerThread) {
- threadAssignment.add(task);
- unassignedStatefulTasks.remove(task);
- } else {
- unassignedTaskToPreviousOwner.put(task, consumer);
+ // The number of tasks we have to assign here to hit minTasksPerThread
+ final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);
+
+ if (isStateful) {
+ for (final TaskId task : state.prevTasksByLag(consumer)) {
+ if (unassignedTasks.contains(task)) {
+ if (threadAssignment.size() < tasksTargetCount) {
+ threadAssignment.add(task);
+ unassignedTasks.remove(task);
+ } else {
+ unassignedTaskToPreviousOwner.put(task, consumer);
+ }
}
}
}
- if (threadAssignment.size() < minStatefulTasksPerThread) {
+ if (threadAssignment.size() < tasksTargetCount) {
consumersToFill.offer(consumer);
}
}
// Next interleave remaining unassigned tasks amongst unfilled consumers
while (!consumersToFill.isEmpty()) {
- final TaskId task = unassignedStatefulTasks.poll();
+ final TaskId task = unassignedTasks.poll();
if (task != null) {
final String consumer = consumersToFill.poll();
final List<TaskId> threadAssignment = assignment.get(consumer);
threadAssignment.add(task);
- if (threadAssignment.size() < minStatefulTasksPerThread) {
+ final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
+ if (threadTaskCount < minTasksPerThread) {
consumersToFill.offer(consumer);
}
} else {
@@ -1110,54 +1133,43 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
}
- // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned
- // stateful tasks still remaining that should now be distributed over the consumers
- if (!unassignedStatefulTasks.isEmpty()) {
- consumersToFill.addAll(consumers);
+ // At this point all consumers are at the min or min + 1 capacity.
+ // The min + 1 case can occur for standbys where there are fewer standbys than consumers and after assigning
+ // the active tasks some consumers already have min + 1 tasks assigned.
+ // The tasks still remaining should now be distributed over the consumers that are still at min capacity
+ if (!unassignedTasks.isEmpty()) {
+ for (final String consumer : consumers) {
+ final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
+ if (taskCount == minTasksPerThread) {
+ consumersToFill.add(consumer);
+ }
+ }
// Go over the tasks we skipped earlier and assign them to their previous owner when possible
for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
final TaskId task = taskEntry.getKey();
final String consumer = taskEntry.getValue();
- if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) {
+ if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
assignment.get(consumer).add(task);
- unassignedStatefulTasks.remove(task);
+ unassignedTasks.remove(task);
// Remove this consumer since we know it is now at minCapacity + 1
consumersToFill.remove(consumer);
}
}
// Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
- for (final TaskId task : unassignedStatefulTasks) {
+ for (final TaskId task : unassignedTasks) {
final String consumer = consumersToFill.poll();
final List<TaskId> threadAssignment = assignment.get(consumer);
threadAssignment.add(task);
}
-
-
- // There must be at least one consumer still at min capacity while all the others are at min
- // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count
- while (unassignedStatelessTasksIter.hasNext()) {
- final String consumer = consumersToFill.poll();
- if (consumer != null) {
- final TaskId task = unassignedStatelessTasksIter.next();
- unassignedStatelessTasksIter.remove();
- assignment.get(consumer).add(task);
- } else {
- break;
- }
- }
}
}
-
- // Now just distribute tasks while circling through all the consumers
- consumersToFill.addAll(consumers);
-
- while (unassignedStatelessTasksIter.hasNext()) {
- final TaskId task = unassignedStatelessTasksIter.next();
- final String consumer = consumersToFill.poll();
- assignment.get(consumer).add(task);
- consumersToFill.offer(consumer);
+ // Update threadLoad
+ for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
+ final String consumer = taskEntry.getKey();
+ final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
+ threadLoad.put(consumer, totalCount);
}
return assignment;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 2bdc638..23e8d20 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -319,9 +319,10 @@ public class StreamsPartitionAssignorTest {
previousAssignment,
assignTasksToThreads(
allTasks,
- emptySet(),
+ true,
consumers,
- state
+ state,
+ new HashMap<>()
)
);
}
@@ -353,9 +354,10 @@ public class StreamsPartitionAssignorTest {
final Map<String, List<TaskId>> newAssignment =
assignTasksToThreads(
allTasks,
- emptySet(),
+ true,
consumers,
- state
+ state,
+ new HashMap<>()
);
previousAssignment.get(CONSUMER_2).add(newTask);
@@ -386,9 +388,10 @@ public class StreamsPartitionAssignorTest {
final Map<String, List<TaskId>> assignment = assignTasksToThreads(
allTasks,
- emptySet(),
+ true,
consumers,
- state
+ state,
+ new HashMap<>()
);
// Each member should have all of its previous tasks reassigned plus some of consumer 3's tasks
@@ -426,9 +429,10 @@ public class StreamsPartitionAssignorTest {
final Map<String, List<TaskId>> assignment = assignTasksToThreads(
allTasks,
- emptySet(),
+ true,
consumers,
- state
+ state,
+ new HashMap<>()
);
// we should move one task each from consumer 1 and consumer 3 to the new member, and none from consumer 2
@@ -466,9 +470,10 @@ public class StreamsPartitionAssignorTest {
final Map<String, List<TaskId>> interleavedTaskIds =
assignTasksToThreads(
allTasks,
- emptySet(),
+ true,
consumers,
- state
+ state,
+ new HashMap<>()
);
assertThat(interleavedTaskIds, equalTo(assignment));
@@ -998,6 +1003,154 @@ public class StreamsPartitionAssignorTest {
}
@Test
+ public void testAssignWithStandbyReplicasBalanceSparse() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+ final List<String> topics = asList("topic1");
+
+ createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+ adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+ singletonList(APPLICATION_ID + "-store1-changelog"),
+ singletonList(3))
+ );
+ configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+ final List<String> client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13");
+ final List<String> client2Consumers = asList("consumer20", "consumer21", "consumer22");
+
+ for (final String consumerId : client1Consumers) {
+ subscriptions.put(consumerId,
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+ }
+ for (final String consumerId : client2Consumers) {
+ subscriptions.put(consumerId,
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+ }
+
+ final Map<String, Assignment> assignments =
+ partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+ // Consumers
+ final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData());
+ final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData());
+ final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+ final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+ final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+ // Check each consumer has no more than 1 task
+ assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1);
+ assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1);
+ assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1);
+ assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1);
+ assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1);
+ assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1);
+ assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1);
+ }
+
+ @Test
+ public void testAssignWithStandbyReplicasBalanceDense() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
+
+ final List<String> topics = asList("topic1");
+
+ createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+ adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+ singletonList(APPLICATION_ID + "-store1-changelog"),
+ singletonList(3))
+ );
+ configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+ subscriptions.put("consumer10",
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+ subscriptions.put("consumer20",
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+ final Map<String, Assignment> assignments =
+ partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+ // Consumers
+ final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+ // Check each consumer has 3 tasks
+ assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size());
+ assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size());
+ // Check that not all the actives are on one node
+ assertTrue(info10.activeTasks().size() < 3);
+ assertTrue(info20.activeTasks().size() < 3);
+ }
+
+ @Test
+ public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state");
+
+ builder.addSource(null, "source2", null, null, null, "topic2");
+ builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source2");
+
+ final List<String> topics = asList("topic1", "topic2");
+
+ createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+ adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+ singletonList(APPLICATION_ID + "-store1-changelog"),
+ singletonList(3))
+ );
+ configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+
+ subscriptions.put("consumer10",
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+ subscriptions.put("consumer11",
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+ subscriptions.put("consumer20",
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+ subscriptions.put("consumer21",
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()));
+
+ final Map<String, Assignment> assignments =
+ partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+ // Consumers
+ final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+ final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData());
+
+ // 9 tasks spread over 4 consumers, so we should have no more than 3 tasks per consumer
+ assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 3);
+ assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 3);
+ assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 3);
+ assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 3);
+ // No more than 1 standby per consumer (thread).
+ assertTrue(info10.standbyTasks().size() <= 1);
+ assertTrue(info11.standbyTasks().size() <= 1);
+ assertTrue(info20.standbyTasks().size() <= 1);
+ assertTrue(info21.standbyTasks().size() <= 1);
+ }
+
+ @Test
public void testOnAssignment() {
taskManager = EasyMock.createStrictMock(TaskManager.class);