You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/30 23:31:01 UTC

[kafka] branch trunk updated: KAFKA-4969: Attempt to evenly distribute load of tasks (#4410)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c6635e8  KAFKA-4969: Attempt to evenly distribute load of tasks (#4410)
c6635e8 is described below

commit c6635e8d12fa5fed32ac75a727e62332fffa210b
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Jan 30 18:30:58 2018 -0500

    KAFKA-4969: Attempt to evenly distribute load of tasks (#4410)
    
    * removed the round-robin approach; tasks are now assigned to consumers in a more even manner; added a unit test.
    * improved the interleaved-task approach; updated the tests
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
 .../internals/StreamPartitionAssignor.java         | 53 ++++++++----
 .../internals/assignment/StickyTaskAssignor.java   | 32 ++++---
 .../internals/StreamPartitionAssignorTest.java     | 97 ++++++++++++++++++++++
 .../assignment/StickyTaskAssignorTest.java         | 44 ++++++++++
 4 files changed, 197 insertions(+), 29 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 6709419..2a08308 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -505,27 +506,26 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             final Set<String> consumers = entry.getValue().consumers;
             final ClientState state = entry.getValue().state;
 
-            final ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTaskCount());
-            final int numActiveTasks = state.activeTaskCount();
+            final List<List<TaskId>> interleavedActive = interleaveTasksByGroupId(state.activeTasks(), consumers.size());
+            final List<List<TaskId>> interleavedStandby = interleaveTasksByGroupId(state.standbyTasks(), consumers.size());
 
-            taskIds.addAll(state.activeTasks());
-            taskIds.addAll(state.standbyTasks());
+            int consumerTaskIndex = 0;
 
-            final int numConsumers = consumers.size();
-
-            int i = 0;
             for (String consumer : consumers) {
                 final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
                 final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
 
-                final int numTaskIds = taskIds.size();
-                for (int j = i; j < numTaskIds; j += numConsumers) {
-                    final TaskId taskId = taskIds.get(j);
-                    if (j < numActiveTasks) {
-                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
-                            assignedPartitions.add(new AssignedPartition(taskId, partition));
-                        }
-                    } else {
+                final List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex);
+
+                for (final TaskId taskId : assignedActiveList) {
+                    for (final TopicPartition partition : partitionsForTask.get(taskId)) {
+                        assignedPartitions.add(new AssignedPartition(taskId, partition));
+                    }
+                }
+
+                if (!state.standbyTasks().isEmpty()) {
+                    final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
+                    for (final TaskId taskId : assignedStandbyList) {
                         Set<TopicPartition> standbyPartitions = standby.get(taskId);
                         if (standbyPartitions == null) {
                             standbyPartitions = new HashSet<>();
@@ -535,6 +535,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     }
                 }
 
+                consumerTaskIndex++;
+
                 Collections.sort(assignedPartitions);
                 final List<TaskId> active = new ArrayList<>();
                 final List<TopicPartition> activePartitions = new ArrayList<>();
@@ -545,13 +547,32 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                 // finally, encode the assignment before sending back to coordinator
                 assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
-                i++;
             }
         }
 
         return assignment;
     }
 
