You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/30 23:31:01 UTC
[kafka] branch trunk updated: KAFKA-4969: Attempt to evenly distribute load of tasks (#4410)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c6635e8 KAFKA-4969: Attempt to evenly distribute load of tasks (#4410)
c6635e8 is described below
commit c6635e8d12fa5fed32ac75a727e62332fffa210b
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Jan 30 18:30:58 2018 -0500
KAFKA-4969: Attempt to evenly distribute load of tasks (#4410)
* removed round-robin approach, try to assign tasks to consumers in a more even manner, added unit test.
* better interleaved task approach, updated tests
Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
.../internals/StreamPartitionAssignor.java | 53 ++++++++----
.../internals/assignment/StickyTaskAssignor.java | 32 ++++---
.../internals/StreamPartitionAssignorTest.java | 97 ++++++++++++++++++++++
.../assignment/StickyTaskAssignorTest.java | 44 ++++++++++
4 files changed, 197 insertions(+), 29 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 6709419..2a08308 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -505,27 +506,26 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
final Set<String> consumers = entry.getValue().consumers;
final ClientState state = entry.getValue().state;
- final ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTaskCount());
- final int numActiveTasks = state.activeTaskCount();
+ final List<List<TaskId>> interleavedActive = interleaveTasksByGroupId(state.activeTasks(), consumers.size());
+ final List<List<TaskId>> interleavedStandby = interleaveTasksByGroupId(state.standbyTasks(), consumers.size());
- taskIds.addAll(state.activeTasks());
- taskIds.addAll(state.standbyTasks());
+ int consumerTaskIndex = 0;
- final int numConsumers = consumers.size();
-
- int i = 0;
for (String consumer : consumers) {
final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
- final int numTaskIds = taskIds.size();
- for (int j = i; j < numTaskIds; j += numConsumers) {
- final TaskId taskId = taskIds.get(j);
- if (j < numActiveTasks) {
- for (TopicPartition partition : partitionsForTask.get(taskId)) {
- assignedPartitions.add(new AssignedPartition(taskId, partition));
- }
- } else {
+ final List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex);
+
+ for (final TaskId taskId : assignedActiveList) {
+ for (final TopicPartition partition : partitionsForTask.get(taskId)) {
+ assignedPartitions.add(new AssignedPartition(taskId, partition));
+ }
+ }
+
+ if (!state.standbyTasks().isEmpty()) {
+ final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
+ for (final TaskId taskId : assignedStandbyList) {
Set<TopicPartition> standbyPartitions = standby.get(taskId);
if (standbyPartitions == null) {
standbyPartitions = new HashSet<>();
@@ -535,6 +535,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
+ consumerTaskIndex++;
+
Collections.sort(assignedPartitions);
final List<TaskId> active = new ArrayList<>();
final List<TopicPartition> activePartitions = new ArrayList<>();
@@ -545,13 +547,32 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// finally, encode the assignment before sending back to coordinator
assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
- i++;
}
}
return assignment;
}
+ // visible for testing
+ List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) {
+ final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
+ Collections.sort(sortedTasks);
+ final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads);
+ for (int i = 0; i < numberThreads; i++) {
+ taskIdsForConsumerAssignment.add(new ArrayList<TaskId>());
+ }
+ while (!sortedTasks.isEmpty()) {
+ for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) {
+ final TaskId taskId = sortedTasks.poll();
+ if (taskId == null) {
+ break;
+ }
+ taskIdList.add(taskId);
+ }
+ }
+ return taskIdsForConsumerAssignment;
+ }
+
/**
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 91738e0..de8fa57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -115,7 +115,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
- final ClientState client = findClient(taskId, clientsWithin);
+ final ClientState client = findClient(taskId, clientsWithin, active);
taskPairs.addPairs(taskId, client.assignedTasks());
client.assign(taskId, active);
}
@@ -137,7 +137,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
- private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin) {
+ private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin, boolean active) {
// optimize the case where there is only 1 id to search within.
if (clientsWithin.size() == 1) {
@@ -146,14 +146,14 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
final ClientState previous = findClientsWithPreviousAssignedTask(taskId, clientsWithin);
if (previous == null) {
- return leastLoaded(taskId, clientsWithin);
+ return leastLoaded(taskId, clientsWithin, active);
}
if (shouldBalanceLoad(previous)) {
final ClientState standby = findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
if (standby == null
|| shouldBalanceLoad(standby)) {
- return leastLoaded(taskId, clientsWithin);
+ return leastLoaded(taskId, clientsWithin, active);
}
return standby;
}
@@ -190,20 +190,21 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
final HashSet<ID> constrainTo = new HashSet<>(ids);
constrainTo.retainAll(clientsWithin);
- return leastLoaded(taskId, constrainTo);
+ return leastLoaded(taskId, constrainTo, false);
}
- private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds) {
- final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true);
+ private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds, final boolean active) {
+ final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true, active);
if (leastLoaded == null) {
- return findLeastLoaded(taskId, clientIds, false);
+ return findLeastLoaded(taskId, clientIds, false, active);
}
return leastLoaded;
}
private ClientState findLeastLoaded(final TaskId taskId,
- final Set<ID> clientIds,
- boolean checkTaskPairs) {
+ final Set<ID> clientIds,
+ final boolean checkTaskPairs,
+ final boolean active) {
ClientState leastLoaded = null;
for (final ID id : clientIds) {
final ClientState client = clients.get(id);
@@ -214,7 +215,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
if (leastLoaded == null || client.hasMoreAvailableCapacityThan(leastLoaded)) {
if (!checkTaskPairs) {
leastLoaded = client;
- } else if (taskPairs.hasNewPair(taskId, client.assignedTasks())) {
+ } else if (taskPairs.hasNewPair(taskId, client.assignedTasks(), active)) {
leastLoaded = client;
}
}
@@ -257,12 +258,17 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
this.pairs = new HashSet<>(maxPairs);
}
- boolean hasNewPair(final TaskId task1, final Set<TaskId> taskIds) {
+ boolean hasNewPair(final TaskId task1,
+ final Set<TaskId> taskIds,
+ final boolean active) {
if (pairs.size() == maxPairs) {
return false;
}
for (final TaskId taskId : taskIds) {
- if (!pairs.contains(pair(task1, taskId))) {
+ if (!active && !pairs.contains(pair(task1, taskId))) {
+ return true;
+ }
+ if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId != taskId.topicGroupId) {
return true;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 582c70b..02ab803 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -70,9 +70,11 @@ public class StreamPartitionAssignorTest {
private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
+ private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
+ private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
@@ -138,6 +140,33 @@ public class StreamPartitionAssignorTest {
}
@Test
+ public void shouldInterleaveTasksByGroupId() {
+ final TaskId taskIdA0 = new TaskId(0, 0);
+ final TaskId taskIdA1 = new TaskId(0, 1);
+ final TaskId taskIdA2 = new TaskId(0, 2);
+ final TaskId taskIdA3 = new TaskId(0, 3);
+
+ final TaskId taskIdB0 = new TaskId(1, 0);
+ final TaskId taskIdB1 = new TaskId(1, 1);
+ final TaskId taskIdB2 = new TaskId(1, 2);
+
+ final TaskId taskIdC0 = new TaskId(2, 0);
+ final TaskId taskIdC1 = new TaskId(2, 1);
+
+ final List<TaskId> expectedSubList1 = Arrays.asList(taskIdA0, taskIdA3, taskIdB2);
+ final List<TaskId> expectedSubList2 = Arrays.asList(taskIdA1, taskIdB0, taskIdC0);
+ final List<TaskId> expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1);
+ final List<List<TaskId>> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3);
+
+ List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
+ Collections.shuffle(tasks);
+
+ final List<List<TaskId>> interleavedTaskIds = partitionAssignor.interleaveTasksByGroupId(tasks, 3);
+
+ assertThat(interleavedTaskIds, equalTo(embeddedList));
+ }
+
+ @Test
public void testSubscription() throws Exception {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
@@ -230,6 +259,74 @@ public class StreamPartitionAssignorTest {
}
@Test
+ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws Exception {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addSource(null, "source2", null, null, null, "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
+ builder.addProcessor("processorII", new MockProcessorSupplier(), "source2");
+
+ final List<PartitionInfo> localInfos = Arrays.asList(
+ new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0])
+ );
+
+ final Cluster localMetadata = new Cluster(
+ "cluster",
+ Collections.singletonList(Node.noNode()),
+ localInfos, Collections.<String>emptySet(),
+ Collections.<String>emptySet());
+
+ final List<String> topics = Utils.mkList("topic1", "topic2");
+
+ final TaskId taskIdA0 = new TaskId(0, 0);
+ final TaskId taskIdA1 = new TaskId(0, 1);
+ final TaskId taskIdA2 = new TaskId(0, 2);
+ final TaskId taskIdA3 = new TaskId(0, 3);
+
+ final TaskId taskIdB0 = new TaskId(1, 0);
+ final TaskId taskIdB1 = new TaskId(1, 1);
+ final TaskId taskIdB2 = new TaskId(1, 2);
+ final TaskId taskIdB3 = new TaskId(1, 3);
+
+ final UUID uuid1 = UUID.randomUUID();
+
+ mockTaskManager(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, builder);
+ configurePartitionAssignor(Collections.<String, Object>emptyMap());
+
+ partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
+
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+ subscriptions.put("consumer11",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+
+ final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
+
+ // check assigned partitions
+ assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
+ Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+
+ // the first consumer
+ final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+
+ final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
+ assertEquals(expectedInfo10TaskIds, info10.activeTasks);
+
+ // the second consumer
+ final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
+
+ assertEquals(expectedInfo11TaskIds, info11.activeTasks);
+ }
+
+ @Test
public void testAssignWithPartialTopology() throws Exception {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 86af0be..4f770c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -44,6 +44,16 @@ public class StickyTaskAssignorTest {
private final TaskId task03 = new TaskId(0, 3);
private final TaskId task04 = new TaskId(0, 4);
private final TaskId task05 = new TaskId(0, 5);
+
+ private final TaskId task10 = new TaskId(1, 0);
+ private final TaskId task11 = new TaskId(1, 1);
+ private final TaskId task12 = new TaskId(1, 2);
+ private final TaskId task20 = new TaskId(2, 0);
+ private final TaskId task21 = new TaskId(2, 1);
+ private final TaskId task22 = new TaskId(2, 2);
+
+ private final List<Integer> expectedTopicGroupIds = Arrays.asList(1, 2);
+
private final Map<Integer, ClientState> clients = new TreeMap<>();
private final Integer p1 = 1;
private final Integer p2 = 2;
@@ -65,6 +75,28 @@ public class StickyTaskAssignorTest {
}
@Test
+ public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() {
+ createClient(p1, 2);
+ createClient(p2, 2);
+ createClient(p3, 2);
+
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task11, task22, task20, task21, task12);
+ taskAssignor.assign(0);
+ assertActiveTaskTopicGroupIdsEvenlyDistributed();
+ }
+
+ @Test
+ public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() {
+ createClient(p1, 2);
+ createClient(p2, 2);
+ createClient(p3, 2);
+
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(task20, task11, task12, task10, task21, task22);
+ taskAssignor.assign(1);
+ assertActiveTaskTopicGroupIdsEvenlyDistributed();
+ }
+
+ @Test
public void shouldNotMigrateActiveTaskToOtherProcess() {
createClientWithPreviousActiveTasks(p1, 1, task00);
createClientWithPreviousActiveTasks(p2, 1, task01);
@@ -621,4 +653,16 @@ public class StickyTaskAssignorTest {
return clientState;
}
+ private void assertActiveTaskTopicGroupIdsEvenlyDistributed() {
+ for (final Map.Entry<Integer, ClientState> clientStateEntry : clients.entrySet()) {
+ final List<Integer> topicGroupIds = new ArrayList<>();
+ final Set<TaskId> activeTasks = clientStateEntry.getValue().activeTasks();
+ for (final TaskId activeTask : activeTasks) {
+ topicGroupIds.add(activeTask.topicGroupId);
+ }
+ Collections.sort(topicGroupIds);
+ assertThat(topicGroupIds, equalTo(expectedTopicGroupIds));
+ }
+ }
+
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.