Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/22 16:41:52 UTC

[kafka] branch trunk updated: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (#8696)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9fdd877  KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (#8696)
9fdd877 is described below

commit 9fdd87715ed1c725e268caa14d8415d4f1428ebc
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 22 11:41:15 2020 -0500

    KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (#8696)
    
    We should treat standbys similarly to active stateful tasks and
    re-assign them to instances that are already caught up on them
    while we warm them up on the desired destination, instead of
    immediately moving them to the destination.
    
    Reviewers: Bruno Cadonna <br...@confluent.io>
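
As a rough illustration of the idea (not the actual Kafka internals: the class, method, and String-based task/client IDs below are hypothetical simplifications of TaskId, UUID, and ClientState), the following sketch shows sticky standby placement with a shared warm-up budget, analogous to the remainingWarmupReplicas counter threaded through the diff:

    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.TreeSet;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Hypothetical sketch of sticky standby placement; names are illustrative only. */
    public final class StickyStandbySketch {

        /**
         * For each (desired client, standby task) pair, keep the standby on a client that is
         * already caught up and only place a warm-up copy on the desired client while the
         * shared warm-up budget lasts. Returns how many movements were needed.
         */
        static int planStandbyMovements(final Map<String, Set<String>> caughtUpClientsByTask,
                                        final Map<String, Set<String>> desiredStandbysByClient,
                                        final AtomicInteger remainingWarmupReplicas,
                                        final Map<String, Set<String>> finalStandbysByClient) {
            int movementsNeeded = 0;
            for (final Map.Entry<String, Set<String>> entry : desiredStandbysByClient.entrySet()) {
                final String desiredClient = entry.getKey();
                for (final String task : entry.getValue()) {
                    final Set<String> caughtUp = caughtUpClientsByTask.getOrDefault(task, Set.of());
                    if (caughtUp.isEmpty() || caughtUp.contains(desiredClient)) {
                        // The desired client is caught up (or nobody is): the standby stays put.
                        finalStandbysByClient.computeIfAbsent(desiredClient, k -> new TreeSet<>()).add(task);
                    } else {
                        movementsNeeded++;
                        // Stickiness: hand the standby to a caught-up client instead of moving it cold.
                        final String caughtUpClient = caughtUp.iterator().next();
                        finalStandbysByClient.computeIfAbsent(caughtUpClient, k -> new TreeSet<>()).add(task);
                        // Warm up the desired client only while the shared warm-up budget lasts.
                        if (remainingWarmupReplicas.getAndDecrement() > 0) {
                            finalStandbysByClient.computeIfAbsent(desiredClient, k -> new TreeSet<>()).add(task);
                        }
                    }
                }
            }
            return movementsNeeded;
        }

        public static void main(final String[] args) {
            final Map<String, Set<String>> caughtUp = Map.of("0_0", Set.of("clientA"));
            final Map<String, Set<String>> desired = Map.of("clientB", Set.of("0_0"));
            final Map<String, Set<String>> result = new TreeMap<>();
            final int moves = planStandbyMovements(caughtUp, desired, new AtomicInteger(1), result);
            // Prints: 1 movement(s): {clientA=[0_0], clientB=[0_0]}
            System.out.println(moves + " movement(s): " + result);
        }
    }

In the real assignor below, the same budget counter is shared between active and standby movements (it is created once in HighAvailabilityTaskAssignor and passed to both assignActiveTaskMovements and assignStandbyTaskMovements), which is why the boolean return value was replaced by movement counts and the probing-rebalance decision moved up into the assignor.
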
---
 .../assignment/HighAvailabilityTaskAssignor.java   | 38 ++++++++-
 .../internals/assignment/TaskMovement.java         | 95 +++++++++++++++++++---
 .../internals/assignment/AssignmentTestUtils.java  |  4 +
 .../HighAvailabilityTaskAssignorTest.java          | 60 ++++++++++++++
 .../assignment/TaskAssignorConvergenceTest.java    |  2 +-
 .../internals/assignment/TaskMovementTest.java     | 44 ++++++----
 6 files changed, 212 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 2799d79..acb5d6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -31,12 +31,14 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.diff;
-import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements;
 
 public class HighAvailabilityTaskAssignor implements TaskAssignor {
     private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
@@ -57,14 +59,42 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+            statefulTasks,
+            clientStates,
+            configs.acceptableRecoveryLag
+        );
+
+        // We temporarily need to know which standby tasks were intended as warmups
+        // for active tasks, so that we don't move them (again) when we plan standby
+        // task movements. We can then immediately treat warmups exactly the same as
+        // hot-standby replicas, so we just track it right here as metadata, rather
+        // than add "warmup" assignments to ClientState, for example.
+        final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+        final int neededActiveTaskMovements = assignActiveTaskMovements(
+            tasksToCaughtUpClients,
             clientStates,
-            configs.maxWarmupReplicas
+            warmups,
+            remainingWarmupReplicas
+        );
+
+        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+            tasksToCaughtUpClients,
+            clientStates,
+            remainingWarmupReplicas,
+            warmups
         );
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
+        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+        // due to being configured for no warmups.
+        final boolean probingRebalanceNeeded =
+            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+
         log.info("Decided on assignment: " +
                      clientStates +
                      " with " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
index 0cfb304..04e0a41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.streams.processor.TaskId;
 
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -64,12 +66,10 @@ final class TaskMovement {
         return caughtUpClients == null || caughtUpClients.contains(client);
     }
 
-    /**
-     * @return whether any warmup replicas were assigned
-     */
-    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-                                       final Map<UUID, ClientState> clientStates,
-                                       final int maxWarmupReplicas) {
+    static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                         final Map<UUID, ClientState> clientStates,
+                                         final Map<UUID, Set<TaskId>> warmups,
+                                         final AtomicInteger remainingWarmupReplicas) {
         final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
             (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
@@ -96,9 +96,8 @@ final class TaskMovement {
             caughtUpClientsByTaskLoad.offer(client);
         }
 
-        final boolean movementsNeeded = !taskMovements.isEmpty();
+        final int movementsNeeded = taskMovements.size();
 
-        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(maxWarmupReplicas);
         for (final TaskMovement movement : taskMovements) {
             final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
@@ -115,7 +114,8 @@ final class TaskMovement {
                     remainingWarmupReplicas,
                     movement.task,
                     clientStates.get(sourceClient),
-                    clientStates.get(movement.destination)
+                    clientStates.get(movement.destination),
+                    warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>())
                 );
                 caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
             } else {
@@ -132,15 +132,75 @@ final class TaskMovement {
         return movementsNeeded;
     }
 
+    static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                          final Map<UUID, ClientState> clientStates,
+                                          final AtomicInteger remainingWarmupReplicas,
+                                          final Map<UUID, Set<TaskId>> warmups) {
+        final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+            (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+
+        final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new ConstrainedPrioritySet(
+            caughtUpPredicate,
+            client -> clientStates.get(client).assignedTaskLoad()
+        );
+
+        final Queue<TaskMovement> taskMovements = new PriorityQueue<>(
+            Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+        );
+
+        for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID destination = clientStateEntry.getKey();
+            final ClientState state = clientStateEntry.getValue();
+            for (final TaskId task : state.standbyTasks()) {
+                if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
+                    // this is a warmup, so we won't move it.
+                } else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) {
+                    // if the desired client is not caught up, and there is another client that _is_ caught up, then
+                    // we schedule a movement, so we can move the active task to the caught-up client. We'll try to
+                    // assign a warm-up to the desired client so that we can move it later on.
+                    taskMovements.add(new TaskMovement(task, destination, tasksToCaughtUpClients.get(task)));
+                }
+            }
+            caughtUpClientsByTaskLoad.offer(destination);
+        }
+
+        int movementsNeeded = 0;
+
+        for (final TaskMovement movement : taskMovements) {
+            final UUID sourceClient = caughtUpClientsByTaskLoad.poll(
+                movement.task,
+                clientId -> !clientStates.get(clientId).hasAssignedTask(movement.task)
+            );
+
+            if (sourceClient == null) {
+                // then there's no caught-up client that doesn't already have a copy of this task, so there's
+                // nowhere to move it.
+            } else {
+                moveStandbyAndTryToWarmUp(
+                    remainingWarmupReplicas,
+                    movement.task,
+                    clientStates.get(sourceClient),
+                    clientStates.get(movement.destination)
+                );
+                caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
+                movementsNeeded++;
+            }
+        }
+
+        return movementsNeeded;
+    }
+
     private static void moveActiveAndTryToWarmUp(final AtomicInteger remainingWarmupReplicas,
                                                  final TaskId task,
                                                  final ClientState sourceClientState,
-                                                 final ClientState destinationClientState) {
+                                                 final ClientState destinationClientState,
+                                                 final Set<TaskId> warmups) {
         sourceClientState.assignActive(task);
 
         if (remainingWarmupReplicas.getAndDecrement() > 0) {
             destinationClientState.unassignActive(task);
             destinationClientState.assignStandby(task);
+            warmups.add(task);
         } else {
             // we have no more standbys or warmups to hand out, so we have to try and move it
             // to the destination in a follow-on rebalance
@@ -148,6 +208,21 @@ final class TaskMovement {
         }
     }
 
+    private static void moveStandbyAndTryToWarmUp(final AtomicInteger remainingWarmupReplicas,
+                                                  final TaskId task,
+                                                  final ClientState sourceClientState,
+                                                  final ClientState destinationClientState) {
+        sourceClientState.assignStandby(task);
+
+        if (remainingWarmupReplicas.getAndDecrement() > 0) {
+            // Then we can leave it also assigned to the destination as a warmup
+        } else {
+            // we have no more warmups to hand out, so we have to try and move it
+            // to the destination in a follow-on rebalance
+            destinationClientState.unassignStandby(task);
+        }
+    }
+
     private static void swapStandbyAndActive(final TaskId task,
                                              final ClientState sourceClientState,
                                              final ClientState destinationClientState) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 7cfd8b6..f0139ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -326,6 +326,10 @@ public final class AssignmentTestUtils {
         return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
     }
 
+    static Matcher<ClientState> hasAssignedTasks(final int taskCount) {
+        return hasProperty("assignedTasks", ClientState::assignedTaskCount, taskCount);
+    }
+
     static Matcher<ClientState> hasActiveTasks(final int taskCount) {
         return hasProperty("activeTasks", ClientState::activeTaskCount, taskCount);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index fa9e17b..7e07d30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -47,6 +47,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_2;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.analyzeTaskAssignmentBalance;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
@@ -54,6 +55,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -79,6 +81,64 @@ public class HighAvailabilityTaskAssignorTest {
     );
 
     @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+        final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+        final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+        final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3)
+        );
+
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(11L, 0, 1, 0L)
+        );
+
+        assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState3, hasAssignedTasks(0));
+
+        assertThat(unstable, is(false));
+    }
+
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+        final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+        final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+        final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3)
+        );
+
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(11L, 2, 1, 0L)
+        );
+
+        assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState3, hasAssignedTasks(2));
+
+        assertThat(unstable, is(true));
+    }
+
+    @Test
     public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
         final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
         final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 38dde37..cb89cd9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -338,7 +338,7 @@ public class TaskAssignorConvergenceTest {
                         throw new IllegalStateException("Unexpected event: " + event);
                 }
                 if (!harness.clientStates.isEmpty()) {
-                    testForConvergence(harness, configs, numStatefulTasks * 2);
+                    testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
                     verifyValidAssignment(numStandbyReplicas, harness);
                     verifyBalancedAssignment(harness);
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
index 4d42082..811ba4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
@@ -24,7 +24,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
@@ -45,7 +47,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
-import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
@@ -65,11 +67,13 @@ public class TaskMovementTest {
         final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
         assertThat(
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 tasksToCaughtUpClients,
                 getClientStatesMap(client1, client2, client3),
-                maxWarmupReplicas),
-            is(false)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(0)
         );
     }
 
@@ -82,11 +86,13 @@ public class TaskMovementTest {
         final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
         assertThat(
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 emptyMap(),
                 getClientStatesMap(client1, client2, client3),
-                maxWarmupReplicas),
-            is(false)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(0)
         );
     }
 
@@ -106,11 +112,13 @@ public class TaskMovementTest {
 
         assertThat(
             "should have assigned movements",
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 tasksToCaughtUpClients,
                 clientStates,
-                maxWarmupReplicas),
-            is(true)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(2)
         );
         // The active tasks have changed to the ones that each client is caught up on
         assertThat(client1, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_0)));
@@ -139,11 +147,13 @@ public class TaskMovementTest {
 
         assertThat(
             "should have assigned movements",
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 tasksToCaughtUpClients,
                 clientStates,
-                maxWarmupReplicas),
-            is(true)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(2)
         );
         // The active tasks have changed to the ones that each client is caught up on
         assertThat(client1, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_0)));
@@ -175,11 +185,13 @@ public class TaskMovementTest {
 
         assertThat(
             "should have assigned movements",
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 tasksToCaughtUpClients,
                 clientStates,
-                maxWarmupReplicas),
-            is(true)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(1)
         );
         // Even though we have no warmups allowed, we still let client1 take over active processing while
         // client2 "warms up" because client1 was a caught-up standby, so it can "trade" standby status with