Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/22 12:04:04 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

cadonna commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r429196241



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCau
         return movementsNeeded;
     }
 
+    static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                          final Map<UUID, ClientState> clientStates,
+                                          final AtomicInteger remainingWarmupReplicas,
+                                          final Map<UUID, Set<TaskId>> warmups) {
+        final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+
+        final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new ConstrainedPrioritySet(
+            caughtUpPredicate,
+            client -> clientStates.get(client).assignedTaskLoad()
+        );
+
+        final Queue<TaskMovement> taskMovements = new PriorityQueue<>(
+            Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+        );
+
+        for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID destination = clientStateEntry.getKey();
+            final ClientState state = clientStateEntry.getValue();
+            for (final TaskId task : state.standbyTasks()) {
+                if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
+                    // this is a warmup, so we won't move it.
+                } else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) {

Review comment:
       prop: Could you add a method `taskIsNotCaughtUpOnClientAndCaughtUpClientsExist()`? Applying De Morgan's law every time I read this code gives me a headache.
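A minimal sketch of what such a helper could look like, so that the call site reads positively instead of as a negated disjunction. It assumes the existing predicate returns true when no caught-up clients are recorded for the task or when the client is among them; that body is an assumption, since the PR excerpt does not show it. `String` stands in for `TaskId` so the example compiles on its own, and the class name `CaughtUpChecks` is hypothetical.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

public class CaughtUpChecks {

    // ASSUMPTION: stand-in for the existing predicate; its real body is not shown in the diff.
    // True when the task has no caught-up clients at all, or when this client is one of them.
    static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(
            final String task,
            final UUID client,
            final Map<String, SortedSet<UUID>> tasksToCaughtUpClients) {
        final SortedSet<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
        return caughtUpClients == null || caughtUpClients.isEmpty() || caughtUpClients.contains(client);
    }

    // The proposed helper: by De Morgan's law, !(A || B) is the same as (!A && !B),
    // i.e. the task is NOT caught up on this client AND some caught-up client exists.
    static boolean taskIsNotCaughtUpOnClientAndCaughtUpClientsExist(
            final String task,
            final UUID client,
            final Map<String, SortedSet<UUID>> tasksToCaughtUpClients) {
        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
    }

    public static void main(final String[] args) {
        final UUID caughtUp = new UUID(0L, 1L);
        final UUID lagging = new UUID(0L, 2L);

        final Map<String, SortedSet<UUID>> tasksToCaughtUpClients = new TreeMap<>();
        final SortedSet<UUID> clients = new TreeSet<>();
        clients.add(caughtUp);
        tasksToCaughtUpClients.put("0_0", clients);

        // The lagging client holds a task that another client has caught up on,
        // so the task is a candidate for movement.
        System.out.println(taskIsNotCaughtUpOnClientAndCaughtUpClientsExist("0_0", lagging, tasksToCaughtUpClients));
        // No client has caught up on task "0_1", so there is nowhere better to move it.
        System.out.println(taskIsNotCaughtUpOnClientAndCaughtUpClientsExist("0_1", lagging, tasksToCaughtUpClients));
    }
}
```

With a helper like this, the `else if` branch in `assignStandbyTaskMovements` could call the positively named method directly rather than negating the existing one.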

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+            statefulTasks,
+            clientStates,
+            configs.acceptableRecoveryLag
+        );
+
+        // We temporarily need to know which standby tasks were intended as warmups
+        // for active tasks, so that we don't move them (again) when we plan standby
+        // task movements. We can then immediately treat warmups exactly the same as
+        // hot-standby replicas, so we just track it right here as metadata, rather
+        // than add "warmup" assignments to ClientState, for example.
+        final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+        final int neededActiveTaskMovements = assignActiveTaskMovements(
+            tasksToCaughtUpClients,
             clientStates,
-            configs.maxWarmupReplicas
+            warmups,
+            remainingWarmupReplicas
+        );
+
+        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+            tasksToCaughtUpClients,
+            clientStates,
+            remainingWarmupReplicas,
+            warmups
         );
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
+        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+        // due to being configured for no warmups.

Review comment:
       To be clear, according to `StreamsConfig`, we do NOT allow `max.warmup.replicas = 0`; it must be at least 1. Or was your statement hypothetical, meaning that it would be OK to allow it? Either way, I am in favour of keeping the `> 0` check here.
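For illustration, a defensive guard of the kind discussed here might look like the sketch below. The diff is cut off before the actual check, so the method name `probingRebalanceNeeded`, its parameters, and the exact condition are all assumptions and not the PR's code.

```java
public class ProbingRebalanceCheck {

    // HYPOTHETICAL helper: the real condition is not visible in the quoted diff.
    // A probing rebalance is only worth scheduling if some movement is still pending
    // AND warmup replicas are permitted at all. The "> 0" guard is defensive, since
    // per the review comment the config validation already requires at least 1.
    static boolean probingRebalanceNeeded(final int neededActiveTaskMovements,
                                          final int neededStandbyTaskMovements,
                                          final int maxWarmupReplicas) {
        final int neededMovements = neededActiveTaskMovements + neededStandbyTaskMovements;
        return maxWarmupReplicas > 0 && neededMovements > 0;
    }

    public static void main(final String[] args) {
        // Movements are pending and warmups are allowed.
        System.out.println(probingRebalanceNeeded(2, 1, 1));
        // Movements are pending but warmups are (hypothetically) disabled.
        System.out.println(probingRebalanceNeeded(2, 1, 0));
    }
}
```

Keeping the guard costs little and protects the assignor if the config constraint is ever relaxed.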
   
   



