You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/06 05:24:31 UTC

[kafka] branch trunk updated: KAFKA-6309: Improve task assignor load balance (#4624)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1b01f4  KAFKA-6309: Improve task assignor load balance (#4624)
a1b01f4 is described below

commit a1b01f48e9ef06291f3e052a37bde887b891e708
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Mar 6 00:24:22 2018 -0500

    KAFKA-6309: Improve task assignor load balance (#4624)
    
    Sorts the TaskIds on first assignment so that tasks are distributed evenly by topicGroupId, which should help even out the workload across topologies. This PR is an initial "strawman" approach; it will be followed up (at a later date, yet to be determined) by scoring or assigning weights to processing nodes to ensure an even processing distribution.
    
    Added a new test case to the existing unit test class.
---
 .../internals/assignment/StickyTaskAssignor.java   | 10 ++--
 .../assignment/StickyTaskAssignorTest.java         | 55 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index de8fa57..5b54d08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -20,10 +20,13 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -106,14 +109,13 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
         }
 
         // assign any remaining unassigned tasks
-        for (final TaskId taskId : unassigned) {
+        List<TaskId> sortedTasks = new ArrayList<>(unassigned);
+        Collections.sort(sortedTasks);
+        for (final TaskId taskId : sortedTasks) {
             allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
         }
-
     }
 
-
-
     private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
         final ClientState client = findClient(taskId, clientsWithin, active);
         taskPairs.addPairs(taskId, client.assignedTasks());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 4f770c8..ed22e3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -23,11 +23,13 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -350,6 +352,42 @@ public class StickyTaskAssignorTest {
         assertThat(clients.get(p1).assignedTaskCount(), equalTo(4));
     }
 
+    @Test
+    public void shouldEvenlyDistributeByTaskIdAndPartition() {
+        createClient(p1, 4);
+        createClient(p2, 4);
+        createClient(p3, 4);
+        createClient(p4, 4);
+
+        final List<TaskId> taskIds = new ArrayList<>();
+        final TaskId[] taskIdArray = new TaskId[16];
+
+        for (int i = 1; i <= 2; i++) {
+            for (int j = 0; j < 8; j++) {
+                taskIds.add(new TaskId(i, j));
+            }
+        }
+
+        Collections.shuffle(taskIds);
+        taskIds.toArray(taskIdArray);
+
+        final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(taskIdArray);
+        taskAssignor.assign(0);
+
+        Collections.sort(taskIds);
+        final Set<TaskId> expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12);
+        final Set<TaskId> expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13);
+        final Set<TaskId> expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14);
+        final Set<TaskId> expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15);
+
+        final Map<Integer, Set<TaskId>> sortedAssignments = sortClientAssignments(clients);
+
+        assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment));
+        assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment));
+        assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment));
+        assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment));
+    }
+
 
     @Test
     public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
@@ -665,4 +703,21 @@ public class StickyTaskAssignorTest {
         }
     }
 
+    private Map<Integer, Set<TaskId>> sortClientAssignments(final Map<Integer, ClientState> clients) {
+        final Map<Integer, Set<TaskId>> sortedAssignments = new HashMap<>();
+        for (final Map.Entry<Integer, ClientState> entry : clients.entrySet()) {
+            final Set<TaskId> sorted = new TreeSet<>(entry.getValue().activeTasks());
+            sortedAssignments.put(entry.getKey(), sorted);
+        }
+        return sortedAssignments;
+    }
+
+    private Set<TaskId> getExpectedTaskIdAssignment(final List<TaskId> tasks, final int... indices) {
+        final Set<TaskId> sortedAssignment = new TreeSet<>();
+        for (final int index : indices) {
+            sortedAssignment.add(tasks.get(index));
+        }
+        return sortedAssignment;
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.