You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/06 05:25:52 UTC

[kafka] branch 1.1 updated: KAFKA-6309: Improve task assignor load balance (#4624)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new e09eaaf  KAFKA-6309: Improve task assignor load balance (#4624)
e09eaaf is described below

commit e09eaafb766f8c8718186f130eaa329b033c4c23
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Mar 6 00:24:22 2018 -0500

    KAFKA-6309: Improve task assignor load balance (#4624)
    
    Sorts TaskIds on first assignment so that tasks are distributed evenly by topicGroupId, which should help even out the load of work across topologies. This PR is an initial "strawman" approach, which will be followed up (at a later date, yet to be determined) by scoring or assigning weights to processing nodes to ensure even processing distribution.
    
    Added a new test to the existing unit test class.
---
 .../internals/assignment/StickyTaskAssignor.java   | 10 ++--
 .../assignment/StickyTaskAssignorTest.java         | 55 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index de8fa57..5b54d08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -20,10 +20,13 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -106,14 +109,13 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
         }
 
         // assign any remaining unassigned tasks
-        for (final TaskId taskId : unassigned) {
+        List<TaskId> sortedTasks = new ArrayList<>(unassigned);
+        Collections.sort(sortedTasks);
+        for (final TaskId taskId : sortedTasks) {
             allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
         }
-
     }
 
-
-
     private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
         final ClientState client = findClient(taskId, clientsWithin, active);
         taskPairs.addPairs(taskId, client.assignedTasks());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 4f770c8..ed22e3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -23,11 +23,13 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -350,6 +352,42 @@ public class StickyTaskAssignorTest {
         assertThat(clients.get(p1).assignedTaskCount(), equalTo(4));
     }
 
+    @Test
+    public void shouldEvenlyDistributeByTaskIdAndPartition() { // Expects shuffled tasks to be dealt out to p1..p4 in sorted TaskId order.
+        createClient(p1, 4);
+        createClient(p2, 4);
+        createClient(p3, 4);
+        createClient(p4, 4);
+        // Build 16 tasks: TaskId(1, 0..7) and TaskId(2, 0..7).
+        final List<TaskId> taskIds = new ArrayList<>();
+        final TaskId[] taskIdArray = new TaskId[16]; // sized to match the 2 x 8 tasks built below
+
+        for (int i = 1; i <= 2; i++) {
+            for (int j = 0; j < 8; j++) {
+                taskIds.add(new TaskId(i, j));
+            }
+        }
+        // Shuffle so that only the assignor's own ordering can produce the expected layout.
+        Collections.shuffle(taskIds); // NOTE(review): unseeded shuffle; a fixed-seed Random would make failures reproducible.
+        taskIds.toArray(taskIdArray);
+        // Run the assignment (NOTE(review): argument 0 presumably means no standby replicas - confirm).
+        final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(taskIdArray);
+        taskAssignor.assign(0);
+        // Client k (1-based) is expected to get sorted indices k-1, k+3, k+7, k+11, i.e. every fourth task.
+        Collections.sort(taskIds);
+        final Set<TaskId> expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12);
+        final Set<TaskId> expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13);
+        final Set<TaskId> expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14);
+        final Set<TaskId> expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15);
+        // Compare each client's active tasks as sets; TreeSet keeps failure messages ordered.
+        final Map<Integer, Set<TaskId>> sortedAssignments = sortClientAssignments(clients);
+
+        assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment));
+        assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment));
+        assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment));
+        assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment));
+    }
+
 
     @Test
     public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
@@ -665,4 +703,21 @@ public class StickyTaskAssignorTest {
         }
     }
 
+    private Map<Integer, Set<TaskId>> sortClientAssignments(final Map<Integer, ClientState> clients) { // Returns, per client id, a TreeSet copy of that client's activeTasks().
+        final Map<Integer, Set<TaskId>> sortedAssignments = new HashMap<>(); // keyed by client id; HashMap, so iteration order is unspecified
+        for (final Map.Entry<Integer, ClientState> entry : clients.entrySet()) { // NOTE(review): parameter presumably shadows the test's clients field - consider renaming
+            final Set<TaskId> sorted = new TreeSet<>(entry.getValue().activeTasks()); // copy, so later changes to the client's tasks do not affect this snapshot
+            sortedAssignments.put(entry.getKey(), sorted);
+        }
+        return sortedAssignments;
+    }
+
+    private Set<TaskId> getExpectedTaskIdAssignment(final List<TaskId> tasks, final int... indices) { // Collects tasks.get(i) for each given index into a sorted set.
+        final Set<TaskId> sortedAssignment = new TreeSet<>();
+        for (final int index : indices) {
+            sortedAssignment.add(tasks.get(index)); // List.get throws IndexOutOfBoundsException for an out-of-range index
+        }
+        return sortedAssignment;
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.