Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/01 02:36:31 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418390934



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatefulPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatefulPerThread - minActiveStatefulPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map<String, ? extends Number> results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
       Nailed it! This is what I was working up toward. It seems dangerous to consider only the number of threads when assigning stateful tasks. We actually don't know how much disk is available, but it seems more reasonable to assume each computer has about the same amount of disk than that each thread has about the same amount of disk.
   
   So, now (and this may be a departure from past conversations), I'm thinking perhaps we should do the following:
   1. Round-robin assign active stateful tasks over the _clients_ that have spare capacity (aka threads). Once no client has spare capacity, we just round-robin over all clients.
   2. Do the same with each standby replica. If the colocation constraint is violated, we just skip the client and keep going. If my intuition serves, this should still result in a diff of no more than one between clients.
   3. Do the same with stateless tasks. The one trick is that for the stateless tasks, we start round-robining on the first node after the one where we left off with the active stateful tasks. Assuming the round-robin algorithm above, this should ultimately produce an assignment that is just as balanced as picking the least loaded client for each task.
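
The three steps above can be sketched roughly as follows. This is only an illustration of the idea as described in this comment, not the code in the PR: `RoundRobinSketch`, `Client`, `assign`, and `pick` are hypothetical names, tasks are plain strings, "spare capacity" is assumed to mean fewer assigned tasks (active plus standby) than threads, and the colocation constraint is assumed to mean that a client never hosts two copies of the same task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RoundRobinSketch {

    // Hypothetical stand-in for a client: a name, a thread count, and its assigned tasks.
    static final class Client {
        final String name;
        final int capacity; // number of threads (assumption: one task per thread is "full")
        final List<String> active = new ArrayList<>();
        final List<String> standby = new ArrayList<>();

        Client(final String name, final int capacity) {
            this.name = name;
            this.capacity = capacity;
        }

        int load() {
            return active.size() + standby.size();
        }

        boolean hosts(final String task) {
            return active.contains(task) || standby.contains(task);
        }
    }

    // Assigns each task round-robin, starting at client index `start`.
    // Returns the index of the client after the last one that received a task.
    static int assign(final List<Client> clients, final List<String> tasks,
                      final int start, final boolean asStandby) {
        int next = start;
        for (final String task : tasks) {
            // Prefer clients with spare capacity; fall back to all clients.
            int chosen = pick(clients, next, task, true);
            if (chosen < 0) {
                chosen = pick(clients, next, task, false);
            }
            if (chosen < 0) {
                continue; // every client already hosts this task, so skip it
            }
            final Client client = clients.get(chosen);
            (asStandby ? client.standby : client.active).add(task);
            next = (chosen + 1) % clients.size();
        }
        return next;
    }

    // Walks the clients once, starting at `from`, skipping any client that already
    // hosts the task (colocation) and, if requested, any client without spare capacity.
    // Returns -1 when no client qualifies.
    static int pick(final List<Client> clients, final int from,
                    final String task, final boolean requireSpare) {
        for (int i = 0; i < clients.size(); i++) {
            final int idx = (from + i) % clients.size();
            final Client client = clients.get(idx);
            if (client.hosts(task)) {
                continue;
            }
            if (requireSpare && client.load() >= client.capacity) {
                continue;
            }
            return idx;
        }
        return -1;
    }

    public static void main(final String[] args) {
        final List<Client> clients = Arrays.asList(
            new Client("A", 2), new Client("B", 1), new Client("C", 1));
        final List<String> stateful = Arrays.asList("0_0", "0_1", "1_0", "1_1");
        final List<String> stateless = Arrays.asList("2_0", "2_1");

        final int resume = assign(clients, stateful, 0, false); // step 1
        assign(clients, stateful, resume, true);                // step 2: one standby each
        assign(clients, stateless, resume, false);              // step 3: resume after step 1

        for (final Client c : clients) {
            System.out.println(c.name + " active=" + c.active + " standby=" + c.standby);
        }
    }
}
```

Starting the standby pass at `resume` is a choice made for this sketch; the comment only specifies the starting point for the stateless pass.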
   
   And we would always sort the tasks by subtopology first, then by partition, so that we still get workload parallelism. Actually, I should add this to the validation.
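
As a small illustration of that ordering, here is a sketch that sorts tasks by subtopology and then by partition. `TaskOrderSketch` and `Task` are hypothetical stand-ins for this example rather than the real Streams task ID class. Presumably the point is that, with this order, consecutive round-robin picks spread the tasks of one subtopology across different clients.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

final class TaskOrderSketch {

    // Hypothetical task identifier: a subtopology number plus a partition number.
    static final class Task {
        final int subtopology;
        final int partition;

        Task(final int subtopology, final int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return subtopology + "_" + partition;
        }
    }

    // Order by subtopology first, then by partition within a subtopology.
    static final Comparator<Task> BY_SUBTOPOLOGY_THEN_PARTITION =
        Comparator.comparingInt((Task t) -> t.subtopology)
                  .thenComparingInt(t -> t.partition);

    // Returns a new sorted list and leaves the input collection untouched.
    static List<Task> sorted(final Collection<Task> tasks) {
        final List<Task> result = new ArrayList<>(tasks);
        result.sort(BY_SUBTOPOLOGY_THEN_PARTITION);
        return result;
    }
}
```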
   
   I'll update the PR tomorrow with these ideas, so you can see them in code form, since words kind of suck for this kind of thing.



