Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/30 03:07:49 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

vvcephei opened a new pull request #8588:
URL: https://github.com/apache/kafka/pull/8588


   Add validation that task assignment is always balanced after convergence.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman edited a comment on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
ableegoldman edited a comment on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-622086284


   Not sure I'm on the same page w.r.t interpreting "balance" at the client level. Here's the proposal we discussed a while back:
   
   - In the "under-capacity" case, there are more tasks than the number of threads. We aim to give each thread an equal N >= 1 number of tasks so all clients get tasks proportional to their capacity. Of course this means some clients can get more than the `balance.factor` number of tasks than another client, so this would violate "client-level" balance but satisfy "thread-level" balance.
   
   - In the "over-capacity" case, there are fewer tasks than the number of threads so some threads will necessarily be idle. This is the situation in KAFKA-9173. In this case, we can actually satisfy both thread-level and client-level balance: we get thread-level by default, so we just have to make an effort to spread the tasks evenly over clients as well. The relevant point here is we should **only verify client-level balance in the over-capacity case** (but always verify thread-level balance).
   
   Presumably most applications run instances with roughly similar capacity, in which case thread-level balance will collapse to client-level balance as well. Since we also get both in the over-capacity case, the only relevant edge case is when we are under-capacity with large per-machine capacity variation. Surely if you're running one machine with 10 threads and one machine with 1, and there are enough tasks to saturate both, you would expect the first machine to get 10x the task load of the second?
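   
   (Purely for illustration, not assignor code: a rough sketch of the 10-thread vs 1-thread example, showing how thread-level balance can blow past any small `balance.factor` at the client level.)
   
   ```java
   public class ThreadVsClientBalanceSketch {
       public static void main(final String[] args) {
           final int[] threadsPerClient = {10, 1}; // hypothetical: client 0 has 10 threads, client 1 has 1
           final int totalThreads = 10 + 1;
           final int totalTasks = 22;              // enough tasks to saturate every thread
           final int tasksPerThread = totalTasks / totalThreads; // 2 tasks per thread everywhere
           for (int i = 0; i < threadsPerClient.length; i++) {
               System.out.println("client " + i + ": " + tasksPerThread * threadsPerClient[i] + " tasks");
           }
           // client 0: 20 tasks, client 1: 2 tasks.
           // Thread-level balance holds (every thread has exactly 2 tasks), but the
           // client-level spread of 18 tasks violates any small client-level balance.factor.
       }
   }
   ```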
   
   edit: fixed the description of over/under capacity cases


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman edited a comment on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
ableegoldman edited a comment on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-622154885


   Oh man, I just read my original comment again and I totally swapped the descriptions of the two cases. Hopefully you knew what I meant but I went back and fixed it just in case.
   
   I think we're in agreement on the first two scenarios, but I'm not so sure about the third (also I think you mean "under capacity" here, but I'm the one who messed this up in the first place). I might be misinterpreting you though: when you say we round-robin over the clients do you mean over _all_ the clients, or only over the clients with remaining unfulfilled quota? "Quota" meaning the number of tasks we'd give each client assuming an equal # tasks per thread across all instances, ie total_tasks * client_capacity / total_capacity.
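   
   (A rough sketch of that quota formula with made-up capacities; `quota` here is just the arithmetic spelled out, not an existing method.)
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   public class QuotaSketch {
       public static void main(final String[] args) {
           final int totalTasks = 12;
           final Map<String, Integer> clientCapacity = new TreeMap<>(); // hypothetical thread counts
           clientCapacity.put("clientA", 3);
           clientCapacity.put("clientB", 1);
           final int totalCapacity = clientCapacity.values().stream().mapToInt(Integer::intValue).sum();
           // quota = total_tasks * client_capacity / total_capacity
           clientCapacity.forEach((client, capacity) ->
               System.out.println(client + " quota: " + totalTasks * capacity / totalCapacity));
           // clientA quota: 9, clientB quota: 3. Under the suggestion above, only clients
           // still below their quota would be considered in the round-robin.
       }
   }
   ```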
   
   To be clear, what I was suggesting was the latter. I'll hold off on making a long argument for it in case we're already on the same page here, but to loop this back to its relevance to the current PR, this would mean not verifying client-level balance in this case. I guess, as you pointed out, we'd also want to drop enforcing client-level balance in the "over+under" capacity case.
   
   edit: I left a comment on the code, but I'm starting to convince myself otherwise, ie that we should actually balance stateful tasks at the instance level. I think this is what you've been saying, but I'll stop putting words in your mouth and let you clarify this yourself.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418267732



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
       The HATA (HighAvailabilityTaskAssignor) originally tried to balance each type of task individually (i.e. stateful active, standby, stateless active), and IIRC you made a convincing argument during the review against doing that, in favor of balancing only the total task load. What's the rationale for enforcing this now? Or did I misremember and/or misinterpret your earlier point?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424830762



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##########
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
     private final PriorityQueue<UUID> clientsByTaskLoad;
-    private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+    private final BiFunction<UUID, TaskId, Boolean> constraint;
     private final Set<UUID> uniqueClients = new HashSet<>();
 
-    ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
-                                final BiFunction<UUID, TaskId, Boolean> validClientCriteria) {
-        this.validClientCriteria = validClientCriteria;
-
-        clientsByTaskLoad = new PriorityQueue<>(
-            (client, other) -> {
-                final double clientTaskLoad = clientStates.get(client).taskLoad();
-                final double otherTaskLoad = clientStates.get(other).taskLoad();
-                if (clientTaskLoad < otherTaskLoad) {
-                    return -1;
-                } else if (clientTaskLoad > otherTaskLoad) {
-                    return 1;
-                } else {
-                    return client.compareTo(other);
-                }
-            });
+    ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
+                           final Function<UUID, Double> weight) {
+        this.constraint = constraint;
+        clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
     }
 
     /**
      * @return the next least loaded client that satisfies the given criteria, or null if none do
      */
-    UUID poll(final TaskId task) {
-        final List<UUID> validClient = poll(task, 1);
-        return validClient.isEmpty() ? null : validClient.get(0);
-    }
-
-    /**
-     * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task
-     */
-    List<UUID> poll(final TaskId task, final int numClients) {
-        final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+    UUID poll(final TaskId task, final Function<UUID, Boolean> extraConstraint) {

Review comment:
       > we know that we cannot consider C1 again for the second poll
   
   Yep, that's what I was getting at above. I'm totally on board with reducing the number of assumptions, especially as this class becomes more generally used. I was just intrigued by what you said initially and thought "This actually results in better balancing characteristics when assigning standbys" meant that you had actually seen a difference in the tests.
   
   Thanks for continuing to improve this class!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424718290



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -75,4 +94,303 @@
     static UUID uuidForInt(final int n) {
         return new UUID(0, n);
     }
+
+    static void assertValidAssignment(final int numStandbyReplicas,
+                                      final Set<TaskId> statefulTasks,
+                                      final Set<TaskId> statelessTasks,
+                                      final Map<UUID, ClientState> assignedStates,
+                                      final StringBuilder failureContext) {
+        assertValidAssignment(
+            numStandbyReplicas,
+            0,
+            statefulTasks,
+            statelessTasks,
+            assignedStates,
+            failureContext
+        );
+    }
+
+    static void assertValidAssignment(final int numStandbyReplicas,
+                                      final int maxWarmupReplicas,
+                                      final Set<TaskId> statefulTasks,
+                                      final Set<TaskId> statelessTasks,
+                                      final Map<UUID, ClientState> assignedStates,
+                                      final StringBuilder failureContext) {
+        final Map<TaskId, Set<UUID>> assignments = new TreeMap<>();
+        for (final TaskId taskId : statefulTasks) {
+            assignments.put(taskId, new TreeSet<>());
+        }
+        for (final TaskId taskId : statelessTasks) {
+            assignments.put(taskId, new TreeSet<>());
+        }
+        for (final Map.Entry<UUID, ClientState> entry : assignedStates.entrySet()) {
+            validateAndAddActiveAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry);
+            validateAndAddStandbyAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry);
+        }
+
+        final AtomicInteger remainingWarmups = new AtomicInteger(maxWarmupReplicas);
+
+        final TreeMap<TaskId, Set<UUID>> misassigned =
+            assignments
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                    final int expectedActives = 1;
+                    final boolean isStateless = statelessTasks.contains(entry.getKey());
+                    final int expectedStandbys = isStateless ? 0 : numStandbyReplicas;
+                    // We'll never assign even the expected number of standbys if they don't actually fit in the cluster
+                    final int expectedAssignments = Math.min(
+                        assignedStates.size(),
+                        expectedActives + expectedStandbys
+                    );
+                    final int actualAssignments = entry.getValue().size();
+                    if (actualAssignments == expectedAssignments) {
+                        return false; // not misassigned
+                    } else {
+                        if (actualAssignments == expectedAssignments + 1 && remainingWarmups.get() > 0) {
+                            remainingWarmups.getAndDecrement();
+                            return false; // it's a warmup, so it's fine
+                        } else {
+                            return true; // misassigned
+                        }
+                    }
+                })
+                .collect(entriesToMap(TreeMap::new));
+
+        if (!misassigned.isEmpty()) {

Review comment:
       L131-158 is just gathering information about whether each task is correctly assigned or not, based on its type and the standby configs (and maybe the warmup config). It doesn't make any assertions. So this check is the actual assertion: that no tasks are incorrectly assigned.
   
   Doing it this way is nicer, since when it fails, it tells you _all_ the incorrectly assigned tasks, not just the first one.
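   
   (A minimal standalone sketch of the "gather everything, then assert once" pattern described above; the names here are illustrative, not the actual test-utility code.)
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   public class GatherThenAssertSketch {
       public static void main(final String[] args) {
           // 1. Gather: record every violation instead of failing on the first one.
           final Map<String, String> misassigned = new TreeMap<>();
           // ... populate with taskId -> reason the assignment is wrong ...
   
           // 2. Assert once: a single check whose failure message lists *all* violations.
           if (!misassigned.isEmpty()) {
               throw new AssertionError("Found misassigned tasks: " + misassigned);
           }
       }
   }
   ```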




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424821641



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##########
@@ -35,262 +44,161 @@
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
 import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
+import static org.hamcrest.Matchers.is;
 
 public class TaskMovementTest {
-    private final ClientState client1 = new ClientState(1);
-    private final ClientState client2 = new ClientState(1);
-    private final ClientState client3 = new ClientState(1);
-
-    private final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
-
-    private final Map<UUID, List<TaskId>> emptyWarmupAssignment = mkMap(
-        mkEntry(UUID_1, EMPTY_TASK_LIST),
-        mkEntry(UUID_2, EMPTY_TASK_LIST),
-        mkEntry(UUID_3, EMPTY_TASK_LIST)
-    );
-
     @Test
     public void shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() {
         final int maxWarmupReplicas = Integer.MAX_VALUE;
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
 
-        final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
-            mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-            mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-            mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-        );
-
         final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
         for (final TaskId task : allTasks) {
             tasksToCaughtUpClients.put(task, mkSortedSet(UUID_1, UUID_2, UUID_3));
         }
-        
-        assertFalse(
+
+        final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+        final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+        final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
+
+        assertThat(
             assignTaskMovements(
-                balancedAssignment,
                 tasksToCaughtUpClients,
-                clientStates,
-                getMapWithNumStandbys(allTasks, 1),
-                maxWarmupReplicas)
+                getClientStatesMap(client1, client2, client3),
+                maxWarmupReplicas),
+            is(false)
         );
-
-        verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment);
     }
 
     @Test
     public void shouldAssignAllTasksToClientsAndReturnFalseIfNoClientsAreCaughtUp() {
-        final int maxWarmupReplicas = 2;
-        final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
+        final int maxWarmupReplicas = Integer.MAX_VALUE;
 
-        final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
-            mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-            mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-            mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-        );
+        final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+        final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+        final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
-        assertFalse(
+        assertThat(
             assignTaskMovements(
-                balancedAssignment,
                 emptyMap(),
-                clientStates,
-                getMapWithNumStandbys(allTasks, 1),
-                maxWarmupReplicas)
+                getClientStatesMap(client1, client2, client3),
+                maxWarmupReplicas),
+            is(false)
         );
-        verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment);
     }
 
     @Test
     public void shouldMoveTasksToCaughtUpClientsAndAssignWarmupReplicasInTheirPlace() {
         final int maxWarmupReplicas = Integer.MAX_VALUE;
-        final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+        final ClientState client1 = getClientStateWithActiveAssignment(singletonList(TASK_0_0));
+        final ClientState client2 = getClientStateWithActiveAssignment(singletonList(TASK_0_1));
+        final ClientState client3 = getClientStateWithActiveAssignment(singletonList(TASK_0_2));
+        final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
 
-        final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
-            mkEntry(UUID_1, singletonList(TASK_0_0)),
-            mkEntry(UUID_2, singletonList(TASK_0_1)),
-            mkEntry(UUID_3, singletonList(TASK_0_2))
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = mkMap(
+            mkEntry(TASK_0_0, mkSortedSet(UUID_1)),
+            mkEntry(TASK_0_1, mkSortedSet(UUID_3)),
+            mkEntry(TASK_0_2, mkSortedSet(UUID_2))
         );
 
-        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
-        tasksToCaughtUpClients.put(TASK_0_0, mkSortedSet(UUID_1));
-        tasksToCaughtUpClients.put(TASK_0_1, mkSortedSet(UUID_3));
-        tasksToCaughtUpClients.put(TASK_0_2, mkSortedSet(UUID_2));
-
-        final Map<UUID, List<TaskId>> expectedActiveTaskAssignment = mkMap(
-            mkEntry(UUID_1, singletonList(TASK_0_0)),
-            mkEntry(UUID_2, singletonList(TASK_0_2)),
-            mkEntry(UUID_3, singletonList(TASK_0_1))
-        );
-
-        final Map<UUID, List<TaskId>> expectedWarmupTaskAssignment = mkMap(
-            mkEntry(UUID_1, EMPTY_TASK_LIST),
-            mkEntry(UUID_2, singletonList(TASK_0_1)),
-            mkEntry(UUID_3, singletonList(TASK_0_2))
-        );
-
-        assertTrue(
+        assertThat(
+            "should have assigned movements",
             assignTaskMovements(
-                balancedAssignment,
                 tasksToCaughtUpClients,
                 clientStates,
-                getMapWithNumStandbys(allTasks, 1),
-                maxWarmupReplicas)
-        );
-        verifyClientStateAssignments(expectedActiveTaskAssignment, expectedWarmupTaskAssignment);
-    }
-
-    @Test
-    public void shouldProduceBalancedAndStateConstrainedAssignment() {

Review comment:
       Ah, thanks. I guess I'll just leave it out, then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-628236334


   It looks like the test failures were unrelated (they were all different), and some are known to be flaky:
   
   Java 8:
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
   Java 11:
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
       org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
   
   Java 14:
       kafka.api.TransactionsBounceTest.testWithGroupId
       kafka.api.TransactionsTest.testBumpTransactionalEpoch


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-621589430


   Hey @ableegoldman @cadonna ,
   
   I thought it would be good to add verification that the converged assignment is actually balanced to our randomized test. Really, I had https://issues.apache.org/jira/browse/KAFKA-9173 in mind. This won't fix the StickyTaskAssignor, but maybe it can make sure that HATA won't ever do the same thing.
   
   What do you think about the test condition in particular? It's a more aggressive interpretation of "balance" than what we've discussed before; is it appropriate? The test fails for me pretty reliably, which makes sense, because we didn't design the algorithm with this definition of balance in mind.
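   
   (Roughly, the condition per metric: a condensed sketch of the shape of it, not the exact `verifyBalancedAssignment` code quoted elsewhere in this thread.)
   
   ```java
   import java.util.Collection;
   
   public final class BalanceConditionSketch {
       private BalanceConditionSketch() {}
   
       // A per-client metric (e.g. active task count) is "balanced" when the spread between
       // the most-loaded and least-loaded client is no more than the balance factor.
       static boolean isBalanced(final Collection<Integer> perClientCounts, final int balanceFactor) {
           final int max = perClientCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
           final int min = perClientCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
           return max - min <= balanceFactor;
       }
   }
   ```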
   
   For example, you can reproduce the failure I'm looking at with seed `8608745620218291125`, which has an imbalance of 2 for active tasks and for standbys.
   
   The active tasks get imbalanced because we put some stateless tasks on instances that have active tasks already, even though there are empty instances. 
   
   The standby tasks are imbalanced, too, and I'm not sure why. There are some nodes with six standbys and some nodes with only four. It looks like the nodes with four are also the nodes with two active tasks (four standbys plus two actives gives the same total of six), so that might be the reason. I haven't looked back at the standby assignment code yet.
   
   Thoughts?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-628635918


   The test failure was unrelated: kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei edited a comment on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-622137407


   Thanks @ableegoldman !
   
   Regarding the client-capacity-vs-thread-capacity dilemma, I think I'm remembering that conversation a little differently. I thought:
   * in the "under capacity" case (where all clients have more threads than tasks), we would assign an equal number of tasks to all clients.
   * in the "over+under capacity" case (where some clients have more threads than others and a uniform assignment would saturate the smallest node, but there are still more total _threads_ in the cluster than tasks), then we'll assign tasks such that no node is over capacity.
   * in the "over capacity" case (where there are more tasks than threads), then we first fill up everyone's capacity, and then go back to a simple round-robin over _clients_.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424822166



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -236,16 +233,17 @@ public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
                                                                 0,
                                                                 1000L);
 
-        final Harness harness = Harness.initializeCluster(1, 1, 1);
+        final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
         testForConvergence(harness, configs, 1);
         verifyValidAssignment(0, harness);
+        verifyBalancedAssignment(harness);
     }
 
     @Test
     public void assignmentShouldConvergeAfterAddingNode() {
-        final int numStatelessTasks = 15;
-        final int numStatefulTasks = 13;
+        final int numStatelessTasks = 7;

Review comment:
       Right, but I think there's a "3" or a "5" in there somewhere, maybe in the other test. Anyway, my _intent_ was to make them all prime so I wouldn't have to think too hard about whether they were all coprime. But, in reality, I managed to screw up both.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418768140



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatefulPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatefulPerThread - minActiveStatefulPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map<String, ? extends Number> results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
       Just kidding; I will _not_ update the PR today, but I will do it soon.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-622086284


   Not sure I'm on the same page w.r.t interpreting "balance" at the client level. Here's the proposal we discussed a while back:
   
   - In the "under-capacity" case, there are more threads in total than the number of tasks. We aim to give each thread an equal N >= 1 number of tasks so all clients get tasks proportional to their capacity. Of course this means some clients can get more than the `balance.factor` number of tasks than another client, so this would violate "client-level" balance but satisfy "thread-level" balance.
   
   - In the "over-capacity" case, there are fewer total threads than the number of tasks so some threads will necessarily be idle. This is the situation in KAFKA-9173. In this case, we can actually satisfy both thread-level and client-level balance: we get thread-level by default, so we just have to make an effort to spread the tasks evenly over clients as well. The relevant point here is we should **only verify client-level balance in the over-capacity case** (but always verify thread-level balance).
   
   Presumably most applications run instances with roughly similar capacity, in which case thread-level balance will collapse to client-level balance as well. Since we also get both in the over-capacity case, the only relevant edge case is when we are under-capacity with large per-machine capacity variation. Surely if you're running one machine with 10 threads and one machine with 1, and there are enough tasks to saturate both, you would expect the first machine to get 10x the task load of the second?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418346499



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatefulPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatefulPerThread - minActiveStatefulPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map<String, ? extends Number> results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
       Ok, this gives me an idea of where you're coming from w.r.t. client-level balance. I was thinking that we should scale the entire task load with the thread capacity, but that only makes sense when considering some resources. Mainly (or only?) CPU, which I suppose is unlikely to be the bottleneck or resource constraint in a stateful application. Of course, it still would be for stateless tasks. So I guess I do see that we might want to balance stateless tasks at the thread level, and anything stateful at the client level, where IO is more likely to be the constraint.
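   
   (A sketch of the two metrics that split would imply; purely illustrative, and whether standbys count toward the stateful load is my assumption here, not something decided above.)
   
   ```java
   import java.util.Set;
   
   public final class HybridBalanceMetricsSketch {
       private HybridBalanceMetricsSketch() {}
   
       // Stateless work is mostly CPU-bound, so normalize it by the client's thread count.
       static double statelessLoadPerThread(final Set<String> statelessActiveTasks, final int numThreads) {
           return statelessActiveTasks.size() / (double) numThreads;
       }
   
       // Stateful work is more likely IO/state-bound, so compare the raw per-client count instead.
       static int statefulLoadPerClient(final Set<String> statefulActiveTasks, final Set<String> standbyTasks) {
           return statefulActiveTasks.size() + standbyTasks.size();
       }
   }
   ```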
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424825641



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##########
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
     private final PriorityQueue<UUID> clientsByTaskLoad;
-    private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+    private final BiFunction<UUID, TaskId, Boolean> constraint;
     private final Set<UUID> uniqueClients = new HashSet<>();
 
-    ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
-                                final BiFunction<UUID, TaskId, Boolean> validClientCriteria) {
-        this.validClientCriteria = validClientCriteria;
-
-        clientsByTaskLoad = new PriorityQueue<>(
-            (client, other) -> {
-                final double clientTaskLoad = clientStates.get(client).taskLoad();
-                final double otherTaskLoad = clientStates.get(other).taskLoad();
-                if (clientTaskLoad < otherTaskLoad) {
-                    return -1;
-                } else if (clientTaskLoad > otherTaskLoad) {
-                    return 1;
-                } else {
-                    return client.compareTo(other);
-                }
-            });
+    ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
+                           final Function<UUID, Double> weight) {
+        this.constraint = constraint;
+        clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
     }
 
     /**
      * @return the next least loaded client that satisfies the given criteria, or null if none do
      */
-    UUID poll(final TaskId task) {
-        final List<UUID> validClient = poll(task, 1);
-        return validClient.isEmpty() ? null : validClient.get(0);
-    }
-
-    /**
-     * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task
-     */
-    List<UUID> poll(final TaskId task, final int numClients) {
-        final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+    UUID poll(final TaskId task, final Function<UUID, Boolean> extraConstraint) {

Review comment:
       I was operating more on intuition here. To be honest, I had a suspicion you would call this out, so I probably should have just taken the time to prove it up front.
   
   Forgetting about the constraint for a minute, I think that what I had in mind for balance is something like, suppose you have two clients "C1" and "C2"... C1 has one task and C2 has two. You poll and get C1 and add a task. Now, they both have two.
   
    If you add it back and poll again, you might prefer to get C1 back again. Maybe because the "weight" function takes into account more than just the task load, or maybe just because of the total order we impose based on clientId, in which `C1 < C2`. But if you just poll two clients to begin with, then C1 doesn't get a chance to be included in the second poll; you just automatically get C1 and C2.
   
   In retrospect, this might be moot in practice, because the only time we actually polled for multiple clients was when assigning standbys, and specifically when we were assigning multiple replicas of the same task, in which case, we know that we _cannot_ consider C1 again for the second poll.
   
   From a computer-sciencey perspective, it doesn't seem like the data structure should be able to make this assumption, though, since it can't know that polling a client also invalidates it for a subsequent poll with the same last-mile predicate.
   
    So, even in retrospect, I'm tempted to leave it this way (big surprise there), although I'll acknowledge that the outcome is actually no different for the way we would use the method.
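   
   (A tiny standalone sketch of the C1/C2 point, using a bare PriorityQueue and a map of task counts rather than the actual ConstrainedPrioritySet:)
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.PriorityQueue;
   
   public class PollSemanticsSketch {
       public static void main(final String[] args) {
           final Map<String, Integer> taskCounts = new HashMap<>();
           taskCounts.put("C1", 1);
           taskCounts.put("C2", 2);
           // order by task load, breaking ties by client id (so C1 < C2 on a tie)
           final PriorityQueue<String> queue = new PriorityQueue<>((a, b) -> {
               final int cmp = Integer.compare(taskCounts.get(a), taskCounts.get(b));
               return cmp != 0 ? cmp : a.compareTo(b);
           });
           queue.addAll(taskCounts.keySet());
   
           // poll one at a time, re-offering after each assignment:
           final String first = queue.poll();                 // C1 (load 1 < 2)
           taskCounts.put(first, taskCounts.get(first) + 1);  // C1 now also has 2 tasks
           queue.offer(first);
           final String second = queue.poll();                // C1 again: loads tie, C1 < C2
           System.out.println(first + ", then " + second);
   
           // Polling two clients up front would have returned C1 and C2, never giving C1
           // a chance to win the second slot.
       }
   }
   ```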




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424819838



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
     /**
      * @return whether any warmup replicas were assigned
      */
-    static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-                                       final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
                                        final Map<UUID, ClientState> clientStates,
-                                       final Map<TaskId, Integer> tasksToRemainingStandbys,
                                        final int maxWarmupReplicas) {
-        boolean warmupReplicasAssigned = false;
+        final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
-        final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
-            clientStates,
-            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+        final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet(

Review comment:
       sure, that's a good idea.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-625630519


   NB: This is still a work in progress. I just pushed to the remote branch to back up my work.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei edited a comment on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-621590516


   Oh, and just to put it out there, I know I need to fix the messy diff computations. I just stamped out something quick to see what you thought about the approach.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424820836



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
     /**
      * @return whether any warmup replicas were assigned
      */
-    static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-                                       final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
                                        final Map<UUID, ClientState> clientStates,
-                                       final Map<TaskId, Integer> tasksToRemainingStandbys,
                                        final int maxWarmupReplicas) {
-        boolean warmupReplicasAssigned = false;
+        final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
-        final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
-            clientStates,
-            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+        final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet(
+            caughtUpPredicate,
+            client -> clientStates.get(client).taskLoad()
         );
 
-        final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
-            (movement, other) -> {
-                final int numCaughtUpClients = movement.caughtUpClients.size();
-                final int otherNumCaughtUpClients = other.caughtUpClients.size();
-                if (numCaughtUpClients != otherNumCaughtUpClients) {
-                    return Integer.compare(numCaughtUpClients, otherNumCaughtUpClients);
-                } else {
-                    return movement.task.compareTo(other.task);
-                }
-            }
+        final Queue<TaskMovement> taskMovements = new PriorityQueue<>(
+            Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
         );
 
-        for (final Map.Entry<UUID, List<TaskId>> assignmentEntry : statefulActiveTaskAssignment.entrySet()) {
-            final UUID client = assignmentEntry.getKey();
-            final ClientState state = clientStates.get(client);
-            for (final TaskId task : assignmentEntry.getValue()) {
-                if (taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) {
-                    state.assignActive(task);
-                } else {
-                    final TaskMovement taskMovement = new TaskMovement(task, client, tasksToCaughtUpClients.get(task));
-                    taskMovements.add(taskMovement);
+        for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID client = clientStateEntry.getKey();
+            final ClientState state = clientStateEntry.getValue();
+            for (final TaskId task : state.activeTasks()) {
+                // if the desired client is not caught up, and there is another client that _is_ caught up, then
+                // we schedule a movement, so we can move the active task to the caught-up client. We'll try to
+                // assign a warm-up to the desired client so that we can move it later on.
+                if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) {
+                    taskMovements.add(new TaskMovement(task, client, tasksToCaughtUpClients.get(task)));
                 }
             }
             clientsByTaskLoad.offer(client);
         }
 
+        final boolean movementsNeeded = !taskMovements.isEmpty();
+
         final AtomicInteger remainingWarmupReplicas = new AtomicInteger(maxWarmupReplicas);
         for (final TaskMovement movement : taskMovements) {
-            final UUID sourceClient = clientsByTaskLoad.poll(movement.task);
-            if (sourceClient == null) {
-                throw new IllegalStateException("Tried to move task to caught-up client but none exist");
-            }
-
-            final ClientState sourceClientState = clientStates.get(sourceClient);
-            sourceClientState.assignActive(movement.task);
-            clientsByTaskLoad.offer(sourceClient);
+            final UUID standbySourceClient = clientsByTaskLoad.poll(

Review comment:
       Agreed, it would just be good luck right now, but I figured we might as well capitalize on the luck. I'm planning to follow up pretty soon with the standby stickiness.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424758127



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -236,16 +233,17 @@ public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
                                                                 0,
                                                                 1000L);
 
-        final Harness harness = Harness.initializeCluster(1, 1, 1);
+        final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
         testForConvergence(harness, configs, 1);
         verifyValidAssignment(0, harness);
+        verifyBalancedAssignment(harness);
     }
 
     @Test
     public void assignmentShouldConvergeAfterAddingNode() {
-        final int numStatelessTasks = 15;
-        final int numStatefulTasks = 13;
+        final int numStatelessTasks = 7;

Review comment:
       Well, if you have a set of N prime numbers and one number which isn't, aren't they all still coprime? :P




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418390934



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatefulPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatefulPerThread - minActiveStatefulPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map<String, ? extends Number> results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
    Nailed it! This is what I was working up toward. It seems dangerous to consider only the number of threads when assigning stateful tasks. We actually don't know how much disk is available, but it seems more reasonable to assume that each computer has about the same amount of disk than to assume that each thread does.
   
   So, now (and this may be a departure from past conversations), I'm thinking perhaps we should do the following:
   1. round-robin assign active stateful tasks over the _clients_ that have spare capacity (aka threads). Once no client has spare capacity, then we just round-robin over all clients
   2. do the same with each standby replica. If the colocation constraint is violated, we just skip the client and keep going. If my intuition serves, this should still result in a diff of no more than one between clients.
    3. do the same with stateless tasks. The one trick is that for the stateless tasks, we start round-robining at the first node after the one where we left off with the active stateful tasks. Assuming the round-robin algorithm above, this should ultimately produce an assignment that is just as balanced as picking the least loaded client for each task.
   
   And we would always sort the tasks by subtopology first, then by partition, so that we still get workload parallelism. Actually, I should add this to the validation.
   
   I'll update the PR tomorrow with these ideas, so you can see them in code form, since words kind of suck for this kind of thing.
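    
    Just to make the idea a bit more concrete in the meantime, here's a rough standalone sketch of that round-robin (made-up names, plain strings standing in for ClientState/TaskId; this is not the actual PR code):
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    // Rough sketch only: round-robin over clients with spare capacity, falling back to
    // all clients once everyone is saturated. Strings stand in for ClientState/TaskId.
    public class RoundRobinSketch {
    
        static Map<String, List<String>> assign(final Map<String, Integer> capacities,
                                                final List<String> sortedTasks) {
            final Map<String, List<String>> assignment = new LinkedHashMap<>();
            capacities.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));
            final List<String> clients = new ArrayList<>(capacities.keySet());
    
            int cursor = 0;
            for (final String task : sortedTasks) {
                // prefer the next client (in round-robin order) that still has a free thread
                int probed = 0;
                while (probed < clients.size()) {
                    final String candidate = clients.get(cursor % clients.size());
                    if (assignment.get(candidate).size() < capacities.get(candidate)) {
                        break; // found spare capacity
                    }
                    cursor++;
                    probed++;
                }
                // if nobody has spare capacity, we land back where we started: plain round-robin
                final String chosen = clients.get(cursor % clients.size());
                assignment.get(chosen).add(task);
                cursor++;
            }
            return assignment;
        }
    
        public static void main(final String[] args) {
            final Map<String, Integer> capacities = new LinkedHashMap<>();
            capacities.put("clientA", 2); // two threads
            capacities.put("clientB", 1); // one thread
            // tasks sorted by subtopology first, then partition
            final List<String> tasks = Arrays.asList("0_0", "0_1", "1_0", "1_1");
            System.out.println(assign(capacities, tasks));
            // prints {clientA=[0_0, 1_0], clientB=[0_1, 1_1]}
        }
    }
    
    (For step 3, the same cursor would simply keep going when we switch from the stateful to the stateless task list.)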




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8588:
URL: https://github.com/apache/kafka/pull/8588


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-622137407


   Thanks @ableegoldman !
   
   Regarding the client-capacity-vs-thread-capacity dilemma, I think I'm remembering that conversation a little differently. I thought:
   * in the "over capacity" case (where all clients have more threads than tasks), we would assign an equal number of tasks to all clients.
   * in the "over+under capacity" case (where some clients have more threads than others and a uniform assignment would saturate the smallest node, but there are still more total _threads_ in the cluster than tasks), then we'll assign tasks such that no node is over capacity.
   * in the "over capacity" case (where there are more tasks than threads), then we first fill up everyone's capacity, and then go back to a simple round-robin over _clients_.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418270317



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
    FWIW we'll probably want to revisit this if/when we migrate standby processing to separate thread(s), but I'm +1 for enforcing this now. Just wondering what the current reasoning is.
   
    On the other hand, maybe it's better to enforce good data parallelism over an equal per-task-type balance. Like if subtopology 1 is significantly heavier than subtopology 2, then it could be more balanced to have active 1_0 and active 2_0 on one instance and standby 1_0 and standby 2_0 on the other. But maybe that's just not worth optimizing at this point.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424729806



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##########
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
     private final PriorityQueue<UUID> clientsByTaskLoad;
-    private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+    private final BiFunction<UUID, TaskId, Boolean> constraint;
     private final Set<UUID> uniqueClients = new HashSet<>();
 
-    ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
-                                final BiFunction<UUID, TaskId, Boolean> validClientCriteria) {
-        this.validClientCriteria = validClientCriteria;
-
-        clientsByTaskLoad = new PriorityQueue<>(
-            (client, other) -> {
-                final double clientTaskLoad = clientStates.get(client).taskLoad();
-                final double otherTaskLoad = clientStates.get(other).taskLoad();
-                if (clientTaskLoad < otherTaskLoad) {
-                    return -1;
-                } else if (clientTaskLoad > otherTaskLoad) {
-                    return 1;
-                } else {
-                    return client.compareTo(other);
-                }
-            });
+    ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
+                           final Function<UUID, Double> weight) {
+        this.constraint = constraint;
+        clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
     }
 
     /**
      * @return the next least loaded client that satisfies the given criteria, or null if none do
      */
-    UUID poll(final TaskId task) {
-        final List<UUID> validClient = poll(task, 1);
-        return validClient.isEmpty() ? null : validClient.get(0);
-    }
-
-    /**
-     * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task
-     */
-    List<UUID> poll(final TaskId task, final int numClients) {
-        final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+    UUID poll(final TaskId task, final Function<UUID, Boolean> extraConstraint) {

Review comment:
    I'm not sure I see how the returned clients could ever be different using "poll N clients" vs "poll N times". Only the clients that get a new task assigned will have their weight changed in the middle of an N-poll, and once we assign this task to a client it no longer meets the criteria, so we don't care about it anyway, right?
    
    The reason for the "poll N clients" method was to save on some of the poll-and-reoffer of clients that don't meet the criteria, but I don't think that's really worth worrying over. I'm fine with whatever code is easiest to read -- I just want to understand why this affects the balance.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##########
@@ -35,262 +44,161 @@
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
 import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
+import static org.hamcrest.Matchers.is;
 
 public class TaskMovementTest {
-    private final ClientState client1 = new ClientState(1);
-    private final ClientState client2 = new ClientState(1);
-    private final ClientState client3 = new ClientState(1);
-
-    private final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
-
-    private final Map<UUID, List<TaskId>> emptyWarmupAssignment = mkMap(
-        mkEntry(UUID_1, EMPTY_TASK_LIST),
-        mkEntry(UUID_2, EMPTY_TASK_LIST),
-        mkEntry(UUID_3, EMPTY_TASK_LIST)
-    );
-
     @Test
     public void shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() {
         final int maxWarmupReplicas = Integer.MAX_VALUE;
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
 
-        final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
-            mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-            mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-            mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-        );
-
         final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
         for (final TaskId task : allTasks) {
             tasksToCaughtUpClients.put(task, mkSortedSet(UUID_1, UUID_2, UUID_3));
         }
-        
-        assertFalse(
+
+        final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+        final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+        final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
+
+        assertThat(
             assignTaskMovements(
-                balancedAssignment,
                 tasksToCaughtUpClients,
-                clientStates,
-                getMapWithNumStandbys(allTasks, 1),
-                maxWarmupReplicas)
+                getClientStatesMap(client1, client2, client3),
+                maxWarmupReplicas),
+            is(false)
         );
-
-        verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment);
     }
 
     @Test
     public void shouldAssignAllTasksToClientsAndReturnFalseIfNoClientsAreCaughtUp() {
-        final int maxWarmupReplicas = 2;
-        final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
+        final int maxWarmupReplicas = Integer.MAX_VALUE;
 
-        final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
-            mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-            mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-            mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-        );
+        final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+        final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+        final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
-        assertFalse(
+        assertThat(
             assignTaskMovements(
-                balancedAssignment,
                 emptyMap(),
-                clientStates,
-                getMapWithNumStandbys(allTasks, 1),
-                maxWarmupReplicas)
+                getClientStatesMap(client1, client2, client3),
+                maxWarmupReplicas),
+            is(false)
         );
-        verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment);
     }
 
     @Test
     public void shouldMoveTasksToCaughtUpClientsAndAssignWarmupReplicasInTheirPlace() {
         final int maxWarmupReplicas = Integer.MAX_VALUE;
-        final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+        final ClientState client1 = getClientStateWithActiveAssignment(singletonList(TASK_0_0));
+        final ClientState client2 = getClientStateWithActiveAssignment(singletonList(TASK_0_1));
+        final ClientState client3 = getClientStateWithActiveAssignment(singletonList(TASK_0_2));
+        final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
 
-        final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
-            mkEntry(UUID_1, singletonList(TASK_0_0)),
-            mkEntry(UUID_2, singletonList(TASK_0_1)),
-            mkEntry(UUID_3, singletonList(TASK_0_2))
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = mkMap(
+            mkEntry(TASK_0_0, mkSortedSet(UUID_1)),
+            mkEntry(TASK_0_1, mkSortedSet(UUID_3)),
+            mkEntry(TASK_0_2, mkSortedSet(UUID_2))
         );
 
-        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
-        tasksToCaughtUpClients.put(TASK_0_0, mkSortedSet(UUID_1));
-        tasksToCaughtUpClients.put(TASK_0_1, mkSortedSet(UUID_3));
-        tasksToCaughtUpClients.put(TASK_0_2, mkSortedSet(UUID_2));
-
-        final Map<UUID, List<TaskId>> expectedActiveTaskAssignment = mkMap(
-            mkEntry(UUID_1, singletonList(TASK_0_0)),
-            mkEntry(UUID_2, singletonList(TASK_0_2)),
-            mkEntry(UUID_3, singletonList(TASK_0_1))
-        );
-
-        final Map<UUID, List<TaskId>> expectedWarmupTaskAssignment = mkMap(
-            mkEntry(UUID_1, EMPTY_TASK_LIST),
-            mkEntry(UUID_2, singletonList(TASK_0_1)),
-            mkEntry(UUID_3, singletonList(TASK_0_2))
-        );
-
-        assertTrue(
+        assertThat(
+            "should have assigned movements",
             assignTaskMovements(
-                balancedAssignment,
                 tasksToCaughtUpClients,
                 clientStates,
-                getMapWithNumStandbys(allTasks, 1),
-                maxWarmupReplicas)
-        );
-        verifyClientStateAssignments(expectedActiveTaskAssignment, expectedWarmupTaskAssignment);
-    }
-
-    @Test
-    public void shouldProduceBalancedAndStateConstrainedAssignment() {

Review comment:
    IIRC this was covering an edge case where it might produce an unbalanced assignment. But it may be moot at this point (and besides, we don't necessarily need to produce a perfectly balanced assignment here).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
     /**
      * @return whether any warmup replicas were assigned
      */
-    static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-                                       final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
                                        final Map<UUID, ClientState> clientStates,
-                                       final Map<TaskId, Integer> tasksToRemainingStandbys,
                                        final int maxWarmupReplicas) {
-        boolean warmupReplicasAssigned = false;
+        final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
-        final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
-            clientStates,
-            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+        final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet(

Review comment:
       nit: I know I named this in the first place but can we change it to `caughtUpClientsByTaskLoad` or something?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
     /**
      * @return whether any warmup replicas were assigned
      */
-    static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-                                       final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
                                        final Map<UUID, ClientState> clientStates,
-                                       final Map<TaskId, Integer> tasksToRemainingStandbys,
                                        final int maxWarmupReplicas) {
-        boolean warmupReplicasAssigned = false;
+        final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
-        final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
-            clientStates,
-            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+        final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet(
+            caughtUpPredicate,
+            client -> clientStates.get(client).taskLoad()
         );
 
-        final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
-            (movement, other) -> {
-                final int numCaughtUpClients = movement.caughtUpClients.size();
-                final int otherNumCaughtUpClients = other.caughtUpClients.size();
-                if (numCaughtUpClients != otherNumCaughtUpClients) {
-                    return Integer.compare(numCaughtUpClients, otherNumCaughtUpClients);
-                } else {
-                    return movement.task.compareTo(other.task);
-                }
-            }
+        final Queue<TaskMovement> taskMovements = new PriorityQueue<>(
+            Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
         );
 
-        for (final Map.Entry<UUID, List<TaskId>> assignmentEntry : statefulActiveTaskAssignment.entrySet()) {
-            final UUID client = assignmentEntry.getKey();
-            final ClientState state = clientStates.get(client);
-            for (final TaskId task : assignmentEntry.getValue()) {
-                if (taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) {
-                    state.assignActive(task);
-                } else {
-                    final TaskMovement taskMovement = new TaskMovement(task, client, tasksToCaughtUpClients.get(task));
-                    taskMovements.add(taskMovement);
+        for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID client = clientStateEntry.getKey();
+            final ClientState state = clientStateEntry.getValue();
+            for (final TaskId task : state.activeTasks()) {
+                // if the desired client is not caught up, and there is another client that _is_ caught up, then
+                // we schedule a movement, so we can move the active task to the caught-up client. We'll try to
+                // assign a warm-up to the desired client so that we can move it later on.
+                if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) {
+                    taskMovements.add(new TaskMovement(task, client, tasksToCaughtUpClients.get(task)));
                 }
             }
             clientsByTaskLoad.offer(client);
         }
 
+        final boolean movementsNeeded = !taskMovements.isEmpty();
+
         final AtomicInteger remainingWarmupReplicas = new AtomicInteger(maxWarmupReplicas);
         for (final TaskMovement movement : taskMovements) {
-            final UUID sourceClient = clientsByTaskLoad.poll(movement.task);
-            if (sourceClient == null) {
-                throw new IllegalStateException("Tried to move task to caught-up client but none exist");
-            }
-
-            final ClientState sourceClientState = clientStates.get(sourceClient);
-            sourceClientState.assignActive(movement.task);
-            clientsByTaskLoad.offer(sourceClient);
+            final UUID standbySourceClient = clientsByTaskLoad.poll(

Review comment:
    This is a really nice touch 👍 Although without attempting some degree of stickiness in the standby task assignment, it seems unlikely to actually find a standby on a caught-up client...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418304275



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
       I can neither confirm nor deny any statements that I may or may not have made in the past ;)  Kidding aside, I don't recall what that convincing argument might have been. Perhaps just that that kind of multivariate optimization would be complicated to implement, and maybe it's better to just start simple.
   
   Whatever the intent might have been, though, the actual implementation of our balancer should already achieve a "perfect" balance of stateful-active tasks. I'm not sure about standbys, but I don't see why we shouldn't just make them balanced as well.
   
   Regarding stateless tasks, it does seem suboptimal to assign them to instances that already have active tasks when there are instances available with no tasks at all.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org