+    // visible for testing
+    List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) {
+        final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
+        Collections.sort(sortedTasks);
+        final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads);
+        for (int i = 0; i < numberThreads; i++) {
+            taskIdsForConsumerAssignment.add(new ArrayList<TaskId>());
+        }
+        while (!sortedTasks.isEmpty()) {
+            for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) {
+                final TaskId taskId = sortedTasks.poll();
+                if (taskId == null) {
+                    break;
+                }
+                taskIdList.add(taskId);
+            }
+        }
+        return taskIdsForConsumerAssignment;
+    }
+
     /**
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 91738e0..de8fa57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -115,7 +115,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
 
 
     private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
-        final ClientState client = findClient(taskId, clientsWithin);
+        final ClientState client = findClient(taskId, clientsWithin, active);
         taskPairs.addPairs(taskId, client.assignedTasks());
         client.assign(taskId, active);
     }
@@ -137,7 +137,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
     }
 
 
-    private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin) {
+    private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin, boolean active) {
 
         // optimize the case where there is only 1 id to search within.
         if (clientsWithin.size() == 1) {
@@ -146,14 +146,14 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
 
         final ClientState previous = findClientsWithPreviousAssignedTask(taskId, clientsWithin);
         if (previous == null) {
-            return leastLoaded(taskId, clientsWithin);
+            return leastLoaded(taskId, clientsWithin, active);
         }
 
         if (shouldBalanceLoad(previous)) {
             final ClientState standby = findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
             if (standby == null
                     || shouldBalanceLoad(standby)) {
-                return leastLoaded(taskId, clientsWithin);
+                return leastLoaded(taskId, clientsWithin, active);
             }
             return standby;
         }
@@ -190,20 +190,21 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
         }
         final HashSet<ID> constrainTo = new HashSet<>(ids);
         constrainTo.retainAll(clientsWithin);
-        return leastLoaded(taskId, constrainTo);
+        return leastLoaded(taskId, constrainTo, false);
     }
 
-    private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds) {
-        final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true);
+    private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds, final boolean active) {
+        final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true, active);
         if (leastLoaded == null) {
-            return findLeastLoaded(taskId, clientIds, false);
+            return findLeastLoaded(taskId, clientIds, false, active);
         }
         return leastLoaded;
     }
 
     private ClientState findLeastLoaded(final TaskId taskId,
-                                                final Set<ID> clientIds,
-                                                boolean checkTaskPairs) {
+                                        final Set<ID> clientIds,
+                                        final boolean checkTaskPairs,
+                                        final boolean active) {
         ClientState leastLoaded = null;
         for (final ID id : clientIds) {
             final ClientState client = clients.get(id);
@@ -214,7 +215,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
             if (leastLoaded == null || client.hasMoreAvailableCapacityThan(leastLoaded)) {
                 if (!checkTaskPairs) {
                     leastLoaded = client;
-                } else if (taskPairs.hasNewPair(taskId, client.assignedTasks())) {
+                } else if (taskPairs.hasNewPair(taskId, client.assignedTasks(), active)) {
                     leastLoaded = client;
                 }
             }
@@ -257,12 +258,17 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
             this.pairs = new HashSet<>(maxPairs);
         }
 
-        boolean hasNewPair(final TaskId task1, final Set<TaskId> taskIds) {
+        boolean hasNewPair(final TaskId task1,
+                           final Set<TaskId> taskIds,
+                           final boolean active) {
             if (pairs.size() == maxPairs) {
                 return false;
             }
             for (final TaskId taskId : taskIds) {
-                if (!pairs.contains(pair(task1, taskId))) {
+                if (!active && !pairs.contains(pair(task1, taskId))) {
+                    return true;
+                }
+                if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId != taskId.topicGroupId) {
                     return true;
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 582c70b..02ab803 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -70,9 +70,11 @@ public class StreamPartitionAssignorTest {
     private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
     private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
+    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
     private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
     private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
     private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
+    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
     private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
     private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
@@ -138,6 +140,33 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
+    public void shouldInterleaveTasksByGroupId() {
+        final TaskId taskIdA0 = new TaskId(0, 0);
+        final TaskId taskIdA1 = new TaskId(0, 1);
+        final TaskId taskIdA2 = new TaskId(0, 2);
+        final TaskId taskIdA3 = new TaskId(0, 3);
+
+        final TaskId taskIdB0 = new TaskId(1, 0);
+        final TaskId taskIdB1 = new TaskId(1, 1);
+        final TaskId taskIdB2 = new TaskId(1, 2);
+
+        final TaskId taskIdC0 = new TaskId(2, 0);
+        final TaskId taskIdC1 = new TaskId(2, 1);
+
+        final List<TaskId> expectedSubList1 = Arrays.asList(taskIdA0, taskIdA3, taskIdB2);
+        final List<TaskId> expectedSubList2 = Arrays.asList(taskIdA1, taskIdB0, taskIdC0);
+        final List<TaskId> expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1);
+        final List<List<TaskId>> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3);
+
+        List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
+        Collections.shuffle(tasks);
+
+        final List<List<TaskId>> interleavedTaskIds = partitionAssignor.interleaveTasksByGroupId(tasks, 3);
+
+        assertThat(interleavedTaskIds, equalTo(embeddedList));
+    }
+
+    @Test
     public void testSubscription() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
@@ -230,6 +259,74 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
+    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws Exception {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
+        builder.addProcessor("processorII", new MockProcessorSupplier(),  "source2");
+
+        final List<PartitionInfo> localInfos = Arrays.asList(
+            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0])
+        );
+
+        final Cluster localMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            localInfos, Collections.<String>emptySet(),
+            Collections.<String>emptySet());
+
+        final List<String> topics = Utils.mkList("topic1", "topic2");
+
+        final TaskId taskIdA0 = new TaskId(0, 0);
+        final TaskId taskIdA1 = new TaskId(0, 1);
+        final TaskId taskIdA2 = new TaskId(0, 2);
+        final TaskId taskIdA3 = new TaskId(0, 3);
+
+        final TaskId taskIdB0 = new TaskId(1, 0);
+        final TaskId taskIdB1 = new TaskId(1, 1);
+        final TaskId taskIdB2 = new TaskId(1, 2);
+        final TaskId taskIdB3 = new TaskId(1, 3);
+
+        final UUID uuid1 = UUID.randomUUID();
+
+        mockTaskManager(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+        subscriptions.put("consumer11",
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<TaskId>(), new HashSet<TaskId>(), userEndPoint).encode()));
+
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
+
+        // check assigned partitions
+        assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
+                     Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+
+        // the first consumer
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+
+        final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
+        assertEquals(expectedInfo10TaskIds, info10.activeTasks);
+
+        // the second consumer
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
+
+        assertEquals(expectedInfo11TaskIds, info11.activeTasks);
+    }
+
+    @Test
     public void testAssignWithPartialTopology() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 86af0be..4f770c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -44,6 +44,16 @@ public class StickyTaskAssignorTest {
     private final TaskId task03 = new TaskId(0, 3);
     private final TaskId task04 = new TaskId(0, 4);
     private final TaskId task05 = new TaskId(0, 5);
+
+    private final TaskId task10 = new TaskId(1, 0);
+    private final TaskId task11 = new TaskId(1, 1);
+    private final TaskId task12 = new TaskId(1, 2);
+    private final TaskId task20 = new TaskId(2, 0);
+    private final TaskId task21 = new TaskId(2, 1);
+    private final TaskId task22 = new TaskId(2, 2);
+
+    private final List<Integer> expectedTopicGroupIds = Arrays.asList(1, 2);
+
     private final Map<Integer, ClientState> clients = new TreeMap<>();
     private final Integer p1 = 1;
     private final Integer p2 = 2;
@@ -65,6 +75,28 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
+    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() {
+        createClient(p1, 2);
+        createClient(p2, 2);
+        createClient(p3, 2);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task11, task22, task20, task21, task12);
+        taskAssignor.assign(0);
+        assertActiveTaskTopicGroupIdsEvenlyDistributed();
+    }
+
+    @Test
+    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() {
+        createClient(p1, 2);
+        createClient(p2, 2);
+        createClient(p3, 2);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task20, task11, task12, task10, task21, task22);
+        taskAssignor.assign(1);
+        assertActiveTaskTopicGroupIdsEvenlyDistributed();
+    }
+
+    @Test
     public void shouldNotMigrateActiveTaskToOtherProcess() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
@@ -621,4 +653,16 @@ public class StickyTaskAssignorTest {
         return clientState;
     }
 
+    private void assertActiveTaskTopicGroupIdsEvenlyDistributed() {
+        for (final Map.Entry<Integer, ClientState> clientStateEntry : clients.entrySet()) {
+            final List<Integer> topicGroupIds = new ArrayList<>();
+            final Set<TaskId> activeTasks = clientStateEntry.getValue().activeTasks();
+            for (final TaskId activeTask : activeTasks) {
+                topicGroupIds.add(activeTask.topicGroupId);
+            }
+            Collections.sort(topicGroupIds);
+            assertThat(topicGroupIds, equalTo(expectedTopicGroupIds));
+        }
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.