Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/19 21:40:32 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r427615915



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -338,7 +338,7 @@ private static void runRandomizedScenario(final long seed) {
                         throw new IllegalStateException("Unexpected event: " + event);
                 }
                 if (!harness.clientStates.isEmpty()) {
-                    testForConvergence(harness, configs, numStatefulTasks * 2);
+                    testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));

Review comment:
       Now that we're warming up standbys also, we need to relax the convergence limit.
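
       As a rough illustration of how much the bound grows, the sketch below evaluates the old and new expressions from the diff above. The class and method names are hypothetical; only the two arithmetic expressions come from the change itself.

```java
public class ConvergenceLimit {
    // Old bound, from the removed line: numStatefulTasks * 2.
    static int oldLimit(final int numStatefulTasks) {
        return numStatefulTasks * 2;
    }

    // New bound, from the added line: standby replicas now count as tasks to warm up.
    static int newLimit(final int numStatefulTasks, final int numStandbyReplicas) {
        return 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas);
    }

    public static void main(final String[] args) {
        System.out.println(oldLimit(9));    // 18
        System.out.println(newLimit(9, 1)); // 36
        System.out.println(newLimit(9, 0)); // 18
    }
}
```

       With zero standby replicas the new bound reduces to the old one, so the limit is only relaxed for scenarios that actually have standbys.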

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);

Review comment:
       I moved the counter out here because both the active and the standby warmup assignments need to decrement it.
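
       A minimal sketch of why a single shared counter matters, assuming each phase simply consumes from the same budget. `placeWarmups` is a hypothetical stand-in for the two movement methods, not code from this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedWarmupBudget {
    // Hypothetical stand-in for one movement phase: tries to place `wanted`
    // warmups, takes each one from the shared budget, and returns how many
    // it actually placed.
    static int placeWarmups(final int wanted, final AtomicInteger remaining) {
        int placed = 0;
        while (placed < wanted && remaining.get() > 0) {
            remaining.decrementAndGet();
            placed++;
        }
        return placed;
    }

    public static void main(final String[] args) {
        // One budget, created once and passed to both phases.
        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(2);

        final int activeWarmups = placeWarmups(1, remainingWarmupReplicas);
        final int standbyWarmups = placeWarmups(3, remainingWarmupReplicas);

        System.out.println(activeWarmups);                 // 1
        System.out.println(standbyWarmups);                // 1
        System.out.println(remainingWarmupReplicas.get()); // 0
    }
}
```

       If each phase had its own counter, the total number of warmups could exceed the configured maximum; sharing one mutable counter keeps the sum bounded.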

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -78,6 +80,64 @@
         /*probingRebalanceIntervalMs*/ 60 * 1000L
     );
 
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {

Review comment:
       First test for stickiness: we should be 100% sticky and also not schedule a probing rebalance when we are configured for no warmups.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -78,6 +80,64 @@
         /*probingRebalanceIntervalMs*/ 60 * 1000L
     );
 
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+        final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+        final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+        final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3)
+        );
+
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(11L, 0, 1, 0L)
+        );
+
+        assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState3, hasAssignedTasks(0));
+
+        assertThat(unstable, is(false));
+    }
+
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {

Review comment:
       Main test case for stickiness: we should be sticky for standbys, and also schedule warmups.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -64,12 +66,10 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
         return caughtUpClients == null || caughtUpClients.contains(client);
     }
 
-    /**
-     * @return whether any warmup replicas were assigned
-     */
-    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-                                       final Map<UUID, ClientState> clientStates,
-                                       final int maxWarmupReplicas) {
+    static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,

Review comment:
       I changed this to return an int just because it made stepping through the assignment in the debugger a bit easier to understand. It serves no algorithmic purpose.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -326,6 +326,10 @@ static TaskSkewReport analyzeTaskAssignmentBalance(final Map<UUID, ClientState>
         return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
     }
 
+    static Matcher<ClientState> hasAssignedTasks(final int taskCount) {
+        return hasProperty("assignedTasks", ClientState::assignedTaskCount, taskCount);
+    }

Review comment:
       Similar to the other matchers, it just gives us mildly nicer test output.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+            statefulTasks,
+            clientStates,
+            configs.acceptableRecoveryLag
+        );
+
+        // We temporarily need to know which standby tasks were intended as warmups
+        // for active tasks, so that we don't move them (again) when we plan standby
+        // task movements. We can then immediately treat warmups exactly the same as
+        // hot-standby replicas, so we just track it right here as metadata, rather
+        // than add "warmup" assignments to ClientState, for example.
+        final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+        final int neededActiveTaskMovements = assignActiveTaskMovements(
+            tasksToCaughtUpClients,
             clientStates,
-            configs.maxWarmupReplicas
+            warmups,
+            remainingWarmupReplicas
+        );
+
+        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+            tasksToCaughtUpClients,
+            clientStates,
+            remainingWarmupReplicas,
+            warmups

Review comment:
       We enforce "stickiness" by assigning movements after computing the ideal assignment, so if we want standbys as well as actives to be sticky, we need to assign movements for standbys as well.
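
       The idea can be sketched roughly as follows. This is a deliberately simplified illustration and not the algorithm in TaskMovement.java: tasks and clients are plain strings, each task has a single owner, and the `assign` method and its parameters are hypothetical. The only point it demonstrates is that a task stays with its previous owner until the ideal owner is caught up, and that a warmup is scheduled only while the budget allows.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

public class StickyMovementSketch {

    // previous: task -> client currently hosting it
    // ideal:    task -> client chosen by the balanced ("ideal") assignment
    // caughtUp: task -> clients whose state for that task is caught up
    // warmups:  out-parameter, task -> client asked to warm the task up
    static Map<String, String> assign(final Map<String, String> previous,
                                      final Map<String, String> ideal,
                                      final Map<String, Set<String>> caughtUp,
                                      final AtomicInteger remainingWarmups,
                                      final Map<String, String> warmups) {
        final Map<String, String> result = new TreeMap<>();
        for (final Map.Entry<String, String> entry : ideal.entrySet()) {
            final String task = entry.getKey();
            final String target = entry.getValue();
            final String current = previous.get(task);
            final Set<String> ready = caughtUp.getOrDefault(task, Collections.<String>emptySet());
            if (current == null || current.equals(target) || ready.contains(target)) {
                // No movement needed, or the target can take over immediately.
                result.put(task, target);
            } else {
                // Stay sticky: leave the task where it is for now.
                result.put(task, current);
                if (remainingWarmups.get() > 0) {
                    // Let the target warm up so a later rebalance can move the task.
                    remainingWarmups.decrementAndGet();
                    warmups.put(task, target);
                }
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, String> previous = new TreeMap<>();
        previous.put("0_0", "A");
        previous.put("0_1", "A");
        final Map<String, String> ideal = new TreeMap<>();
        ideal.put("0_0", "A");
        ideal.put("0_1", "B");
        final Map<String, Set<String>> caughtUp = new TreeMap<>();
        caughtUp.put("0_1", new TreeSet<>(Collections.singleton("A")));
        final Map<String, String> warmups = new TreeMap<>();

        final Map<String, String> result =
            assign(previous, ideal, caughtUp, new AtomicInteger(1), warmups);

        System.out.println(result);  // {0_0=A, 0_1=A}
        System.out.println(warmups); // {0_1=B}
    }
}
```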

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+            statefulTasks,
+            clientStates,
+            configs.acceptableRecoveryLag
+        );
+
+        // We temporarily need to know which standby tasks were intended as warmups
+        // for active tasks, so that we don't move them (again) when we plan standby
+        // task movements. We can then immediately treat warmups exactly the same as
+        // hot-standby replicas, so we just track it right here as metadata, rather
+        // than add "warmup" assignments to ClientState, for example.
+        final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+        final int neededActiveTaskMovements = assignActiveTaskMovements(
+            tasksToCaughtUpClients,
             clientStates,
-            configs.maxWarmupReplicas
+            warmups,
+            remainingWarmupReplicas
+        );
+
+        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+            tasksToCaughtUpClients,
+            clientStates,
+            remainingWarmupReplicas,
+            warmups
         );
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
+        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+        // due to being configured for no warmups.

Review comment:
       One might wonder whether we should even allow "max_warmups := 0". I think it is actually OK: someone might want to disable this state-shuffling mechanism completely and just be 100% sticky. It is also pretty obvious what happens when you configure "no warmups", so I don't think this setting will hurt anyone who did not intend to disable the warmup mechanism.
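
       One way the rule in the code comment above could be expressed is sketched below. The exact expression is not visible in this diff excerpt, so the method and its body are an assumption; only the two movement counts and `maxWarmupReplicas` are names taken from the change.

```java
public class ProbingRebalanceSketch {
    // Assumed rule: a probing rebalance only makes sense if movements were
    // needed AND warmups are allowed, since otherwise nothing will ever
    // catch up and there is nothing to probe for.
    static boolean probingRebalanceNeeded(final int maxWarmupReplicas,
                                          final int neededActiveTaskMovements,
                                          final int neededStandbyTaskMovements) {
        return maxWarmupReplicas > 0
            && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
    }

    public static void main(final String[] args) {
        System.out.println(probingRebalanceNeeded(0, 3, 2)); // false
        System.out.println(probingRebalanceNeeded(2, 3, 0)); // true
        System.out.println(probingRebalanceNeeded(2, 0, 0)); // false
    }
}
```

       Under this assumption, the first stickiness test (configured with zero warmups) would see `false` from `assign` even though the ideal assignment differs from the current one.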

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCau
         return movementsNeeded;
     }
 
+    static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,

Review comment:
       This algorithm is similar to the active one, but there are also important differences, so I didn't merge the two.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##########
@@ -65,11 +67,13 @@ public void shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() {
         final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
         assertThat(
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 tasksToCaughtUpClients,
                 getClientStatesMap(client1, client2, client3),
-                maxWarmupReplicas),
-            is(false)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(0)

Review comment:
       Just accommodating the new method signature, no semantic changes.



