Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/22 16:41:52 UTC
[kafka] branch trunk updated: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (#8696)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9fdd877 KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (#8696)
9fdd877 is described below
commit 9fdd87715ed1c725e268caa14d8415d4f1428ebc
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 22 11:41:15 2020 -0500
KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (#8696)
We should treat standbys similarly to active stateful tasks and
reassign them to instances that are already caught up on them
while we warm them up on the desired destination, instead of
immediately moving them to the destination.
Reviewers: Bruno Cadonna <br...@confluent.io>
---
.../assignment/HighAvailabilityTaskAssignor.java | 38 ++++++++-
.../internals/assignment/TaskMovement.java | 95 +++++++++++++++++++---
.../internals/assignment/AssignmentTestUtils.java | 4 +
.../HighAvailabilityTaskAssignorTest.java | 60 ++++++++++++++
.../assignment/TaskAssignorConvergenceTest.java | 2 +-
.../internals/assignment/TaskMovementTest.java | 44 ++++++----
6 files changed, 212 insertions(+), 31 deletions(-)
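
The commit message above describes the rule in prose; the following minimal,
self-contained sketch (class and method names are illustrative, not Kafka's
API) captures the decision this commit adds for standbys: a standby stays on
a client that is already caught up on it, and the desired destination only
receives a warmup copy until it catches up.

    import java.util.Set;
    import java.util.UUID;

    public class StandbyStickinessSketch {

        // Mirrors taskIsCaughtUpOnClientOrNoCaughtUpClientsExist in the diff
        // below: if no client at all is caught up, any placement is acceptable
        // (represented here as an empty set; the real code uses a null mapping).
        static boolean caughtUpOrNoCaughtUpClientsExist(final UUID client,
                                                        final Set<UUID> caughtUpClients) {
            return caughtUpClients.isEmpty() || caughtUpClients.contains(client);
        }

        public static void main(final String[] args) {
            final UUID caughtUpClient = UUID.randomUUID();
            final UUID desiredDestination = UUID.randomUUID();
            final Set<UUID> caughtUpClients = Set.of(caughtUpClient);

            if (caughtUpOrNoCaughtUpClientsExist(desiredDestination, caughtUpClients)) {
                System.out.println("assign the standby to the destination immediately");
            } else {
                // the sticky path this commit adds for standbys:
                System.out.println("keep the standby on " + caughtUpClient
                    + " and warm it up on " + desiredDestination);
            }
        }
    }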
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 2799d79..acb5d6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -31,12 +31,14 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.diff;
-import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements;
public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
@@ -57,14 +59,42 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
configs.numStandbyReplicas
);
- final boolean probingRebalanceNeeded = assignTaskMovements(
- tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+ final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+ final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+ statefulTasks,
+ clientStates,
+ configs.acceptableRecoveryLag
+ );
+
+ // We temporarily need to know which standby tasks were intended as warmups
+ // for active tasks, so that we don't move them (again) when we plan standby
+ // task movements. We can then immediately treat warmups exactly the same as
+ // hot-standby replicas, so we just track it right here as metadata, rather
+ // than add "warmup" assignments to ClientState, for example.
+ final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+ final int neededActiveTaskMovements = assignActiveTaskMovements(
+ tasksToCaughtUpClients,
clientStates,
- configs.maxWarmupReplicas
+ warmups,
+ remainingWarmupReplicas
+ );
+
+ final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+ tasksToCaughtUpClients,
+ clientStates,
+ remainingWarmupReplicas,
+ warmups
);
assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
+ // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+ // due to being configured for no warmups.
+ final boolean probingRebalanceNeeded =
+ configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+
log.info("Decided on assignment: " +
clientStates +
" with " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
index 0cfb304..04e0a41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -64,12 +66,10 @@ final class TaskMovement {
return caughtUpClients == null || caughtUpClients.contains(client);
}
- /**
- * @return whether any warmup replicas were assigned
- */
- static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
- final Map<UUID, ClientState> clientStates,
- final int maxWarmupReplicas) {
+ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+ final Map<UUID, ClientState> clientStates,
+ final Map<UUID, Set<TaskId>> warmups,
+ final AtomicInteger remainingWarmupReplicas) {
final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
@@ -96,9 +96,8 @@ final class TaskMovement {
caughtUpClientsByTaskLoad.offer(client);
}
- final boolean movementsNeeded = !taskMovements.isEmpty();
+ final int movementsNeeded = taskMovements.size();
- final AtomicInteger remainingWarmupReplicas = new AtomicInteger(maxWarmupReplicas);
for (final TaskMovement movement : taskMovements) {
final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
movement.task,
@@ -115,7 +114,8 @@ final class TaskMovement {
remainingWarmupReplicas,
movement.task,
clientStates.get(sourceClient),
- clientStates.get(movement.destination)
+ clientStates.get(movement.destination),
+ warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>())
);
caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
} else {
@@ -132,15 +132,75 @@ final class TaskMovement {
return movementsNeeded;
}
+ static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+ final Map<UUID, ClientState> clientStates,
+ final AtomicInteger remainingWarmupReplicas,
+ final Map<UUID, Set<TaskId>> warmups) {
+ final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+ (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+
+ final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new ConstrainedPrioritySet(
+ caughtUpPredicate,
+ client -> clientStates.get(client).assignedTaskLoad()
+ );
+
+ final Queue<TaskMovement> taskMovements = new PriorityQueue<>(
+ Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+ );
+
+ for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+ final UUID destination = clientStateEntry.getKey();
+ final ClientState state = clientStateEntry.getValue();
+ for (final TaskId task : state.standbyTasks()) {
+ if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
+ // this is a warmup, so we won't move it.
+ } else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) {
+ // if the desired client is not caught up, and there is another client that _is_ caught up, then
+ // we schedule a movement, so we can move the active task to the caught-up client. We'll try to
+ // assign a warm-up to the desired client so that we can move it later on.
+ taskMovements.add(new TaskMovement(task, destination, tasksToCaughtUpClients.get(task)));
+ }
+ }
+ caughtUpClientsByTaskLoad.offer(destination);
+ }
+
+ int movementsNeeded = 0;
+
+ for (final TaskMovement movement : taskMovements) {
+ final UUID sourceClient = caughtUpClientsByTaskLoad.poll(
+ movement.task,
+ clientId -> !clientStates.get(clientId).hasAssignedTask(movement.task)
+ );
+
+ if (sourceClient == null) {
+ // then there's no caught-up client that doesn't already have a copy of this task, so there's
+ // nowhere to move it.
+ } else {
+ moveStandbyAndTryToWarmUp(
+ remainingWarmupReplicas,
+ movement.task,
+ clientStates.get(sourceClient),
+ clientStates.get(movement.destination)
+ );
+ caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
+ movementsNeeded++;
+ }
+ }
+
+ return movementsNeeded;
+ }
+
private static void moveActiveAndTryToWarmUp(final AtomicInteger remainingWarmupReplicas,
final TaskId task,
final ClientState sourceClientState,
- final ClientState destinationClientState) {
+ final ClientState destinationClientState,
+ final Set<TaskId> warmups) {
sourceClientState.assignActive(task);
if (remainingWarmupReplicas.getAndDecrement() > 0) {
destinationClientState.unassignActive(task);
destinationClientState.assignStandby(task);
+ warmups.add(task);
} else {
// we have no more standbys or warmups to hand out, so we have to try and move it
// to the destination in a follow-on rebalance
@@ -148,6 +208,21 @@ final class TaskMovement {
}
}
+ private static void moveStandbyAndTryToWarmUp(final AtomicInteger remainingWarmupReplicas,
+ final TaskId task,
+ final ClientState sourceClientState,
+ final ClientState destinationClientState) {
+ sourceClientState.assignStandby(task);
+
+ if (remainingWarmupReplicas.getAndDecrement() > 0) {
+ // Then we can leave it also assigned to the destination as a warmup
+ } else {
+ // we have no more warmups to hand out, so we have to try and move it
+ // to the destination in a follow-on rebalance
+ destinationClientState.unassignStandby(task);
+ }
+ }
+
private static void swapStandbyAndActive(final TaskId task,
final ClientState sourceClientState,
final ClientState destinationClientState) {
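
Both movement passes above draw from the same AtomicInteger, so at most
maxWarmupReplicas warmups are handed out in total across active and standby
movements. A minimal sketch of that budget pattern (names here are
hypothetical, not Kafka's):

    import java.util.concurrent.atomic.AtomicInteger;

    public class WarmupBudgetSketch {

        // getAndDecrement() returns the value *before* decrementing, so the
        // counter can go negative once the budget is spent; only a positive
        // pre-decrement value grants a warmup.
        static boolean tryClaimWarmup(final AtomicInteger remainingWarmupReplicas) {
            return remainingWarmupReplicas.getAndDecrement() > 0;
        }

        public static void main(final String[] args) {
            final AtomicInteger remaining = new AtomicInteger(2); // maxWarmupReplicas = 2
            for (int movement = 1; movement <= 4; movement++) {
                System.out.println("movement " + movement + ": "
                    + (tryClaimWarmup(remaining)
                        ? "warm up on the destination"
                        : "defer to a follow-on rebalance"));
            }
            // prints warmups for movements 1 and 2, deferrals for 3 and 4
        }
    }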
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 7cfd8b6..f0139ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -326,6 +326,10 @@ public final class AssignmentTestUtils {
return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
}
+ static Matcher<ClientState> hasAssignedTasks(final int taskCount) {
+ return hasProperty("assignedTasks", ClientState::assignedTaskCount, taskCount);
+ }
+
static Matcher<ClientState> hasActiveTasks(final int taskCount) {
return hasProperty("activeTasks", ClientState::activeTaskCount, taskCount);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index fa9e17b..7e07d30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -47,6 +47,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_2;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.analyzeTaskAssignmentBalance;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
@@ -54,6 +55,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -79,6 +81,64 @@ public class HighAvailabilityTaskAssignorTest {
);
@Test
+ public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+ final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+ final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+ final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+ final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+ final Map<UUID, ClientState> clientStates = mkMap(
+ mkEntry(UUID_1, clientState1),
+ mkEntry(UUID_2, clientState2),
+ mkEntry(UUID_3, clientState3)
+ );
+
+ final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+ clientStates,
+ allTaskIds,
+ allTaskIds,
+ new AssignmentConfigs(11L, 0, 1, 0L)
+ );
+
+ assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+ assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+ assertThat(clientState3, hasAssignedTasks(0));
+
+ assertThat(unstable, is(false));
+ }
+
+ @Test
+ public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
+ final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+ final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+ final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+ final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+ final Map<UUID, ClientState> clientStates = mkMap(
+ mkEntry(UUID_1, clientState1),
+ mkEntry(UUID_2, clientState2),
+ mkEntry(UUID_3, clientState3)
+ );
+
+ final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+ clientStates,
+ allTaskIds,
+ allTaskIds,
+ new AssignmentConfigs(11L, 2, 1, 0L)
+ );
+
+ assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+ assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+ assertThat(clientState3, hasAssignedTasks(2));
+
+ assertThat(unstable, is(true));
+ }
+
+ @Test
public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
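
Reading the AssignmentConfigs arguments in the two new tests above as
(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas,
probingRebalanceIntervalMs): both allow a recovery lag of 11, so client2
(lagging by 10) counts as caught up on its standbys, and the tests differ only
in the warmup budget. With 0 warmups, the fully-lagging client3 receives
nothing and the assignment is stable; with 2 warmups, client3 receives exactly
2 warmup tasks and a follow-up probing rebalance is signalled (unstable is
true).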
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 38dde37..cb89cd9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -338,7 +338,7 @@ public class TaskAssignorConvergenceTest {
throw new IllegalStateException("Unexpected event: " + event);
}
if (!harness.clientStates.isEmpty()) {
- testForConvergence(harness, configs, numStatefulTasks * 2);
+ testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
verifyValidAssignment(numStandbyReplicas, harness);
verifyBalancedAssignment(harness);
}
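
The generalized convergence bound above counts standby copies as well as
actives: with, say, numStatefulTasks = 10 and numStandbyReplicas = 2, the old
bound allowed 10 * 2 = 20 assignment iterations, while the new one allows
2 * (10 + 10 * 2) = 60, i.e. two iterations per task copy. For
numStandbyReplicas = 0 the new bound reduces to the old one.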
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
index 4d42082..811ba4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
@@ -24,7 +24,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
@@ -45,7 +47,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
-import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -65,11 +67,13 @@ public class TaskMovementTest {
final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
assertThat(
- assignTaskMovements(
+ assignActiveTaskMovements(
tasksToCaughtUpClients,
getClientStatesMap(client1, client2, client3),
- maxWarmupReplicas),
- is(false)
+ new TreeMap<>(),
+ new AtomicInteger(maxWarmupReplicas)
+ ),
+ is(0)
);
}
@@ -82,11 +86,13 @@ public class TaskMovementTest {
final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
assertThat(
- assignTaskMovements(
+ assignActiveTaskMovements(
emptyMap(),
getClientStatesMap(client1, client2, client3),
- maxWarmupReplicas),
- is(false)
+ new TreeMap<>(),
+ new AtomicInteger(maxWarmupReplicas)
+ ),
+ is(0)
);
}
@@ -106,11 +112,13 @@ public class TaskMovementTest {
assertThat(
"should have assigned movements",
- assignTaskMovements(
+ assignActiveTaskMovements(
tasksToCaughtUpClients,
clientStates,
- maxWarmupReplicas),
- is(true)
+ new TreeMap<>(),
+ new AtomicInteger(maxWarmupReplicas)
+ ),
+ is(2)
);
// The active tasks have changed to the ones that each client is caught up on
assertThat(client1, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_0)));
@@ -139,11 +147,13 @@ public class TaskMovementTest {
assertThat(
"should have assigned movements",
- assignTaskMovements(
+ assignActiveTaskMovements(
tasksToCaughtUpClients,
clientStates,
- maxWarmupReplicas),
- is(true)
+ new TreeMap<>(),
+ new AtomicInteger(maxWarmupReplicas)
+ ),
+ is(2)
);
// The active tasks have changed to the ones that each client is caught up on
assertThat(client1, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_0)));
@@ -175,11 +185,13 @@ public class TaskMovementTest {
assertThat(
"should have assigned movements",
- assignTaskMovements(
+ assignActiveTaskMovements(
tasksToCaughtUpClients,
clientStates,
- maxWarmupReplicas),
- is(true)
+ new TreeMap<>(),
+ new AtomicInteger(maxWarmupReplicas)
+ ),
+ is(1)
);
// Even though we have no warmups allowed, we still let client1 take over active processing while
// client2 "warms up" because client1 was a caught-up standby, so it can "trade" standby status with