You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/03 18:54:12 UTC
[kafka] branch trunk updated: MINOR: clean up Streams assignment
classes and tests (#8406)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e0d553 MINOR: clean up Streams assignment classes and tests (#8406)
6e0d553 is described below
commit 6e0d553350cef876f4fd2de0e3b8e6e40ce6be44
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Apr 3 11:53:51 2020 -0700
MINOR: clean up Streams assignment classes and tests (#8406)
First set of cleanup pushed to followup PR after KIP-441 Pt. 5. Main changes are:
1. Moved `RankedClient` and the static `buildClientRankingsByTask` to a new file
2. Moved `Movement` and the static `getMovements` to a new file (also renamed to `TaskMovement`)
3. Consolidated the many common variables throughout the assignment tests to the new `AssignmentTestUtils`
4. New utility to generate comparable/predictable UUIDs for tests, and removed the generic from `TaskAssignor` and all related classes
Reviewers: John Roesler <vv...@apache.org>, Andrew Choi <a2...@edu.uwaterloo.ca>
---
checkstyle/suppressions.xml | 3 -
.../internals/StreamsPartitionAssignor.java | 8 +-
.../internals/assignment/BalancedAssignor.java | 14 +-
.../assignment/DefaultBalancedAssignor.java | 32 +-
.../DefaultStateConstrainedBalancedAssignor.java | 134 ++---
.../assignment/HighAvailabilityTaskAssignor.java | 277 ++-------
.../internals/assignment/RankedClient.java | 120 ++++
.../StateConstrainedBalancedAssignor.java | 12 +-
.../internals/assignment/StickyTaskAssignor.java | 49 +-
.../internals/assignment/TaskAssignor.java | 2 +-
.../internals/assignment/TaskMovement.java | 101 ++++
.../internals/StreamsPartitionAssignorTest.java | 271 ++++-----
.../internals/assignment/AssignmentInfoTest.java | 16 +-
.../internals/assignment/AssignmentTestUtils.java | 86 +++
.../internals/assignment/ClientStateTest.java | 103 ++--
.../assignment/DefaultBalancedAssignorTest.java | 272 ++++-----
...efaultStateConstrainedBalancedAssignorTest.java | 433 +++++++-------
.../HighAvailabilityTaskAssignorTest.java | 306 ++--------
.../internals/assignment/RankedClientTest.java | 163 +++++
.../assignment/StickyTaskAssignorTest.java | 656 ++++++++++-----------
.../internals/assignment/SubscriptionInfoTest.java | 82 +--
.../internals/assignment/TaskMovementTest.java | 129 ++++
22 files changed, 1740 insertions(+), 1529 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 45dc98e..bca372b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -208,9 +208,6 @@
<suppress checks="MethodLength"
files="RocksDBWindowStoreTest.java"/>
- <suppress checks="MemberName"
- files="StreamsPartitionAssignorTest.java"/>
-
<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index a59c4ab..c6c2e48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -708,19 +708,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
log.debug("Assigning tasks {} to clients {} with number of replicas {}",
allTasks, clientStates, numStandbyReplicas());
- final TaskAssignor<UUID> taskAssignor;
+ final TaskAssignor taskAssignor;
if (highAvailabilityEnabled) {
if (lagComputationSuccessful) {
- taskAssignor = new HighAvailabilityTaskAssignor<>(
+ taskAssignor = new HighAvailabilityTaskAssignor(
clientStates,
allTasks,
statefulTasks,
assignmentConfigs);
} else {
- taskAssignor = new StickyTaskAssignor<>(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
+ taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
}
} else {
- taskAssignor = new StickyTaskAssignor<>(clientStates, allTasks, statefulTasks, assignmentConfigs, false);
+ taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, false);
}
taskAssignor.assign();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
index 127b33e..5c2136a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
@@ -16,16 +16,16 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import org.apache.kafka.streams.processor.TaskId;
-
+import java.util.UUID;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
-public interface BalancedAssignor<ID extends Comparable<? super ID>> {
+public interface BalancedAssignor {
- Map<ID, List<TaskId>> assign(final SortedSet<ID> clients,
- final SortedSet<TaskId> tasks,
- final Map<ID, Integer> clientsToNumberOfStreamThreads,
- final int balanceFactor);
+ Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
+ final SortedSet<TaskId> tasks,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads,
+ final int balanceFactor);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
index 58a6741..60d96c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
@@ -16,8 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import org.apache.kafka.streams.processor.TaskId;
-
+import java.util.UUID;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -25,27 +24,28 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import org.apache.kafka.streams.processor.TaskId;
-public class DefaultBalancedAssignor<ID extends Comparable<? super ID>> implements BalancedAssignor<ID> {
+public class DefaultBalancedAssignor implements BalancedAssignor {
@Override
- public Map<ID, List<TaskId>> assign(final SortedSet<ID> clients,
- final SortedSet<TaskId> tasks,
- final Map<ID, Integer> clientsToNumberOfStreamThreads,
- final int balanceFactor) {
- final Map<ID, List<TaskId>> assignment = new HashMap<>();
+ public Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
+ final SortedSet<TaskId> tasks,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads,
+ final int balanceFactor) {
+ final Map<UUID, List<TaskId>> assignment = new HashMap<>();
clients.forEach(client -> assignment.put(client, new ArrayList<>()));
distributeTasksEvenlyOverClients(assignment, clients, tasks);
balanceTasksOverStreamThreads(assignment, clients, clientsToNumberOfStreamThreads, balanceFactor);
return assignment;
}
- private void distributeTasksEvenlyOverClients(final Map<ID, List<TaskId>> assignment,
- final SortedSet<ID> clients,
+ private void distributeTasksEvenlyOverClients(final Map<UUID, List<TaskId>> assignment,
+ final SortedSet<UUID> clients,
final SortedSet<TaskId> tasks) {
final LinkedList<TaskId> tasksToAssign = new LinkedList<>(tasks);
while (!tasksToAssign.isEmpty()) {
- for (final ID client : clients) {
+ for (final UUID client : clients) {
final TaskId task = tasksToAssign.poll();
if (task == null) {
@@ -56,16 +56,16 @@ public class DefaultBalancedAssignor<ID extends Comparable<? super ID>> implemen
}
}
- private void balanceTasksOverStreamThreads(final Map<ID, List<TaskId>> assignment,
- final SortedSet<ID> clients,
- final Map<ID, Integer> clientsToNumberOfStreamThreads,
+ private void balanceTasksOverStreamThreads(final Map<UUID, List<TaskId>> assignment,
+ final SortedSet<UUID> clients,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads,
final int balanceFactor) {
boolean stop = false;
while (!stop) {
stop = true;
- for (final ID sourceClient : clients) {
+ for (final UUID sourceClient : clients) {
final List<TaskId> sourceTasks = assignment.get(sourceClient);
- for (final ID destinationClient : clients) {
+ for (final UUID destinationClient : clients) {
if (sourceClient.equals(destinationClient)) {
continue;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignor.java
index a6a6cf7..5404733 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignor.java
@@ -26,11 +26,11 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.RankedClient;
-public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? super ID>> implements StateConstrainedBalancedAssignor<ID> {
+public class DefaultStateConstrainedBalancedAssignor implements StateConstrainedBalancedAssignor {
/**
* This assignment algorithm guarantees that all task for which caught-up clients exist are assigned to one of the
@@ -44,13 +44,13 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
* @return assignment
*/
@Override
- public Map<ID, List<TaskId>> assign(final SortedMap<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients,
- final int balanceFactor,
- final Set<ID> clients,
- final Map<ID, Integer> clientsToNumberOfStreamThreads) {
+ public Map<UUID, List<TaskId>> assign(final SortedMap<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients,
+ final int balanceFactor,
+ final Set<UUID> clients,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads) {
checkClientsAndNumberOfStreamThreads(clientsToNumberOfStreamThreads, clients);
- final Map<ID, List<TaskId>> assignment = initAssignment(clients);
- final Map<TaskId, List<ID>> tasksToCaughtUpClients = tasksToCaughtUpClients(statefulTasksToRankedClients);
+ final Map<UUID, List<TaskId>> assignment = initAssignment(clients);
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(statefulTasksToRankedClients);
assignTasksWithCaughtUpClients(
assignment,
tasksToCaughtUpClients,
@@ -71,15 +71,15 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
return assignment;
}
- private void checkClientsAndNumberOfStreamThreads(final Map<ID, Integer> clientsToNumberOfStreamThreads,
- final Set<ID> clients) {
+ private void checkClientsAndNumberOfStreamThreads(final Map<UUID, Integer> clientsToNumberOfStreamThreads,
+ final Set<UUID> clients) {
if (clients.isEmpty()) {
throw new IllegalStateException("Set of clients must not be empty");
}
if (clientsToNumberOfStreamThreads.isEmpty()) {
throw new IllegalStateException("Map from clients to their number of stream threads must not be empty");
}
- final Set<ID> copyOfClients = new HashSet<>(clients);
+ final Set<UUID> copyOfClients = new HashSet<>(clients);
copyOfClients.removeAll(clientsToNumberOfStreamThreads.keySet());
if (!copyOfClients.isEmpty()) {
throw new IllegalStateException(
@@ -95,8 +95,8 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
* @param clients list of clients
* @return initialised assignment with empty lists
*/
- private Map<ID, List<TaskId>> initAssignment(final Set<ID> clients) {
- final Map<ID, List<TaskId>> assignment = new HashMap<>();
+ private Map<UUID, List<TaskId>> initAssignment(final Set<UUID> clients) {
+ final Map<UUID, List<TaskId>> assignment = new HashMap<>();
clients.forEach(client -> assignment.put(client, new ArrayList<>()));
return assignment;
}
@@ -107,11 +107,11 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
* @param statefulTasksToRankedClients ranked clients map
* @return map from tasks with caught-up clients to the list of client candidates
*/
- private Map<TaskId, List<ID>> tasksToCaughtUpClients(final SortedMap<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients) {
- final Map<TaskId, List<ID>> taskToCaughtUpClients = new HashMap<>();
- for (final SortedMap.Entry<TaskId, SortedSet<RankedClient<ID>>> taskToRankedClients : statefulTasksToRankedClients.entrySet()) {
- final SortedSet<RankedClient<ID>> rankedClients = taskToRankedClients.getValue();
- for (final RankedClient<ID> rankedClient : rankedClients) {
+ private Map<TaskId, List<UUID>> tasksToCaughtUpClients(final SortedMap<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients) {
+ final Map<TaskId, List<UUID>> taskToCaughtUpClients = new HashMap<>();
+ for (final SortedMap.Entry<TaskId, SortedSet<RankedClient>> taskToRankedClients : statefulTasksToRankedClients.entrySet()) {
+ final SortedSet<RankedClient> rankedClients = taskToRankedClients.getValue();
+ for (final RankedClient rankedClient : rankedClients) {
if (rankedClient.rank() == Task.LATEST_OFFSET || rankedClient.rank() == 0) {
final TaskId taskId = taskToRankedClients.getKey();
taskToCaughtUpClients.computeIfAbsent(taskId, ignored -> new ArrayList<>()).add(rankedClient.clientId());
@@ -126,12 +126,12 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
/**
* Maps a task to the client that host the task according to the previous assignment.
*
- * @return map from task IDs to clients hosting the corresponding task
+ * @return map from task UUIDs to clients hosting the corresponding task
*/
- private Map<TaskId, ID> previouslyRunningTasksToPreviousClients(final Map<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients) {
- final Map<TaskId, ID> tasksToPreviousClients = new HashMap<>();
- for (final Map.Entry<TaskId, SortedSet<RankedClient<ID>>> taskToRankedClients : statefulTasksToRankedClients.entrySet()) {
- final RankedClient<ID> topRankedClient = taskToRankedClients.getValue().first();
+ private Map<TaskId, UUID> previouslyRunningTasksToPreviousClients(final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients) {
+ final Map<TaskId, UUID> tasksToPreviousClients = new HashMap<>();
+ for (final Map.Entry<TaskId, SortedSet<RankedClient>> taskToRankedClients : statefulTasksToRankedClients.entrySet()) {
+ final RankedClient topRankedClient = taskToRankedClients.getValue().first();
if (topRankedClient.rank() == Task.LATEST_OFFSET) {
tasksToPreviousClients.put(taskToRankedClients.getKey(), topRankedClient.clientId());
}
@@ -142,13 +142,13 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
/**
* Assigns tasks for which one or more caught-up clients exist to one of the caught-up clients.
* @param assignment assignment
- * @param tasksToCaughtUpClients map from task IDs to lists of caught-up clients
+ * @param tasksToCaughtUpClients map from task UUIDs to lists of caught-up clients
*/
- private void assignTasksWithCaughtUpClients(final Map<ID, List<TaskId>> assignment,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients,
- final Map<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients) {
+ private void assignTasksWithCaughtUpClients(final Map<UUID, List<TaskId>> assignment,
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients,
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients) {
// If a task was previously assigned to a client that is caught-up and still exists, give it back to the client
- final Map<TaskId, ID> previouslyRunningTasksToPreviousClients =
+ final Map<TaskId, UUID> previouslyRunningTasksToPreviousClients =
previouslyRunningTasksToPreviousClients(statefulTasksToRankedClients);
previouslyRunningTasksToPreviousClients.forEach((task, client) -> assignment.get(client).add(task));
final List<TaskId> unassignedTasksWithCaughtUpClients = new ArrayList<>(tasksToCaughtUpClients.keySet());
@@ -157,10 +157,10 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
// If a task's previous host client was not caught-up or no longer exists, assign it to the caught-up client
// with the least tasks
for (final TaskId taskId : unassignedTasksWithCaughtUpClients) {
- final List<ID> caughtUpClients = tasksToCaughtUpClients.get(taskId);
- ID clientWithLeastTasks = null;
+ final List<UUID> caughtUpClients = tasksToCaughtUpClients.get(taskId);
+ UUID clientWithLeastTasks = null;
int minTaskPerStreamThread = Integer.MAX_VALUE;
- for (final ID client : caughtUpClients) {
+ for (final UUID client : caughtUpClients) {
final int assignedTasks = assignment.get(client).size();
if (minTaskPerStreamThread > assignedTasks) {
clientWithLeastTasks = client;
@@ -175,22 +175,22 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
* Assigns tasks for which no caught-up clients exist.
* A task is assigned to one of the clients with the highest rank and the least tasks assigned.
* @param assignment assignment
- * @param tasksToCaughtUpClients map from task IDs to lists of caught-up clients
+ * @param tasksToCaughtUpClients map from task UUIDs to lists of caught-up clients
* @param statefulTasksToRankedClients ranked clients map
*/
- private void assignTasksWithoutCaughtUpClients(final Map<ID, List<TaskId>> assignment,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients,
- final Map<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients) {
+ private void assignTasksWithoutCaughtUpClients(final Map<UUID, List<TaskId>> assignment,
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients,
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients) {
final SortedSet<TaskId> unassignedTasksWithoutCaughtUpClients = new TreeSet<>(statefulTasksToRankedClients.keySet());
unassignedTasksWithoutCaughtUpClients.removeAll(tasksToCaughtUpClients.keySet());
for (final TaskId taskId : unassignedTasksWithoutCaughtUpClients) {
- final SortedSet<RankedClient<ID>> rankedClients = statefulTasksToRankedClients.get(taskId);
+ final SortedSet<RankedClient> rankedClients = statefulTasksToRankedClients.get(taskId);
final long topRank = rankedClients.first().rank();
int minTasksPerStreamThread = Integer.MAX_VALUE;
- ID clientWithLeastTasks = rankedClients.first().clientId();
- for (final RankedClient<ID> rankedClient : rankedClients) {
+ UUID clientWithLeastTasks = rankedClients.first().clientId();
+ for (final RankedClient rankedClient : rankedClients) {
if (rankedClient.rank() == topRank) {
- final ID clientId = rankedClient.clientId();
+ final UUID clientId = rankedClient.clientId();
final int assignedTasks = assignment.get(clientId).size();
if (minTasksPerStreamThread > assignedTasks) {
clientWithLeastTasks = clientId;
@@ -209,17 +209,17 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
* @param assignment assignment
* @param balanceFactor balance factor
* @param statefulTasksToRankedClients ranked clients map
- * @param tasksToCaughtUpClients map from task IDs to lists of caught-up clients
+ * @param tasksToCaughtUpClients map from task UUIDs to lists of caught-up clients
* @param clientsToNumberOfStreamThreads map from clients to their number of stream threads
*/
- private void balance(final Map<ID, List<TaskId>> assignment,
+ private void balance(final Map<UUID, List<TaskId>> assignment,
final int balanceFactor,
- final Map<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients,
- final Map<ID, Integer> clientsToNumberOfStreamThreads) {
- final List<ID> clients = new ArrayList<>(assignment.keySet());
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients,
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads) {
+ final List<UUID> clients = new ArrayList<>(assignment.keySet());
Collections.sort(clients);
- for (final ID sourceClientId : clients) {
+ for (final UUID sourceClientId : clients) {
final List<TaskId> sourceTasks = assignment.get(sourceClientId);
maybeMoveSourceTasksWithoutCaughtUpClients(
assignment,
@@ -241,18 +241,18 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
}
}
- private void maybeMoveSourceTasksWithoutCaughtUpClients(final Map<ID, List<TaskId>> assignment,
+ private void maybeMoveSourceTasksWithoutCaughtUpClients(final Map<UUID, List<TaskId>> assignment,
final int balanceFactor,
- final Map<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients,
- final Map<ID, Integer> clientsToNumberOfStreamThreads,
- final ID sourceClientId,
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients,
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads,
+ final UUID sourceClientId,
final List<TaskId> sourceTasks) {
for (final TaskId task : assignedTasksWithoutCaughtUpClientsThatMightBeMoved(sourceTasks, tasksToCaughtUpClients)) {
final int assignedTasksPerStreamThreadAtSource =
sourceTasks.size() / clientsToNumberOfStreamThreads.get(sourceClientId);
- for (final RankedClient<ID> clientAndRank : statefulTasksToRankedClients.get(task)) {
- final ID destinationClientId = clientAndRank.clientId();
+ for (final RankedClient clientAndRank : statefulTasksToRankedClients.get(task)) {
+ final UUID destinationClientId = clientAndRank.clientId();
final List<TaskId> destination = assignment.get(destinationClientId);
final int assignedTasksPerStreamThreadAtDestination =
destination.size() / clientsToNumberOfStreamThreads.get(destinationClientId);
@@ -265,16 +265,16 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
}
}
- private void maybeMoveSourceTasksWithCaughtUpClients(final Map<ID, List<TaskId>> assignment,
+ private void maybeMoveSourceTasksWithCaughtUpClients(final Map<UUID, List<TaskId>> assignment,
final int balanceFactor,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients,
- final Map<ID, Integer> clientsToNumberOfStreamThreads,
- final ID sourceClientId,
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads,
+ final UUID sourceClientId,
final List<TaskId> sourceTasks) {
for (final TaskId task : assignedTasksWithCaughtUpClientsThatMightBeMoved(sourceTasks, tasksToCaughtUpClients)) {
final int assignedTasksPerStreamThreadAtSource =
sourceTasks.size() / clientsToNumberOfStreamThreads.get(sourceClientId);
- for (final ID destinationClientId : tasksToCaughtUpClients.get(task)) {
+ for (final UUID destinationClientId : tasksToCaughtUpClients.get(task)) {
final List<TaskId> destination = assignment.get(destinationClientId);
final int assignedTasksPerStreamThreadAtDestination =
destination.size() / clientsToNumberOfStreamThreads.get(destinationClientId);
@@ -290,29 +290,29 @@ public class DefaultStateConstrainedBalancedAssignor<ID extends Comparable<? sup
/**
* Returns a sublist of tasks in the given list that does not have a caught-up client.
*
- * @param tasks list of task IDs
- * @param tasksToCaughtUpClients map from task IDs to lists of caught-up clients
- * @return a list of task IDs that does not have a caught-up client
+ * @param tasks list of task UUIDs
+ * @param tasksToCaughtUpClients map from task UUIDs to lists of caught-up clients
+ * @return a list of task UUIDs that does not have a caught-up client
*/
private List<TaskId> assignedTasksWithoutCaughtUpClientsThatMightBeMoved(final List<TaskId> tasks,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients) {
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients) {
return assignedTasksThatMightBeMoved(tasks, tasksToCaughtUpClients, false);
}
/**
* Returns a sublist of tasks in the given list that have a caught-up client.
*
- * @param tasks list of task IDs
- * @param tasksToCaughtUpClients map from task IDs to lists of caught-up clients
- * @return a list of task IDs that have a caught-up client
+ * @param tasks list of task UUIDs
+ * @param tasksToCaughtUpClients map from task UUIDs to lists of caught-up clients
+ * @return a list of task UUIDs that have a caught-up client
*/
private List<TaskId> assignedTasksWithCaughtUpClientsThatMightBeMoved(final List<TaskId> tasks,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients) {
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients) {
return assignedTasksThatMightBeMoved(tasks, tasksToCaughtUpClients, true);
}
private List<TaskId> assignedTasksThatMightBeMoved(final List<TaskId> tasks,
- final Map<TaskId, List<ID>> tasksToCaughtUpClients,
+ final Map<TaskId, List<UUID>> tasksToCaughtUpClients,
final boolean isCaughtUp) {
final List<TaskId> tasksWithCaughtUpClients = new ArrayList<>();
for (int i = tasks.size() - 1; i >= 0; --i) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 276db81..09fc8c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -17,7 +17,8 @@
package org.apache.kafka.streams.processor.internals.assignment;
import static java.util.Arrays.asList;
-import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
+import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.getMovements;
import java.util.ArrayList;
import java.util.Collection;
@@ -26,15 +27,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.SortedSet;
-import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +41,12 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
-public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements TaskAssignor<ID> {
+public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
- private final Map<ID, ClientState> clientStates;
- private final Map<ID, Integer> clientsToNumberOfThreads;
- private final SortedSet<ID> sortedClients;
+ private final Map<UUID, ClientState> clientStates;
+ private final Map<UUID, Integer> clientsToNumberOfThreads;
+ private final SortedSet<UUID> sortedClients;
private final Set<TaskId> allTasks;
private final SortedSet<TaskId> statefulTasks;
@@ -55,9 +54,9 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
private final AssignmentConfigs configs;
- private final SortedMap<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedCandidates;
+ private final SortedMap<TaskId, SortedSet<RankedClient>> statefulTasksToRankedCandidates;
- public HighAvailabilityTaskAssignor(final Map<ID, ClientState> clientStates,
+ public HighAvailabilityTaskAssignor(final Map<UUID, ClientState> clientStates,
final Set<TaskId> allTasks,
final Set<TaskId> statefulTasks,
final AssignmentConfigs configs) {
@@ -87,14 +86,14 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
return false;
}
- final Map<ID, List<TaskId>> warmupTaskAssignment = initializeEmptyTaskAssignmentMap();
- final Map<ID, List<TaskId>> standbyTaskAssignment = initializeEmptyTaskAssignmentMap();
- final Map<ID, List<TaskId>> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap();
+ final Map<UUID, List<TaskId>> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+ final Map<UUID, List<TaskId>> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+ final Map<UUID, List<TaskId>> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
// ---------------- Stateful Active Tasks ---------------- //
- final Map<ID, List<TaskId>> statefulActiveTaskAssignment =
- new DefaultStateConstrainedBalancedAssignor<ID>().assign(
+ final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
+ new DefaultStateConstrainedBalancedAssignor().assign(
statefulTasksToRankedCandidates,
configs.balanceFactor,
sortedClients,
@@ -103,30 +102,32 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
// ---------------- Warmup Replica Tasks ---------------- //
- final Map<ID, List<TaskId>> balancedStatefulActiveTaskAssignment =
- new DefaultBalancedAssignor<ID>().assign(
+ final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment =
+ new DefaultBalancedAssignor().assign(
sortedClients,
statefulTasks,
clientsToNumberOfThreads,
configs.balanceFactor);
- final List<Movement<ID>> movements =
- getMovements(statefulActiveTaskAssignment, balancedStatefulActiveTaskAssignment,
- configs.maxWarmupReplicas);
- for (final Movement<ID> movement : movements) {
+ final List<TaskMovement> movements = getMovements(
+ statefulActiveTaskAssignment,
+ balancedStatefulActiveTaskAssignment,
+ configs.maxWarmupReplicas);
+
+ for (final TaskMovement movement : movements) {
warmupTaskAssignment.get(movement.destination).add(movement.task);
}
// ---------------- Standby Replica Tasks ---------------- //
- final List<Map<ID, List<TaskId>>> allTaskAssignments = asList(
+ final List<Map<UUID, List<TaskId>>> allTaskAssignments = asList(
statefulActiveTaskAssignment,
warmupTaskAssignment,
standbyTaskAssignment,
statelessActiveTaskAssignment
);
- final ValidClientsByTaskLoadQueue<ID> clientsByStandbyTaskLoad =
+ final ValidClientsByTaskLoadQueue<UUID> clientsByStandbyTaskLoad =
new ValidClientsByTaskLoadQueue<>(
configs.numStandbyReplicas,
getClientPriorityQueueByTaskLoad(allTaskAssignments),
@@ -134,8 +135,8 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
);
for (final TaskId task : statefulTasksToRankedCandidates.keySet()) {
- final List<ID> clients = clientsByStandbyTaskLoad.poll(task);
- for (final ID client : clients) {
+ final List<UUID> clients = clientsByStandbyTaskLoad.poll(task);
+ for (final UUID client : clients) {
standbyTaskAssignment.get(client).add(task);
}
clientsByStandbyTaskLoad.offer(clients);
@@ -151,10 +152,10 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
// ---------------- Stateless Active Tasks ---------------- //
- final PriorityQueue<ID> statelessActiveTaskClientsQueue = getClientPriorityQueueByTaskLoad(allTaskAssignments);
+ final PriorityQueue<UUID> statelessActiveTaskClientsQueue = getClientPriorityQueueByTaskLoad(allTaskAssignments);
for (final TaskId task : statelessTasks) {
- final ID client = statelessActiveTaskClientsQueue.poll();
+ final UUID client = statelessActiveTaskClientsQueue.poll();
statelessActiveTaskAssignment.get(client).add(task);
statelessActiveTaskClientsQueue.offer(client);
}
@@ -170,47 +171,6 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
}
/**
- * Returns a list of the movements of tasks from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
- * @param statefulActiveTaskAssignment the initial assignment, with source clients
- * @param balancedStatefulActiveTaskAssignment the final assignment, with destination clients
- */
- static <ID> List<Movement<ID>> getMovements(final Map<ID, List<TaskId>> statefulActiveTaskAssignment,
- final Map<ID, List<TaskId>> balancedStatefulActiveTaskAssignment,
- final int maxWarmupReplicas) {
- if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
- throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
- }
-
- final Map<TaskId, ID> taskToDestinationClient = new HashMap<>();
- for (final Map.Entry<ID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
- final ID destination = clientEntry.getKey();
- for (final TaskId task : clientEntry.getValue()) {
- taskToDestinationClient.put(task, destination);
- }
- }
-
- final List<Movement<ID>> movements = new LinkedList<>();
- for (final Map.Entry<ID, List<TaskId>> sourceClientEntry : statefulActiveTaskAssignment.entrySet()) {
- final ID source = sourceClientEntry.getKey();
-
- for (final TaskId task : sourceClientEntry.getValue()) {
- final ID destination = taskToDestinationClient.get(task);
- if (destination == null) {
- log.error("Task {} is assigned to client {} in initial assignment but has no owner in the final " +
- "balanced assignment.", task, source);
- throw new IllegalStateException("Found task in initial assignment that was not assigned in the final.");
- } else if (!source.equals(destination)) {
- movements.add(new Movement<>(task, source, destination));
- if (movements.size() == maxWarmupReplicas) {
- return movements;
- }
- }
- }
- }
- return movements;
- }
-
- /**
* @return true iff all active tasks with caught-up client are assigned to one of them, and all tasks are assigned
*/
boolean previousAssignmentIsValid() {
@@ -221,8 +181,8 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
new HashMap<>(statefulTasksToRankedCandidates.keySet().stream()
.collect(Collectors.toMap(task -> task, task -> configs.numStandbyReplicas)));
- for (final Map.Entry<ID, ClientState> clientEntry : clientStates.entrySet()) {
- final ID client = clientEntry.getKey();
+ for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
+ final UUID client = clientEntry.getKey();
final ClientState state = clientEntry.getValue();
final Set<TaskId> prevActiveTasks = state.prevActiveTasks();
@@ -253,13 +213,13 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
/**
* @return true if this client is caught-up for this task, or the task has no caught-up clients
*/
- boolean taskIsCaughtUpOnClient(final TaskId task, final ID client) {
+ boolean taskIsCaughtUpOnClient(final TaskId task, final UUID client) {
boolean hasNoCaughtUpClients = true;
- final SortedSet<RankedClient<ID>> rankedClients = statefulTasksToRankedCandidates.get(task);
+ final SortedSet<RankedClient> rankedClients = statefulTasksToRankedCandidates.get(task);
if (rankedClients == null) {
return true;
}
- for (final RankedClient<ID> rankedClient : rankedClients) {
+ for (final RankedClient rankedClient : rankedClients) {
if (rankedClient.rank() <= 0L) {
if (rankedClient.clientId().equals(client)) {
return true;
@@ -277,46 +237,6 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
}
/**
- * Rankings are computed as follows, with lower being more caught up:
- * Rank -1: active running task
- * Rank 0: standby or restoring task whose overall lag is within the acceptableRecoveryLag bounds
- * Rank 1: tasks whose lag is unknown, eg because it was not encoded in an older version subscription.
- * Since it may have been caught-up, we rank it higher than clients whom we know are not caught-up
- * to give it priority without classifying it as caught-up and risking violating high availability
- * Rank 1+: all other tasks are ranked according to their actual total lag
- * @return Sorted set of all client candidates for each stateful task, ranked by their overall lag. Tasks are
- */
- static <ID extends Comparable<ID>> SortedMap<TaskId, SortedSet<RankedClient<ID>>> buildClientRankingsByTask(final Set<TaskId> statefulTasks,
- final Map<ID, ClientState> clientStates,
- final long acceptableRecoveryLag) {
- final SortedMap<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedCandidates = new TreeMap<>();
-
- for (final TaskId task : statefulTasks) {
- final SortedSet<RankedClient<ID>> rankedClientCandidates = new TreeSet<>();
- statefulTasksToRankedCandidates.put(task, rankedClientCandidates);
-
- for (final Map.Entry<ID, ClientState> clientEntry : clientStates.entrySet()) {
- final ID clientId = clientEntry.getKey();
- final long taskLag = clientEntry.getValue().lagFor(task);
- final long clientRank;
- if (taskLag == Task.LATEST_OFFSET) {
- clientRank = Task.LATEST_OFFSET;
- } else if (taskLag == UNKNOWN_OFFSET_SUM) {
- clientRank = 1L;
- } else if (taskLag <= acceptableRecoveryLag) {
- clientRank = 0L;
- } else {
- clientRank = taskLag;
- }
- rankedClientCandidates.add(new RankedClient<>(clientId, clientRank));
- }
- }
- log.trace("Computed statefulTasksToRankedCandidates map as {}", statefulTasksToRankedCandidates);
-
- return statefulTasksToRankedCandidates;
- }
-
- /**
* Compute the balance factor as the difference in stateful active task count per thread between the most and
* least loaded clients
*/
@@ -357,21 +277,21 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
}
}
- private Map<ID, List<TaskId>> initializeEmptyTaskAssignmentMap() {
- return sortedClients.stream().collect(Collectors.toMap(id -> id, id -> new ArrayList<>()));
+ private static Map<UUID, List<TaskId>> initializeEmptyTaskAssignmentMap(final Set<UUID> clients) {
+ return clients.stream().collect(Collectors.toMap(id -> id, id -> new ArrayList<>()));
}
- private void assignActiveTasksToClients(final Map<ID, List<TaskId>> activeTasks) {
- for (final Map.Entry<ID, ClientState> clientEntry : clientStates.entrySet()) {
- final ID clientId = clientEntry.getKey();
+ private void assignActiveTasksToClients(final Map<UUID, List<TaskId>> activeTasks) {
+ for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
+ final UUID clientId = clientEntry.getKey();
final ClientState state = clientEntry.getValue();
state.assignActiveTasks(activeTasks.get(clientId));
}
}
- private void assignStandbyTasksToClients(final Map<ID, List<TaskId>> standbyTasks) {
- for (final Map.Entry<ID, ClientState> clientEntry : clientStates.entrySet()) {
- final ID clientId = clientEntry.getKey();
+ private void assignStandbyTasksToClients(final Map<UUID, List<TaskId>> standbyTasks) {
+ for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
+ final UUID clientId = clientEntry.getKey();
final ClientState state = clientEntry.getValue();
state.assignStandbyTasks(standbyTasks.get(clientId));
}
@@ -384,8 +304,8 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
}
}
- private PriorityQueue<ID> getClientPriorityQueueByTaskLoad(final List<Map<ID, List<TaskId>>> taskLoadsByClient) {
- final PriorityQueue<ID> queue = new PriorityQueue<>(
+ private PriorityQueue<UUID> getClientPriorityQueueByTaskLoad(final List<Map<UUID, List<TaskId>>> taskLoadsByClient) {
+ final PriorityQueue<UUID> queue = new PriorityQueue<>(
(client, other) -> {
final int clientTasksPerThread = tasksPerThread(client, taskLoadsByClient);
final int otherTasksPerThread = tasksPerThread(other, taskLoadsByClient);
@@ -400,102 +320,25 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
return queue;
}
- private int tasksPerThread(final ID client, final List<Map<ID, List<TaskId>>> taskLoadsByClient) {
+ private int tasksPerThread(final UUID client, final List<Map<UUID, List<TaskId>>> taskLoadsByClient) {
double numTasks = 0;
- for (final Map<ID, List<TaskId>> assignment : taskLoadsByClient) {
+ for (final Map<UUID, List<TaskId>> assignment : taskLoadsByClient) {
numTasks += assignment.get(client).size();
}
return (int) Math.ceil(numTasks / clientsToNumberOfThreads.get(client));
}
-
- static class RankedClient<ID extends Comparable<? super ID>> implements Comparable<RankedClient<ID>> {
- private final ID clientId;
- private final long rank;
-
- RankedClient(final ID clientId, final long rank) {
- this.clientId = clientId;
- this.rank = rank;
- }
-
- ID clientId() {
- return clientId;
- }
-
- long rank() {
- return rank;
- }
-
- @Override
- public int compareTo(final RankedClient<ID> clientIdAndLag) {
- if (rank < clientIdAndLag.rank) {
- return -1;
- } else if (rank > clientIdAndLag.rank) {
- return 1;
- } else {
- return clientId.compareTo(clientIdAndLag.clientId);
- }
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final RankedClient<?> that = (RankedClient<?>) o;
- return rank == that.rank && Objects.equals(clientId, that.clientId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(clientId, rank);
- }
- }
-
- static class Movement<ID> {
- final TaskId task;
- final ID source;
- final ID destination;
-
- Movement(final TaskId task, final ID source, final ID destination) {
- this.task = task;
- this.source = source;
- this.destination = destination;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final Movement<?> movement = (Movement<?>) o;
- return Objects.equals(task, movement.task) &&
- Objects.equals(source, movement.source) &&
- Objects.equals(destination, movement.destination);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(task, source, destination);
- }
- }
-
+
/**
* Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
*/
- static class ValidClientsByTaskLoadQueue<ID> {
+ static class ValidClientsByTaskLoadQueue<UUID> {
private final int numClientsPerTask;
- private final PriorityQueue<ID> clientsByTaskLoad;
- private final List<Map<ID, List<TaskId>>> allStatefulTaskAssignments;
+ private final PriorityQueue<UUID> clientsByTaskLoad;
+ private final List<Map<UUID, List<TaskId>>> allStatefulTaskAssignments;
ValidClientsByTaskLoadQueue(final int numClientsPerTask,
- final PriorityQueue<ID> clientsByTaskLoad,
- final List<Map<ID, List<TaskId>>> allStatefulTaskAssignments) {
+ final PriorityQueue<UUID> clientsByTaskLoad,
+ final List<Map<UUID, List<TaskId>>> allStatefulTaskAssignments) {
this.numClientsPerTask = numClientsPerTask;
this.clientsByTaskLoad = clientsByTaskLoad;
this.allStatefulTaskAssignments = allStatefulTaskAssignments;
@@ -505,11 +348,11 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
* @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid
* candidates for the given task (ie do not already have any version of this task assigned)
*/
- List<ID> poll(final TaskId task) {
- final List<ID> nextLeastLoadedValidClients = new LinkedList<>();
- final Set<ID> invalidPolledClients = new HashSet<>();
+ List<UUID> poll(final TaskId task) {
+ final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+ final Set<UUID> invalidPolledClients = new HashSet<>();
while (nextLeastLoadedValidClients.size() < numClientsPerTask) {
- ID candidateClient;
+ UUID candidateClient;
while (true) {
candidateClient = clientsByTaskLoad.poll();
if (candidateClient == null) {
@@ -529,12 +372,12 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
return nextLeastLoadedValidClients;
}
- void offer(final Collection<ID> clients) {
+ void offer(final Collection<UUID> clients) {
returnPolledClientsToQueue(clients);
}
- private boolean canBeAssignedToClient(final TaskId task, final ID client) {
- for (final Map<ID, List<TaskId>> taskAssignment : allStatefulTaskAssignments) {
+ private boolean canBeAssignedToClient(final TaskId task, final UUID client) {
+ for (final Map<UUID, List<TaskId>> taskAssignment : allStatefulTaskAssignments) {
if (taskAssignment.get(client).contains(task)) {
return false;
}
@@ -542,8 +385,8 @@ public class HighAvailabilityTaskAssignor<ID extends Comparable<ID>> implements
return true;
}
- private void returnPolledClientsToQueue(final Collection<ID> polledClients) {
- for (final ID client : polledClients) {
+ private void returnPolledClientsToQueue(final Collection<UUID> polledClients) {
+ for (final UUID client : polledClients) {
clientsByTaskLoad.offer(client);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RankedClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RankedClient.java
new file mode 100644
index 0000000..d73bf2a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RankedClient.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RankedClient implements Comparable<RankedClient> {
+ private static final Logger log = LoggerFactory.getLogger(RankedClient.class);
+
+ private final UUID clientId;
+ private final long rank;
+
+ RankedClient(final UUID clientId, final long rank) {
+ this.clientId = clientId;
+ this.rank = rank;
+ }
+
+ UUID clientId() {
+ return clientId;
+ }
+
+ long rank() {
+ return rank;
+ }
+
+ @Override
+ public int compareTo(final RankedClient clientIdAndLag) {
+ if (rank < clientIdAndLag.rank) {
+ return -1;
+ } else if (rank > clientIdAndLag.rank) {
+ return 1;
+ } else {
+ return clientId.compareTo(clientIdAndLag.clientId);
+ }
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final RankedClient that = (RankedClient) o;
+ return rank == that.rank && Objects.equals(clientId, that.clientId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clientId, rank);
+ }
+
+ /**
+ * Rankings are computed as follows, with lower being more caught up:
+ * Rank -1: active running task
+ * Rank 0: standby or restoring task whose overall lag is within the acceptableRecoveryLag bounds
+ * Rank 1: tasks whose lag is unknown, eg because it was not encoded in an older version subscription.
+ * Since it may have been caught-up, we rank it higher than clients whom we know are not caught-up
+ * to give it priority without classifying it as caught-up and risking violating high availability
+ * Rank 1+: all other tasks are ranked according to their actual total lag
+ * @return Sorted set of all client candidates for each stateful task, ranked by their overall lag. Tasks are
+ */
+ static SortedMap<TaskId, SortedSet<RankedClient>> buildClientRankingsByTask(final Set<TaskId> statefulTasks,
+ final Map<UUID, ClientState> clientStates,
+ final long acceptableRecoveryLag) {
+ final SortedMap<TaskId, SortedSet<RankedClient>> statefulTasksToRankedCandidates = new TreeMap<>();
+
+ for (final TaskId task : statefulTasks) {
+ final SortedSet<RankedClient> rankedClientCandidates = new TreeSet<>();
+ statefulTasksToRankedCandidates.put(task, rankedClientCandidates);
+
+ for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
+ final UUID clientId = clientEntry.getKey();
+ final long taskLag = clientEntry.getValue().lagFor(task);
+ final long clientRank;
+ if (taskLag == Task.LATEST_OFFSET) {
+ clientRank = Task.LATEST_OFFSET;
+ } else if (taskLag == UNKNOWN_OFFSET_SUM) {
+ clientRank = 1L;
+ } else if (taskLag <= acceptableRecoveryLag) {
+ clientRank = 0L;
+ } else {
+ clientRank = taskLag;
+ }
+ rankedClientCandidates.add(new RankedClient(clientId, clientRank));
+ }
+ }
+ log.trace("Computed statefulTasksToRankedCandidates map as {}", statefulTasksToRankedCandidates);
+
+ return statefulTasksToRankedCandidates;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StateConstrainedBalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StateConstrainedBalancedAssignor.java
index 4bc5309..8f8921a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StateConstrainedBalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StateConstrainedBalancedAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import java.util.List;
@@ -23,12 +24,11 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.RankedClient;
-public interface StateConstrainedBalancedAssignor<ID extends Comparable<? super ID>> {
+public interface StateConstrainedBalancedAssignor {
- Map<ID, List<TaskId>> assign(final SortedMap<TaskId, SortedSet<RankedClient<ID>>> statefulTasksToRankedClients,
- final int balanceFactor,
- final Set<ID> clients,
- final Map<ID, Integer> clientsToNumberOfStreamThreads);
+ Map<UUID, List<TaskId>> assign(final SortedMap<TaskId, SortedSet<RankedClient>> statefulTasksToRankedClients,
+ final int balanceFactor,
+ final Set<UUID> clients,
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 82de63b..2f2c77e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
@@ -32,20 +33,20 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
+public class StickyTaskAssignor implements TaskAssignor {
private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
- private final Map<ID, ClientState> clients;
+ private final Map<UUID, ClientState> clients;
private final Set<TaskId> allTaskIds;
private final Set<TaskId> standbyTaskIds;
- private final Map<TaskId, ID> previousActiveTaskAssignment = new HashMap<>();
- private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new HashMap<>();
+ private final Map<TaskId, UUID> previousActiveTaskAssignment = new HashMap<>();
+ private final Map<TaskId, Set<UUID>> previousStandbyTaskAssignment = new HashMap<>();
private final TaskPairs taskPairs;
private final int numStandbyReplicas;
private final boolean mustPreserveActiveTaskAssignment;
- public StickyTaskAssignor(final Map<ID, ClientState> clients,
+ public StickyTaskAssignor(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> standbyTaskIds,
final AssignmentConfigs configs,
@@ -71,7 +72,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
private void assignStandby(final int numStandbyReplicas) {
for (final TaskId taskId : standbyTaskIds) {
for (int i = 0; i < numStandbyReplicas; i++) {
- final Set<ID> ids = findClientsWithoutAssignedTask(taskId);
+ final Set<UUID> ids = findClientsWithoutAssignedTask(taskId);
if (ids.isEmpty()) {
log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
"There is not enough available capacity. You should " +
@@ -93,7 +94,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
// first try and re-assign existing active tasks to clients that previously had
// the same active task
- for (final Map.Entry<TaskId, ID> entry : previousActiveTaskAssignment.entrySet()) {
+ for (final Map.Entry<TaskId, UUID> entry : previousActiveTaskAssignment.entrySet()) {
final TaskId taskId = entry.getKey();
if (allTaskIds.contains(taskId)) {
final ClientState client = clients.get(entry.getValue());
@@ -110,9 +111,9 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
// have seen the task.
for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) {
final TaskId taskId = iterator.next();
- final Set<ID> clientIds = previousStandbyTaskAssignment.get(taskId);
+ final Set<UUID> clientIds = previousStandbyTaskAssignment.get(taskId);
if (clientIds != null) {
- for (final ID clientId : clientIds) {
+ for (final UUID clientId : clientIds) {
final ClientState client = clients.get(clientId);
if (client.hasUnfulfilledQuota(tasksPerThread)) {
assignTaskToClient(assigned, taskId, client);
@@ -131,7 +132,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
}
}
- private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
+ private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<UUID> clientsWithin, final boolean active) {
final ClientState client = findClient(taskId, clientsWithin);
taskPairs.addPairs(taskId, client.assignedTasks());
if (active) {
@@ -147,9 +148,9 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
assigned.add(taskId);
}
- private Set<ID> findClientsWithoutAssignedTask(final TaskId taskId) {
- final Set<ID> clientIds = new HashSet<>();
- for (final Map.Entry<ID, ClientState> client : clients.entrySet()) {
+ private Set<UUID> findClientsWithoutAssignedTask(final TaskId taskId) {
+ final Set<UUID> clientIds = new HashSet<>();
+ for (final Map.Entry<UUID, ClientState> client : clients.entrySet()) {
if (!client.getValue().hasAssignedTask(taskId)) {
clientIds.add(client.getKey());
}
@@ -158,7 +159,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
}
- private ClientState findClient(final TaskId taskId, final Set<ID> clientsWithin) {
+ private ClientState findClient(final TaskId taskId, final Set<UUID> clientsWithin) {
// optimize the case where there is only 1 id to search within.
if (clientsWithin.size() == 1) {
@@ -194,25 +195,25 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
return false;
}
- private ClientState findClientsWithPreviousAssignedTask(final TaskId taskId, final Set<ID> clientsWithin) {
- final ID previous = previousActiveTaskAssignment.get(taskId);
+ private ClientState findClientsWithPreviousAssignedTask(final TaskId taskId, final Set<UUID> clientsWithin) {
+ final UUID previous = previousActiveTaskAssignment.get(taskId);
if (previous != null && clientsWithin.contains(previous)) {
return clients.get(previous);
}
return findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
}
- private ClientState findLeastLoadedClientWithPreviousStandByTask(final TaskId taskId, final Set<ID> clientsWithin) {
- final Set<ID> ids = previousStandbyTaskAssignment.get(taskId);
+ private ClientState findLeastLoadedClientWithPreviousStandByTask(final TaskId taskId, final Set<UUID> clientsWithin) {
+ final Set<UUID> ids = previousStandbyTaskAssignment.get(taskId);
if (ids == null) {
return null;
}
- final HashSet<ID> constrainTo = new HashSet<>(ids);
+ final HashSet<UUID> constrainTo = new HashSet<>(ids);
constrainTo.retainAll(clientsWithin);
return leastLoaded(taskId, constrainTo);
}
- private ClientState leastLoaded(final TaskId taskId, final Set<ID> clientIds) {
+ private ClientState leastLoaded(final TaskId taskId, final Set<UUID> clientIds) {
final ClientState leastLoaded = findLeastLoaded(taskId, clientIds, true);
if (leastLoaded == null) {
return findLeastLoaded(taskId, clientIds, false);
@@ -221,10 +222,10 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
}
private ClientState findLeastLoaded(final TaskId taskId,
- final Set<ID> clientIds,
+ final Set<UUID> clientIds,
final boolean checkTaskPairs) {
ClientState leastLoaded = null;
- for (final ID id : clientIds) {
+ for (final UUID id : clientIds) {
final ClientState client = clients.get(id);
if (client.assignedTaskCount() == 0) {
return client;
@@ -243,8 +244,8 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID> {
}
- private void mapPreviousTaskAssignment(final Map<ID, ClientState> clients) {
- for (final Map.Entry<ID, ClientState> clientState : clients.entrySet()) {
+ private void mapPreviousTaskAssignment(final Map<UUID, ClientState> clients) {
+ for (final Map.Entry<UUID, ClientState> clientState : clients.entrySet()) {
for (final TaskId activeTask : clientState.getValue().prevActiveTasks()) {
previousActiveTaskAssignment.put(activeTask, clientState.getKey());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index 97db6bd..cbecc24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-public interface TaskAssignor<ID> {
+public interface TaskAssignor {
/**
* @return whether the generated assignment requires a followup rebalance to satisfy all conditions
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
new file mode 100644
index 0000000..872c9f6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskMovement {
+ private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
+
+ final TaskId task;
+ final UUID source;
+ final UUID destination;
+
+ TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+ this.task = task;
+ this.source = source;
+ this.destination = destination;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TaskMovement movement = (TaskMovement) o;
+ return Objects.equals(task, movement.task) &&
+ Objects.equals(source, movement.source) &&
+ Objects.equals(destination, movement.destination);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(task, source, destination);
+ }
+
+ /**
+ * Returns a list of the movements of tasks from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @param statefulActiveTaskAssignment the initial assignment, with source clients
+ * @param balancedStatefulActiveTaskAssignment the final assignment, with destination clients
+ */
+ static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
+ final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment,
+ final int maxWarmupReplicas) {
+ if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
+ throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
+ }
+
+ final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
+ for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
+ final UUID destination = clientEntry.getKey();
+ for (final TaskId task : clientEntry.getValue()) {
+ taskToDestinationClient.put(task, destination);
+ }
+ }
+
+ final List<TaskMovement> movements = new LinkedList<>();
+ for (final Map.Entry<UUID, List<TaskId>> sourceClientEntry : statefulActiveTaskAssignment.entrySet()) {
+ final UUID source = sourceClientEntry.getKey();
+
+ for (final TaskId task : sourceClientEntry.getValue()) {
+ final UUID destination = taskToDestinationClient.get(task);
+ if (destination == null) {
+ log.error("Task {} is assigned to client {} in initial assignment but has no owner in the final " +
+ "balanced assignment.", task, source);
+ throw new IllegalStateException("Found task in initial assignment that was not assigned in the final.");
+ } else if (!source.equals(destination)) {
+ movements.add(new TaskMovement(task, source, destination));
+ if (movements.size() == maxWarmupReplicas) {
+ return movements;
+ }
+ }
+ }
+ }
+ return movements;
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 520a693..a4a93a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -83,6 +83,23 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASK_OFFSET_SUMS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_0;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
@@ -104,6 +121,8 @@ public class StreamsPartitionAssignorTest {
private static final String CONSUMER_2 = "consumer2";
private static final String CONSUMER_3 = "consumer3";
private static final String CONSUMER_4 = "consumer4";
+
+ private final Set<String> allTopics = mkSet("topic1", "topic2");
private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -122,38 +141,23 @@ public class StreamsPartitionAssignorTest {
private final TopicPartition t4p2 = new TopicPartition("topic4", 2);
private final TopicPartition t4p3 = new TopicPartition("topic4", 3);
- private final TaskId task0_0 = new TaskId(0, 0);
- private final TaskId task0_1 = new TaskId(0, 1);
- private final TaskId task0_2 = new TaskId(0, 2);
- private final TaskId task0_3 = new TaskId(0, 3);
- private final TaskId task1_0 = new TaskId(1, 0);
- private final TaskId task1_1 = new TaskId(1, 1);
- private final TaskId task1_2 = new TaskId(1, 2);
- private final TaskId task1_3 = new TaskId(1, 3);
- private final TaskId task2_0 = new TaskId(2, 0);
- private final TaskId task2_1 = new TaskId(2, 1);
- private final TaskId task2_2 = new TaskId(2, 2);
- private final TaskId task2_3 = new TaskId(2, 3);
-
private final Map<TaskId, Set<TopicPartition>> partitionsForTask = mkMap(
- mkEntry(task0_0, mkSet(t1p0, t2p0)),
- mkEntry(task0_1, mkSet(t1p1, t2p1)),
- mkEntry(task0_2, mkSet(t1p2, t2p2)),
- mkEntry(task0_3, mkSet(t1p3, t2p3)),
-
- mkEntry(task1_0, mkSet(t3p0)),
- mkEntry(task1_1, mkSet(t3p1)),
- mkEntry(task1_2, mkSet(t3p2)),
- mkEntry(task1_3, mkSet(t3p3)),
-
- mkEntry(task2_0, mkSet(t4p0)),
- mkEntry(task2_1, mkSet(t4p1)),
- mkEntry(task2_2, mkSet(t4p2)),
- mkEntry(task2_3, mkSet(t4p3))
+ mkEntry(TASK_0_0, mkSet(t1p0, t2p0)),
+ mkEntry(TASK_0_1, mkSet(t1p1, t2p1)),
+ mkEntry(TASK_0_2, mkSet(t1p2, t2p2)),
+ mkEntry(TASK_0_3, mkSet(t1p3, t2p3)),
+
+ mkEntry(TASK_1_0, mkSet(t3p0)),
+ mkEntry(TASK_1_1, mkSet(t3p1)),
+ mkEntry(TASK_1_2, mkSet(t3p2)),
+ mkEntry(TASK_1_3, mkSet(t3p3)),
+
+ mkEntry(TASK_2_0, mkSet(t4p0)),
+ mkEntry(TASK_2_1, mkSet(t4p1)),
+ mkEntry(TASK_2_2, mkSet(t4p2)),
+ mkEntry(TASK_2_3, mkSet(t4p3))
);
- private final Set<String> allTopics = mkSet("topic1", "topic2");
-
private final List<PartitionInfo> infos = asList(
new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
@@ -167,14 +171,7 @@ public class StreamsPartitionAssignorTest {
new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
);
- private final Set<TaskId> emptyTasks = emptySet();
- private final Map<TaskId, Long> emptyTaskOffsetSums = emptyMap();
- private final Map<TopicPartition, Long> emptyChangelogEndOffsets = new HashMap<>();
-
- private final UUID uuid1 = UUID.randomUUID();
- private final UUID uuid2 = UUID.randomUUID();
-
- private final SubscriptionInfo defaultSubscriptionInfo = getInfo(uuid1, emptyTasks, emptyTasks);
+ private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS);
private final Cluster metadata = new Cluster(
"cluster",
@@ -232,13 +229,13 @@ public class StreamsPartitionAssignorTest {
}
private void createDefaultMockTaskManager() {
- createMockTaskManager(emptyTaskOffsetSums, uuid1);
+ createMockTaskManager(EMPTY_TASK_OFFSET_SUMS, UUID_1);
}
// Useful for tests that don't care about the task offset sums
private void createMockTaskManager(final Set<TaskId> activeTasks,
final Set<TaskId> standbyTasks) {
- createMockTaskManager(getTaskOffsetSums(activeTasks, standbyTasks), uuid1);
+ createMockTaskManager(getTaskOffsetSums(activeTasks, standbyTasks), UUID_1);
}
private void createMockTaskManager(final Map<TaskId, Long> taskOffsetSums,
@@ -290,7 +287,7 @@ public class StreamsPartitionAssignorTest {
public StreamsPartitionAssignorTest(final boolean highAvailabilityEnabled) {
this.highAvailabilityEnabled = highAvailabilityEnabled;
- createMockAdminClient(emptyChangelogEndOffsets);
+ createMockAdminClient(EMPTY_CHANGELOG_END_OFFSETS);
}
@Test
@@ -315,12 +312,13 @@ public class StreamsPartitionAssignorTest {
public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() {
configureDefault();
final ClientState state = new ClientState();
- final List<TaskId> allTasks = asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);
+ final List<TaskId> allTasks = asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2,
+ TASK_1_3);
final Map<String, List<TaskId>> previousAssignment = mkMap(
- mkEntry(CONSUMER_1, asList(task0_0, task1_1, task1_3)),
- mkEntry(CONSUMER_2, asList(task0_3, task1_0)),
- mkEntry(CONSUMER_3, asList(task0_1, task0_2, task1_2))
+ mkEntry(CONSUMER_1, asList(TASK_0_0, TASK_1_1, TASK_1_3)),
+ mkEntry(CONSUMER_2, asList(TASK_0_3, TASK_1_0)),
+ mkEntry(CONSUMER_3, asList(TASK_0_1, TASK_0_2, TASK_1_2))
);
for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) {
@@ -348,12 +346,13 @@ public class StreamsPartitionAssignorTest {
configureDefault();
final ClientState state = new ClientState();
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2,
+ TASK_1_3);
final Map<String, List<TaskId>> previousAssignment = mkMap(
- mkEntry(CONSUMER_1, new ArrayList<>(asList(task0_0, task1_1, task1_3))),
- mkEntry(CONSUMER_2, new ArrayList<>(asList(task0_3, task1_0))),
- mkEntry(CONSUMER_3, new ArrayList<>(asList(task0_1, task0_2, task1_2)))
+ mkEntry(CONSUMER_1, new ArrayList<>(asList(TASK_0_0, TASK_1_1, TASK_1_3))),
+ mkEntry(CONSUMER_2, new ArrayList<>(asList(TASK_0_3, TASK_1_0))),
+ mkEntry(CONSUMER_3, new ArrayList<>(asList(TASK_0_1, TASK_0_2, TASK_1_2)))
);
for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) {
@@ -365,7 +364,7 @@ public class StreamsPartitionAssignorTest {
final Set<String> consumers = mkSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);
// We should be able to add a new task without sacrificing stickyness
- final TaskId newTask = task2_0;
+ final TaskId newTask = TASK_2_0;
allTasks.add(newTask);
state.assignActiveTasks(allTasks);
@@ -381,12 +380,13 @@ public class StreamsPartitionAssignorTest {
configureDefault();
final ClientState state = new ClientState();
- final List<TaskId> allTasks = asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);
+ final List<TaskId> allTasks = asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2,
+ TASK_1_3);
final Map<String, List<TaskId>> previousAssignment = mkMap(
- mkEntry(CONSUMER_1, asList(task0_0, task1_1, task1_3)),
- mkEntry(CONSUMER_2, asList(task0_3, task1_0)),
- mkEntry(CONSUMER_3, asList(task0_1, task0_2, task1_2))
+ mkEntry(CONSUMER_1, asList(TASK_0_0, TASK_1_1, TASK_1_3)),
+ mkEntry(CONSUMER_2, asList(TASK_0_3, TASK_1_0)),
+ mkEntry(CONSUMER_3, asList(TASK_0_1, TASK_0_2, TASK_1_2))
);
for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) {
@@ -408,12 +408,13 @@ public class StreamsPartitionAssignorTest {
configureDefault();
final ClientState state = new ClientState();
- final List<TaskId> allTasks = asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3);
+ final List<TaskId> allTasks = asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2,
+ TASK_1_3);
final Map<String, List<TaskId>> previousAssignment = mkMap(
- mkEntry(CONSUMER_1, new ArrayList<>(asList(task1_1, task1_3))),
- mkEntry(CONSUMER_2, new ArrayList<>(asList(task0_3, task1_0))),
- mkEntry(CONSUMER_3, new ArrayList<>(asList(task0_1, task0_2, task1_2)))
+ mkEntry(CONSUMER_1, new ArrayList<>(asList(TASK_1_1, TASK_1_3))),
+ mkEntry(CONSUMER_2, new ArrayList<>(asList(TASK_0_3, TASK_1_0))),
+ mkEntry(CONSUMER_3, new ArrayList<>(asList(TASK_0_1, TASK_0_2, TASK_1_2)))
);
for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) {
@@ -422,8 +423,8 @@ public class StreamsPartitionAssignorTest {
}
}
- // Add the partitions of task0_0 to allOwnedPartitions but not c1's ownedPartitions/previousAssignment
- final Set<TopicPartition> allOwnedPartitions = new HashSet<>(partitionsForTask.get(task0_0));
+ // Add the partitions of TASK_0_0 to allOwnedPartitions but not c1's ownedPartitions/previousAssignment
+ final Set<TopicPartition> allOwnedPartitions = new HashSet<>(partitionsForTask.get(TASK_0_0));
final Set<String> consumers = mkSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);
state.assignActiveTasks(allTasks);
@@ -493,7 +494,7 @@ public class StreamsPartitionAssignorTest {
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
- final SubscriptionInfo info = getInfo(uuid1, prevTasks, standbyTasks);
+ final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
}
@@ -519,7 +520,7 @@ public class StreamsPartitionAssignorTest {
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
- final SubscriptionInfo info = getInfo(uuid1, prevTasks, standbyTasks);
+ final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
}
@@ -530,14 +531,14 @@ public class StreamsPartitionAssignorTest {
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
- final Set<TaskId> prevTasks10 = mkSet(task0_0);
- final Set<TaskId> prevTasks11 = mkSet(task0_1);
- final Set<TaskId> prevTasks20 = mkSet(task0_2);
- final Set<TaskId> standbyTasks10 = mkSet(task0_1);
- final Set<TaskId> standbyTasks11 = mkSet(task0_2);
- final Set<TaskId> standbyTasks20 = mkSet(task0_0);
+ final Set<TaskId> prevTasks10 = mkSet(TASK_0_0);
+ final Set<TaskId> prevTasks11 = mkSet(TASK_0_1);
+ final Set<TaskId> prevTasks20 = mkSet(TASK_0_2);
+ final Set<TaskId> standbyTasks10 = mkSet(TASK_0_1);
+ final Set<TaskId> standbyTasks11 = mkSet(TASK_0_2);
+ final Set<TaskId> standbyTasks20 = mkSet(TASK_0_0);
createMockTaskManager(prevTasks10, standbyTasks10);
createMockAdminClient(getTopicPartitionOffsetsMap(
@@ -549,17 +550,17 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer10",
new Subscription(
topics,
- getInfo(uuid1, prevTasks10, standbyTasks10).encode()
+ getInfo(UUID_1, prevTasks10, standbyTasks10).encode()
));
subscriptions.put("consumer11",
new Subscription(
topics,
- getInfo(uuid1, prevTasks11, standbyTasks11).encode()
+ getInfo(UUID_1, prevTasks11, standbyTasks11).encode()
));
subscriptions.put("consumer20",
new Subscription(
topics,
- getInfo(uuid2, prevTasks20, standbyTasks20).encode()
+ getInfo(UUID_2, prevTasks20, standbyTasks20).encode()
));
final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -581,7 +582,7 @@ public class StreamsPartitionAssignorTest {
final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
allActiveTasks.addAll(info11.activeTasks());
- assertEquals(mkSet(task0_0, task0_1), allActiveTasks);
+ assertEquals(mkSet(TASK_0_0, TASK_0_1), allActiveTasks);
// the third consumer
final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
@@ -673,7 +674,7 @@ public class StreamsPartitionAssignorTest {
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor2");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
createDefaultMockTaskManager();
createMockAdminClient(getTopicPartitionOffsetsMap(
@@ -704,10 +705,10 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
- final Set<TaskId> prevTasks10 = mkSet(task0_0);
- final Set<TaskId> standbyTasks10 = mkSet(task0_1);
+ final Set<TaskId> prevTasks10 = mkSet(TASK_0_0);
+ final Set<TaskId> standbyTasks10 = mkSet(TASK_0_1);
final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()),
emptySet(),
emptySet(),
@@ -719,7 +720,7 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer10",
new Subscription(
topics,
- getInfo(uuid1, prevTasks10, standbyTasks10).encode()
+ getInfo(UUID_1, prevTasks10, standbyTasks10).encode()
));
// initially metadata is empty
@@ -760,28 +761,28 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source3", null, null, null, "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
final List<String> topics = asList("topic1", "topic2", "topic3");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2, task0_3);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
// assuming that previous tasks do not have topic3
- final Set<TaskId> prevTasks10 = mkSet(task0_0);
- final Set<TaskId> prevTasks11 = mkSet(task0_1);
- final Set<TaskId> prevTasks20 = mkSet(task0_2);
+ final Set<TaskId> prevTasks10 = mkSet(TASK_0_0);
+ final Set<TaskId> prevTasks11 = mkSet(TASK_0_1);
+ final Set<TaskId> prevTasks20 = mkSet(TASK_0_2);
- createMockTaskManager(prevTasks10, emptyTasks);
+ createMockTaskManager(prevTasks10, EMPTY_TASKS);
configureDefaultPartitionAssignor();
subscriptions.put("consumer10",
new Subscription(
topics,
- getInfo(uuid1, prevTasks10, emptyTasks).encode()));
+ getInfo(UUID_1, prevTasks10, EMPTY_TASKS).encode()));
subscriptions.put("consumer11",
new Subscription(
topics,
- getInfo(uuid1, prevTasks11, emptyTasks).encode()));
+ getInfo(UUID_1, prevTasks11, EMPTY_TASKS).encode()));
subscriptions.put("consumer20",
new Subscription(
topics,
- getInfo(uuid2, prevTasks20, emptyTasks).encode()));
+ getInfo(UUID_2, prevTasks20, EMPTY_TASKS).encode()));
final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -839,7 +840,7 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer11",
new Subscription(topics, defaultSubscriptionInfo.encode()));
subscriptions.put("consumer20",
- new Subscription(topics, getInfo(uuid2, emptyTasks, emptyTasks).encode()));
+ new Subscription(topics, getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode()));
final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -897,17 +898,17 @@ public class StreamsPartitionAssignorTest {
final List<String> topics = asList("topic1", "topic2");
- createMockTaskManager(mkSet(task0_0), emptySet());
+ createMockTaskManager(mkSet(TASK_0_0), emptySet());
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
subscriptions.put("consumer10",
new Subscription(
topics,
- getInfo(uuid1, mkSet(task0_0), emptySet()).encode()));
+ getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode()));
subscriptions.put("consumer20",
new Subscription(
topics,
- getInfo(uuid2, mkSet(task0_2), emptySet()).encode()));
+ getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode()));
final Map<String, Assignment> assignments =
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -927,17 +928,17 @@ public class StreamsPartitionAssignorTest {
final List<String> topics = asList("topic1", "topic2");
- createMockTaskManager(mkSet(task0_0), emptySet());
+ createMockTaskManager(mkSet(TASK_0_0), emptySet());
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
subscriptions.put("consumer10",
new Subscription(
topics,
- getInfo(uuid1, mkSet(task0_0), emptySet()).encode()));
+ getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode()));
subscriptions.put("consumer20",
new Subscription(
topics,
- getInfo(uuid2, mkSet(task0_2), emptySet()).encode()));
+ getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode()));
final Map<String, Assignment> assignments =
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -962,14 +963,14 @@ public class StreamsPartitionAssignorTest {
.flatMap(Collection::stream)
.collect(Collectors.toSet());
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
- final Set<TaskId> prevTasks00 = mkSet(task0_0);
- final Set<TaskId> prevTasks01 = mkSet(task0_1);
- final Set<TaskId> prevTasks02 = mkSet(task0_2);
- final Set<TaskId> standbyTasks00 = mkSet(task0_0);
- final Set<TaskId> standbyTasks01 = mkSet(task0_1);
- final Set<TaskId> standbyTasks02 = mkSet(task0_2);
+ final Set<TaskId> prevTasks00 = mkSet(TASK_0_0);
+ final Set<TaskId> prevTasks01 = mkSet(TASK_0_1);
+ final Set<TaskId> prevTasks02 = mkSet(TASK_0_2);
+ final Set<TaskId> standbyTasks00 = mkSet(TASK_0_0);
+ final Set<TaskId> standbyTasks01 = mkSet(TASK_0_1);
+ final Set<TaskId> standbyTasks02 = mkSet(TASK_0_2);
createMockTaskManager(prevTasks00, standbyTasks01);
createMockAdminClient(getTopicPartitionOffsetsMap(
@@ -981,15 +982,15 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer10",
new Subscription(
topics,
- getInfo(uuid1, prevTasks00, standbyTasks01, USER_END_POINT).encode()));
+ getInfo(UUID_1, prevTasks00, standbyTasks01, USER_END_POINT).encode()));
subscriptions.put("consumer11",
new Subscription(
topics,
- getInfo(uuid1, prevTasks01, standbyTasks02, USER_END_POINT).encode()));
+ getInfo(UUID_1, prevTasks01, standbyTasks02, USER_END_POINT).encode()));
subscriptions.put("consumer20",
new Subscription(
topics,
- getInfo(uuid2, prevTasks02, standbyTasks00, OTHER_END_POINT).encode()));
+ getInfo(UUID_2, prevTasks02, standbyTasks00, OTHER_END_POINT).encode()));
final Map<String, Assignment> assignments =
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -1007,8 +1008,8 @@ public class StreamsPartitionAssignorTest {
assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
// check active tasks assigned to the first client
- assertEquals(mkSet(task0_0, task0_1), new HashSet<>(allActiveTasks));
- assertEquals(mkSet(task0_2), new HashSet<>(allStandbyTasks));
+ assertEquals(mkSet(TASK_0_0, TASK_0_1), new HashSet<>(allActiveTasks));
+ assertEquals(mkSet(TASK_0_2), new HashSet<>(allStandbyTasks));
// the third consumer
final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
@@ -1053,11 +1054,11 @@ public class StreamsPartitionAssignorTest {
mkSet(t3p0, t3p3));
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
- activeTasks.put(task0_0, mkSet(t3p0));
- activeTasks.put(task0_3, mkSet(t3p3));
+ activeTasks.put(TASK_0_0, mkSet(t3p0));
+ activeTasks.put(TASK_0_3, mkSet(t3p3));
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
- standbyTasks.put(task0_1, mkSet(t3p1));
- standbyTasks.put(task0_2, mkSet(t3p2));
+ standbyTasks.put(TASK_0_1, mkSet(t3p1));
+ standbyTasks.put(TASK_0_2, mkSet(t3p2));
taskManager.handleAssignment(activeTasks, standbyTasks);
EasyMock.expectLastCall();
@@ -1069,7 +1070,7 @@ public class StreamsPartitionAssignorTest {
configureDefaultPartitionAssignor();
- final List<TaskId> activeTaskList = asList(task0_0, task0_3);
+ final List<TaskId> activeTaskList = asList(TASK_0_0, TASK_0_3);
final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, activeTaskList, standbyTasks, hostState, emptyMap(), 0);
final Assignment assignment = new Assignment(asList(t3p0, t3p3), info.encode());
@@ -1091,7 +1092,7 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topicX");
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
final List<String> topics = asList("topic1", APPLICATION_ID + "-topicX");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final MockInternalTopicManager internalTopicManager = configureDefault();
@@ -1119,7 +1120,7 @@ public class StreamsPartitionAssignorTest {
builder.addSink("sink2", "topicZ", null, null, null, "processor2");
builder.addSource(null, "source3", null, null, null, "topicZ");
final List<String> topics = asList("topic1", APPLICATION_ID + "-topicX", APPLICATION_ID + "-topicZ");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final MockInternalTopicManager internalTopicManager = configureDefault();
@@ -1240,7 +1241,7 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer1",
new Subscription(
topics,
- getInfo(uuid1, emptyTasks, emptyTasks, USER_END_POINT).encode())
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())
);
final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
final Assignment consumerAssignment = assignments.get("consumer1");
@@ -1400,12 +1401,12 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer1",
new Subscription(
Collections.singletonList("topic1"),
- getInfo(uuid1, emptyTasks, emptyTasks, USER_END_POINT).encode())
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())
);
subscriptions.put("consumer2",
new Subscription(
Collections.singletonList("topic1"),
- getInfo(uuid2, emptyTasks, emptyTasks, OTHER_END_POINT).encode())
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())
);
final Set<TopicPartition> allPartitions = mkSet(t1p0, t1p1, t1p2);
final Map<String, Assignment> assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
@@ -1501,12 +1502,12 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer1",
new Subscription(
Collections.singletonList("topic1"),
- getInfoForOlderVersion(smallestVersion, uuid1, emptyTasks, emptyTasks).encode())
+ getInfoForOlderVersion(smallestVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode())
);
subscriptions.put("consumer2",
new Subscription(
Collections.singletonList("topic1"),
- getInfoForOlderVersion(otherVersion, uuid2, emptyTasks, emptyTasks).encode()
+ getInfoForOlderVersion(otherVersion, UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode()
)
);
@@ -1569,18 +1570,18 @@ public class StreamsPartitionAssignorTest {
public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins() {
builder.addSource(null, "source1", null, null, null, "topic1");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
subscriptions.put(CONSUMER_1,
new Subscription(
Collections.singletonList("topic1"),
- getInfo(uuid1, allTasks, emptyTasks).encode(),
+ getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(),
asList(t1p0, t1p1, t1p2))
);
subscriptions.put(CONSUMER_2,
new Subscription(
Collections.singletonList("topic1"),
- getInfo(uuid2, emptyTasks, emptyTasks).encode(),
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode(),
emptyList())
);
@@ -1611,16 +1612,16 @@ public class StreamsPartitionAssignorTest {
builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
- final Set<TaskId> activeTasks = mkSet(task0_0, task0_1);
- final Set<TaskId> standbyTasks = mkSet(task0_2);
+ final Set<TaskId> activeTasks = mkSet(TASK_0_0, TASK_0_1);
+ final Set<TaskId> standbyTasks = mkSet(TASK_0_2);
final Map<TaskId, Set<TopicPartition>> standbyTaskMap = mkMap(
- mkEntry(task0_2, Collections.singleton(t1p2))
+ mkEntry(TASK_0_2, Collections.singleton(t1p2))
);
final Map<TaskId, Set<TopicPartition>> futureStandbyTaskMap = mkMap(
- mkEntry(task0_0, Collections.singleton(t1p0)),
- mkEntry(task0_1, Collections.singleton(t1p1))
+ mkEntry(TASK_0_0, Collections.singleton(t1p0)),
+ mkEntry(TASK_0_1, Collections.singleton(t1p1))
);
createMockTaskManager(allTasks, allTasks);
@@ -1634,7 +1635,7 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer1",
new Subscription(
Collections.singletonList("topic1"),
- getInfo(uuid1, activeTasks, standbyTasks).encode(),
+ getInfo(UUID_1, activeTasks, standbyTasks).encode(),
asList(t1p0, t1p1))
);
subscriptions.put("future-consumer",
@@ -1669,7 +1670,7 @@ public class StreamsPartitionAssignorTest {
equalTo(
new AssignmentInfo(
LATEST_SUPPORTED_VERSION,
- Collections.singletonList(task0_2),
+ Collections.singletonList(TASK_0_2),
futureStandbyTaskMap,
emptyMap(),
emptyMap(),
@@ -1682,7 +1683,7 @@ public class StreamsPartitionAssignorTest {
public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() {
builder.addSource(null, "source1", null, null, null, "topic1");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
subscriptions.put(CONSUMER_1,
new Subscription(
@@ -1708,13 +1709,13 @@ public class StreamsPartitionAssignorTest {
assertThat(assignment.get(CONSUMER_1).partitions(), equalTo(asList(t1p0, t1p2)));
assertThat(
AssignmentInfo.decode(assignment.get(CONSUMER_1).userData()),
- equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, asList(task0_0, task0_2), emptyMap(), emptyMap(), emptyMap(), 0)));
+ equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, asList(TASK_0_0, TASK_0_2), emptyMap(), emptyMap(), emptyMap(), 0)));
assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(Collections.singletonList(t1p1)));
assertThat(
AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()),
- equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, Collections.singletonList(task0_1), emptyMap(), emptyMap(), emptyMap(), 0)));
+ equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, Collections.singletonList(TASK_0_1), emptyMap(), emptyMap(), emptyMap(), 0)));
}
@Test
@@ -1884,9 +1885,9 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
- final Set<TaskId> allTasks = mkSet(task0_0, task0_1, task0_2);
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
- createMockTaskManager(allTasks, emptyTasks);
+ createMockTaskManager(allTasks, EMPTY_TASKS);
adminClient = EasyMock.createMock(AdminClient.class);
expect(adminClient.listOffsets(anyObject())).andThrow(new StreamsException("Should be handled"));
configureDefaultPartitionAssignor();
@@ -1897,12 +1898,12 @@ public class StreamsPartitionAssignorTest {
subscriptions.put(firstConsumer,
new Subscription(
singletonList("source1"),
- getInfo(uuid1, allTasks, emptyTasks).encode()
+ getInfo(UUID_1, allTasks, EMPTY_TASKS).encode()
));
subscriptions.put(newConsumer,
new Subscription(
singletonList("source1"),
- getInfo(uuid2, emptyTasks, emptyTasks).encode()
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode()
));
final Map<String, Assignment> assignments = partitionAssignor
@@ -1930,7 +1931,7 @@ public class StreamsPartitionAssignorTest {
subscriptions.put("consumer1",
new Subscription(
Collections.singletonList("topic1"),
- getInfoForOlderVersion(oldVersion, uuid1, emptyTasks, emptyTasks).encode())
+ getInfoForOlderVersion(oldVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode())
);
subscriptions.put("future-consumer",
new Subscription(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 4958503..274ead1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -30,20 +30,24 @@ import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
import static org.junit.Assert.assertEquals;
public class AssignmentInfoTest {
private final List<TaskId> activeTasks = Arrays.asList(
- new TaskId(0, 0),
- new TaskId(0, 1),
- new TaskId(1, 0),
- new TaskId(1, 1));
+ TASK_0_0,
+ TASK_0_1,
+ TASK_1_0,
+ TASK_1_0);
private final Map<TaskId, Set<TopicPartition>> standbyTasks = mkMap(
- mkEntry(new TaskId(1, 0), mkSet(new TopicPartition("t1", 0), new TopicPartition("t2", 0))),
- mkEntry(new TaskId(1, 1), mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)))
+ mkEntry(TASK_1_0, mkSet(new TopicPartition("t1", 0), new TopicPartition("t2", 0))),
+ mkEntry(TASK_1_1, mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)))
);
private final Map<HostInfo, Set<TopicPartition>> activeAssignment = mkMap(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
new file mode 100644
index 0000000..e5c5348
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+public class AssignmentTestUtils {
+
+ public static final UUID UUID_1 = uuidForInt(1);
+ public static final UUID UUID_2 = uuidForInt(2);
+ public static final UUID UUID_3 = uuidForInt(3);
+ public static final UUID UUID_4 = uuidForInt(4);
+ public static final UUID UUID_5 = uuidForInt(5);
+ public static final UUID UUID_6 = uuidForInt(6);
+
+ public static final TaskId TASK_0_0 = new TaskId(0, 0);
+ public static final TaskId TASK_0_1 = new TaskId(0, 1);
+ public static final TaskId TASK_0_2 = new TaskId(0, 2);
+ public static final TaskId TASK_0_3 = new TaskId(0, 3);
+ public static final TaskId TASK_0_4 = new TaskId(0, 4);
+ public static final TaskId TASK_0_5 = new TaskId(0, 5);
+ public static final TaskId TASK_0_6 = new TaskId(0, 6);
+ public static final TaskId TASK_1_0 = new TaskId(1, 0);
+ public static final TaskId TASK_1_1 = new TaskId(1, 1);
+ public static final TaskId TASK_1_2 = new TaskId(1, 2);
+ public static final TaskId TASK_1_3 = new TaskId(1, 3);
+ public static final TaskId TASK_2_0 = new TaskId(2, 0);
+ public static final TaskId TASK_2_1 = new TaskId(2, 1);
+ public static final TaskId TASK_2_2 = new TaskId(2, 2);
+ public static final TaskId TASK_2_3 = new TaskId(2, 3);
+ public static final TaskId TASK_3_4 = new TaskId(3, 4);
+
+ public static final Set<TaskId> EMPTY_TASKS = emptySet();
+ public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
+ public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
+
+ /**
+ * Builds a UUID by repeating the given number n. For valid n, it is guaranteed that the returned UUIDs satisfy
+ * the same relation relative to others as their parameter n does: iff n < m, then uuidForInt(n) < uuidForInt(m)
+ *
+ * @param n an integer between 1 and 7
+ * @return the UUID created by repeating the digit n in the UUID format
+ */
+ static UUID uuidForInt(final Integer n) {
+ if (n < 1 || n > 7) {
+ throw new IllegalArgumentException("Must pass in a single digit number to the uuid builder, got n = " + n);
+ }
+ final StringBuilder builder = new StringBuilder(36);
+ for (int i = 0; i < 8; ++i) {
+ builder.append(n);
+ }
+ builder.append('-');
+
+ for (int i = 0; i < 3; ++i) {
+ for (int j = 0; j < 4; ++j) {
+ builder.append(n);
+ }
+ builder.append('-');
+ }
+ for (int i = 0; i < 12; ++i) {
+ builder.append(n);
+ }
+ return UUID.fromString(builder.toString());
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index dac5a50..cb32155 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -27,6 +27,8 @@ import java.util.Collections;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -39,9 +41,6 @@ public class ClientStateTest {
private final ClientState client = new ClientState(1);
private final ClientState zeroCapacityClient = new ClientState(0);
- private final TaskId taskId01 = new TaskId(0, 1);
- private final TaskId taskId02 = new TaskId(0, 2);
-
@Test
public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() {
assertFalse(client.reachedCapacity());
@@ -49,64 +48,64 @@ public class ClientStateTest {
@Test
public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() {
- client.assignActive(taskId01);
+ client.assignActive(TASK_0_1);
assertTrue(client.reachedCapacity());
}
@Test
public void shouldAddActiveTasksToBothAssignedAndActive() {
- client.assignActive(taskId01);
- assertThat(client.activeTasks(), equalTo(Collections.singleton(taskId01)));
- assertThat(client.assignedTasks(), equalTo(Collections.singleton(taskId01)));
+ client.assignActive(TASK_0_1);
+ assertThat(client.activeTasks(), equalTo(Collections.singleton(TASK_0_1)));
+ assertThat(client.assignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
assertThat(client.assignedTaskCount(), equalTo(1));
assertThat(client.standbyTasks().size(), equalTo(0));
}
@Test
public void shouldAddStandbyTasksToBothStandbyAndAssigned() {
- client.assignStandby(taskId01);
- assertThat(client.assignedTasks(), equalTo(Collections.singleton(taskId01)));
- assertThat(client.standbyTasks(), equalTo(Collections.singleton(taskId01)));
+ client.assignStandby(TASK_0_1);
+ assertThat(client.assignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
+ assertThat(client.standbyTasks(), equalTo(Collections.singleton(TASK_0_1)));
assertThat(client.assignedTaskCount(), equalTo(1));
assertThat(client.activeTasks().size(), equalTo(0));
}
@Test
public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() {
- client.addPreviousActiveTasks(Utils.mkSet(taskId01, taskId02));
- assertThat(client.prevActiveTasks(), equalTo(Utils.mkSet(taskId01, taskId02)));
- assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(taskId01, taskId02)));
+ client.addPreviousActiveTasks(Utils.mkSet(TASK_0_1, TASK_0_2));
+ assertThat(client.prevActiveTasks(), equalTo(Utils.mkSet(TASK_0_1, TASK_0_2)));
+ assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(TASK_0_1, TASK_0_2)));
}
@Test
public void shouldAddPreviousStandbyTasksToPreviousAssignedAndPreviousStandby() {
- client.addPreviousStandbyTasks(Utils.mkSet(taskId01, taskId02));
+ client.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1, TASK_0_2));
assertThat(client.prevActiveTasks().size(), equalTo(0));
- assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(taskId01, taskId02)));
+ assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(TASK_0_1, TASK_0_2)));
}
@Test
public void shouldHaveAssignedTaskIfActiveTaskAssigned() {
- client.assignActive(taskId01);
- assertTrue(client.hasAssignedTask(taskId01));
+ client.assignActive(TASK_0_1);
+ assertTrue(client.hasAssignedTask(TASK_0_1));
}
@Test
public void shouldHaveAssignedTaskIfStandbyTaskAssigned() {
- client.assignStandby(taskId01);
- assertTrue(client.hasAssignedTask(taskId01));
+ client.assignStandby(TASK_0_1);
+ assertTrue(client.hasAssignedTask(TASK_0_1));
}
@Test
public void shouldNotHaveAssignedTaskIfTaskNotAssigned() {
- client.assignActive(taskId01);
- assertFalse(client.hasAssignedTask(taskId02));
+ client.assignActive(TASK_0_1);
+ assertFalse(client.hasAssignedTask(TASK_0_2));
}
@Test
public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() {
final ClientState otherClient = new ClientState(1);
- client.assignActive(taskId01);
+ client.assignActive(TASK_0_1);
assertTrue(otherClient.hasMoreAvailableCapacityThan(client));
assertFalse(client.hasMoreAvailableCapacityThan(otherClient));
}
@@ -169,100 +168,100 @@ public class ClientStateTest {
@Test
public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
- final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET);
+ final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.initializePrevTasks(Collections.emptyMap());
- assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(taskId01)));
- assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(taskId01)));
+ assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(TASK_0_1)));
+ assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
assertTrue(client.prevStandbyTasks().isEmpty());
}
@Test
public void shouldAddTasksInOffsetSumsMapToPrevStandbyTasks() {
final Map<TaskId, Long> taskOffsetSums = mkMap(
- mkEntry(taskId01, 0L),
- mkEntry(taskId02, 100L)
+ mkEntry(TASK_0_1, 0L),
+ mkEntry(TASK_0_2, 100L)
);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.initializePrevTasks(Collections.emptyMap());
- assertThat(client.prevStandbyTasks(), equalTo(mkSet(taskId01, taskId02)));
- assertThat(client.previousAssignedTasks(), equalTo(mkSet(taskId01, taskId02)));
+ assertThat(client.prevStandbyTasks(), equalTo(mkSet(TASK_0_1, TASK_0_2)));
+ assertThat(client.previousAssignedTasks(), equalTo(mkSet(TASK_0_1, TASK_0_2)));
assertTrue(client.prevActiveTasks().isEmpty());
}
@Test
public void shouldComputeTaskLags() {
final Map<TaskId, Long> taskOffsetSums = mkMap(
- mkEntry(taskId01, 0L),
- mkEntry(taskId02, 100L)
+ mkEntry(TASK_0_1, 0L),
+ mkEntry(TASK_0_2, 100L)
);
final Map<TaskId, Long> allTaskEndOffsetSums = mkMap(
- mkEntry(taskId01, 500L),
- mkEntry(taskId02, 100L)
+ mkEntry(TASK_0_1, 500L),
+ mkEntry(TASK_0_2, 100L)
);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.computeTaskLags(null, allTaskEndOffsetSums);
- assertThat(client.lagFor(taskId01), equalTo(500L));
- assertThat(client.lagFor(taskId02), equalTo(0L));
+ assertThat(client.lagFor(TASK_0_1), equalTo(500L));
+ assertThat(client.lagFor(TASK_0_2), equalTo(0L));
}
@Test
public void shouldReturnEndOffsetSumForLagOfTaskWeDidNotPreviouslyOwn() {
final Map<TaskId, Long> taskOffsetSums = Collections.emptyMap();
- final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
+ final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(TASK_0_1, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.computeTaskLags(null, allTaskEndOffsetSums);
- assertThat(client.lagFor(taskId01), equalTo(500L));
+ assertThat(client.lagFor(TASK_0_1), equalTo(500L));
}
@Test
public void shouldReturnLatestOffsetForLagOfPreviousActiveRunningTask() {
- final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET);
- final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
+ final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
+ final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(TASK_0_1, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.computeTaskLags(null, allTaskEndOffsetSums);
- assertThat(client.lagFor(taskId01), equalTo(Task.LATEST_OFFSET));
+ assertThat(client.lagFor(TASK_0_1), equalTo(Task.LATEST_OFFSET));
}
@Test
public void shouldReturnUnknownOffsetSumForLagOfTaskWithUnknownOffset() {
- final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, UNKNOWN_OFFSET_SUM);
- final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
+ final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, UNKNOWN_OFFSET_SUM);
+ final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(TASK_0_1, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.computeTaskLags(null, allTaskEndOffsetSums);
- assertThat(client.lagFor(taskId01), equalTo(UNKNOWN_OFFSET_SUM));
+ assertThat(client.lagFor(TASK_0_1), equalTo(UNKNOWN_OFFSET_SUM));
}
@Test
public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() {
- final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 5L);
- final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L);
+ final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, 5L);
+ final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(TASK_0_1, 1L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.computeTaskLags(null, allTaskEndOffsetSums);
- assertThat(client.lagFor(taskId01), equalTo(1L));
+ assertThat(client.lagFor(TASK_0_1), equalTo(1L));
}
@Test
public void shouldThrowIllegalStateExceptionIfTaskLagsMapIsNotEmpty() {
- final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 5L);
- final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L);
+ final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, 5L);
+ final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(TASK_0_1, 1L);
client.computeTaskLags(null, taskOffsetSums);
assertThrows(IllegalStateException.class, () -> client.computeTaskLags(null, allTaskEndOffsetSums));
}
@Test
public void shouldThrowIllegalStateExceptionOnLagForUnknownTask() {
- final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 0L);
- final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
+ final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, 0L);
+ final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(TASK_0_1, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
client.computeTaskLags(null, allTaskEndOffsetSums);
- assertThrows(IllegalStateException.class, () -> client.lagFor(taskId02));
+ assertThrows(IllegalStateException.class, () -> client.lagFor(TASK_0_2));
}
@Test
public void shouldThrowIllegalStateExceptionIfAttemptingToInitializeNonEmptyPrevTaskSets() {
- client.addPreviousActiveTasks(Collections.singleton(taskId01));
+ client.addPreviousActiveTasks(Collections.singleton(TASK_0_1));
assertThrows(IllegalStateException.class, () -> client.initializePrevTasks(Collections.emptyMap()));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
index 54aeab5..b45a33f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Test;
@@ -29,51 +30,49 @@ import java.util.TreeSet;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class DefaultBalancedAssignorTest {
- private static final TaskId TASK_00 = new TaskId(0, 0);
- private static final TaskId TASK_01 = new TaskId(0, 1);
- private static final TaskId TASK_02 = new TaskId(0, 2);
- private static final TaskId TASK_10 = new TaskId(1, 0);
- private static final TaskId TASK_11 = new TaskId(1, 1);
- private static final TaskId TASK_12 = new TaskId(1, 2);
- private static final TaskId TASK_20 = new TaskId(2, 0);
- private static final TaskId TASK_21 = new TaskId(2, 1);
- private static final TaskId TASK_22 = new TaskId(2, 2);
-
- private static final String CLIENT_1 = "client1";
- private static final String CLIENT_2 = "client2";
- private static final String CLIENT_3 = "client3";
-
- private static final SortedSet<String> TWO_CLIENTS = new TreeSet<>(Arrays.asList(CLIENT_1, CLIENT_2));
- private static final SortedSet<String> THREE_CLIENTS = new TreeSet<>(Arrays.asList(CLIENT_1, CLIENT_2, CLIENT_3));
+ private static final SortedSet<UUID> TWO_CLIENTS = new TreeSet<>(Arrays.asList(UUID_1, UUID_2));
+ private static final SortedSet<UUID> THREE_CLIENTS = new TreeSet<>(Arrays.asList(UUID_1, UUID_2, UUID_3));
@Test
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
threeClientsToNumberOfStreamThreads(1, 1, 1),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_10, TASK_20);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_11, TASK_21);
- final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_02, TASK_12, TASK_22);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_1, TASK_2_1);
+ final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_0_2, TASK_1_2, TASK_2_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -84,25 +83,25 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfClientsNotIntegralDivisorOfNumberOfTasks() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
TWO_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
twoClientsToNumberOfStreamThreads(1, 1),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_02, TASK_11, TASK_20, TASK_22);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_10, TASK_12, TASK_21);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_0, TASK_1_2, TASK_2_1);
assertThat(
assignment,
is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2))
@@ -113,26 +112,26 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfStreamThreadsIntegralDivisorOfNumberOfTasks() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
threeClientsToNumberOfStreamThreads(3, 3, 3),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_10, TASK_20);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_11, TASK_21);
- final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_02, TASK_12, TASK_22);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_1, TASK_2_1);
+ final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_0_2, TASK_1_2, TASK_2_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -143,26 +142,26 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfStreamThreadsNotIntegralDivisorOfNumberOfTasks() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
threeClientsToNumberOfStreamThreads(2, 2, 2),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_10, TASK_20);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_11, TASK_21);
- final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_02, TASK_12, TASK_22);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_1, TASK_2_1);
+ final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_0_2, TASK_1_2, TASK_2_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -173,26 +172,26 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverUnevenlyDistributedStreamThreads() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
threeClientsToNumberOfStreamThreads(1, 2, 3),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_10, TASK_20);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_11, TASK_21, TASK_00);
- final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_02, TASK_12, TASK_22);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_1_0, TASK_2_0);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_1, TASK_2_1, TASK_0_0);
+ final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_0_2, TASK_1_2, TASK_2_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -203,18 +202,18 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverClientsWithLessClientsThanTasks() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01
+ TASK_0_0,
+ TASK_0_1
),
threeClientsToNumberOfStreamThreads(1, 1, 1),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_00);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_0);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
final List<TaskId> assignedTasksForClient3 = Collections.emptyList();
assertThat(
assignment,
@@ -226,26 +225,26 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
threeClientsToNumberOfStreamThreads(6, 6, 6),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_10, TASK_20);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_11, TASK_21);
- final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_02, TASK_12, TASK_22);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_1, TASK_2_1);
+ final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_0_2, TASK_1_2, TASK_2_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -256,25 +255,26 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverStreamThreadsButBestEffortOverClients() {
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
TWO_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
twoClientsToNumberOfStreamThreads(6, 2),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_02, TASK_11, TASK_20, TASK_22, TASK_01);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_10, TASK_12, TASK_21);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2,
+ TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_1_0, TASK_1_2, TASK_2_1);
assertThat(
assignment,
is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2))
@@ -285,64 +285,64 @@ public class DefaultBalancedAssignorTest {
public void shouldAssignTasksEvenlyOverClientsButNotOverStreamThreadsBecauseBalanceFactorSatisfied() {
final int balanceFactor = 2;
- final Map<String, List<TaskId>> assignment = new DefaultBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
TWO_CLIENTS,
mkSortedSet(
- TASK_00,
- TASK_01,
- TASK_02,
- TASK_10,
- TASK_11,
- TASK_12,
- TASK_20,
- TASK_21,
- TASK_22
+ TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
+ TASK_1_0,
+ TASK_1_1,
+ TASK_1_2,
+ TASK_2_0,
+ TASK_2_1,
+ TASK_2_2
),
twoClientsToNumberOfStreamThreads(6, 2),
balanceFactor
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_00, TASK_02, TASK_11, TASK_20, TASK_22);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_10, TASK_12, TASK_21);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_0, TASK_1_2, TASK_2_1);
assertThat(
assignment,
is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2))
);
}
- private static Map<String, Integer> twoClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
+ private static Map<UUID, Integer> twoClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
final int numberOfStreamThread2) {
return mkMap(
- mkEntry(CLIENT_1, numberOfStreamThread1),
- mkEntry(CLIENT_2, numberOfStreamThread2)
+ mkEntry(UUID_1, numberOfStreamThread1),
+ mkEntry(UUID_2, numberOfStreamThread2)
);
}
- private static Map<String, Integer> threeClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
+ private static Map<UUID, Integer> threeClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
final int numberOfStreamThread2,
final int numberOfStreamThread3) {
return mkMap(
- mkEntry(CLIENT_1, numberOfStreamThread1),
- mkEntry(CLIENT_2, numberOfStreamThread2),
- mkEntry(CLIENT_3, numberOfStreamThread3)
+ mkEntry(UUID_1, numberOfStreamThread1),
+ mkEntry(UUID_2, numberOfStreamThread2),
+ mkEntry(UUID_3, numberOfStreamThread3)
);
}
- private static Map<String, List<TaskId>> expectedAssignmentForThreeClients(final List<TaskId> assignedTasksForClient1,
+ private static Map<UUID, List<TaskId>> expectedAssignmentForThreeClients(final List<TaskId> assignedTasksForClient1,
final List<TaskId> assignedTasksForClient2,
final List<TaskId> assignedTasksForClient3) {
return mkMap(
- mkEntry(CLIENT_1, assignedTasksForClient1),
- mkEntry(CLIENT_2, assignedTasksForClient2),
- mkEntry(CLIENT_3, assignedTasksForClient3)
+ mkEntry(UUID_1, assignedTasksForClient1),
+ mkEntry(UUID_2, assignedTasksForClient2),
+ mkEntry(UUID_3, assignedTasksForClient3)
);
}
- private static Map<String, List<TaskId>> expectedAssignmentForTwoClients(final List<TaskId> assignedTasksForClient1,
+ private static Map<UUID, List<TaskId>> expectedAssignmentForTwoClients(final List<TaskId> assignedTasksForClient1,
final List<TaskId> assignedTasksForClient2) {
return mkMap(
- mkEntry(CLIENT_1, assignedTasksForClient1),
- mkEntry(CLIENT_2, assignedTasksForClient2)
+ mkEntry(UUID_1, assignedTasksForClient1),
+ mkEntry(UUID_2, assignedTasksForClient2)
);
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignorTest.java
index 7714283..aa10b7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStateConstrainedBalancedAssignorTest.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-
+import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.RankedClient;
import org.junit.Test;
import java.util.Arrays;
@@ -35,22 +34,20 @@ import java.util.TreeSet;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_3_4;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class DefaultStateConstrainedBalancedAssignorTest {
- private static final TaskId TASK_01 = new TaskId(0, 1);
- private static final TaskId TASK_12 = new TaskId(1, 2);
- private static final TaskId TASK_23 = new TaskId(2, 3);
- private static final TaskId TASK_34 = new TaskId(3, 4);
-
- private static final String CLIENT_1 = "client1";
- private static final String CLIENT_2 = "client2";
- private static final String CLIENT_3 = "client3";
-
- private static final Set<String> TWO_CLIENTS = new HashSet<>(Arrays.asList(CLIENT_1, CLIENT_2));
- private static final Set<String> THREE_CLIENTS = new HashSet<>(Arrays.asList(CLIENT_1, CLIENT_2, CLIENT_3));
+ private static final Set<UUID> TWO_CLIENTS = new HashSet<>(Arrays.asList(UUID_1, UUID_2));
+ private static final Set<UUID> THREE_CLIENTS = new HashSet<>(Arrays.asList(UUID_1, UUID_2, UUID_3));
@Test
public void shouldAssignTaskToCaughtUpClient() {
@@ -58,14 +55,14 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankOfClient2 = Long.MAX_VALUE;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
oneStatefulTasksToTwoRankedClients(rankOfClient1, rankOfClient2),
balanceFactor,
TWO_CLIENTS,
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
final List<TaskId> assignedTasksForClient2 = Collections.emptyList();
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -76,7 +73,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankOfClient2 = Task.LATEST_OFFSET;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
oneStatefulTasksToTwoRankedClients(rankOfClient1, rankOfClient2),
balanceFactor,
TWO_CLIENTS,
@@ -84,7 +81,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
);
final List<TaskId> assignedTasksForClient1 = Collections.emptyList();
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -94,7 +91,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankOfClient2 = Task.LATEST_OFFSET;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
oneStatefulTasksToTwoRankedClients(rankOfClient1, rankOfClient2),
balanceFactor,
TWO_CLIENTS,
@@ -102,7 +99,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
);
final List<TaskId> assignedTasksForClient1 = Collections.emptyList();
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -112,14 +109,14 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankOfClient2 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
oneStatefulTasksToTwoRankedClients(rankOfClient1, rankOfClient2),
balanceFactor,
TWO_CLIENTS,
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
final List<TaskId> assignedTasksForClient2 = Collections.emptyList();
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -130,14 +127,14 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankOfClient2 = 5;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
oneStatefulTasksToTwoRankedClients(rankOfClient1, rankOfClient2),
balanceFactor,
TWO_CLIENTS,
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
final List<TaskId> assignedTasksForClient2 = Collections.emptyList();
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -150,7 +147,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -162,8 +159,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_1_2);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -175,7 +172,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 0;
final int balanceFactor = 2;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -187,8 +184,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_1_2);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -205,7 +202,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask23OnClient3 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
threeStatefulTasksToThreeRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -222,9 +219,9 @@ public class DefaultStateConstrainedBalancedAssignorTest {
threeClientsToNumberOfStreamThreads(1, 1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_23);
- final List<TaskId> assignedTasksForClient3 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_2_3);
+ final List<TaskId> assignedTasksForClient3 = Collections.singletonList(TASK_1_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -239,7 +236,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 100;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -251,8 +248,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_12);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_1_2);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -264,7 +261,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = Task.LATEST_OFFSET;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -276,8 +273,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_12);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_1_2);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -289,7 +286,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 0;
final int balanceFactor = 2;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -301,7 +298,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_01, TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_1, TASK_1_2);
final List<TaskId> assignedTasksForClient2 = Collections.emptyList();
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -314,7 +311,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 10;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -326,8 +323,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_1_2);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -339,7 +336,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 10;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -351,8 +348,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_12);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_1_2);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -364,7 +361,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 20;
final int balanceFactor = 2;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -376,7 +373,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_01, TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_1, TASK_1_2);
final List<TaskId> assignedTasksForClient2 = Collections.emptyList();
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -389,7 +386,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 50;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -401,8 +398,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_12);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_01);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_1_2);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_0_1);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -419,7 +416,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask23OnClient3 = 100;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
threeStatefulTasksToThreeRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -436,9 +433,9 @@ public class DefaultStateConstrainedBalancedAssignorTest {
threeClientsToNumberOfStreamThreads(1, 1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_23);
- final List<TaskId> assignedTasksForClient3 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_2_3);
+ final List<TaskId> assignedTasksForClient3 = Collections.singletonList(TASK_1_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -453,7 +450,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 10;
final int balanceFactor = 2;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -466,7 +463,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
);
final List<TaskId> assignedTasksForClient1 = Collections.emptyList();
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_01, TASK_12);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_2);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -478,7 +475,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 40;
final int balanceFactor = 2;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -490,16 +487,16 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_1_2);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
/**
* This test shows that in an assigment of one client the assumption that the set of tasks which are caught-up on
* the given client is followed by the set of tasks that are not caught-up on the given client does NOT hold.
- * In fact, in this test, at some point during the execution of the algorithm the assignment for CLIENT_2
- * contains TASK_34 followed by TASK_23. TASK_23 is caught-up on CLIENT_2 whereas TASK_34 is not.
+ * In fact, in this test, at some point during the execution of the algorithm the assignment for UUID_2
+ * contains TASK_3_4 followed by TASK_2_3. TASK_2_3 is caught-up on UUID_2 whereas TASK_3_4 is not.
*/
@Test
public void shouldEvenlyDistributeTasksOrderOfCaughtUpAndNotCaughtUpTaskIsMixedUpInIntermediateResults() {
@@ -517,7 +514,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask34OnClient3 = 100;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
fourStatefulTasksToThreeRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -537,9 +534,9 @@ public class DefaultStateConstrainedBalancedAssignorTest {
threeClientsToNumberOfStreamThreads(1, 1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_01, TASK_12);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_23);
- final List<TaskId> assignedTasksForClient3 = Collections.singletonList(TASK_34);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_1, TASK_1_2);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_2_3);
+ final List<TaskId> assignedTasksForClient3 = Collections.singletonList(TASK_3_4);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -562,7 +559,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask34OnClient3 = 90;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
fourStatefulTasksToThreeRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -582,9 +579,9 @@ public class DefaultStateConstrainedBalancedAssignorTest {
threeClientsToNumberOfStreamThreads(1, 1, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_34);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_3_4);
final List<TaskId> assignedTasksForClient2 = Collections.emptyList();
- final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_01, TASK_23, TASK_12);
+ final List<TaskId> assignedTasksForClient3 = Arrays.asList(TASK_0_1, TASK_2_3, TASK_1_2);
assertThat(
assignment,
is(expectedAssignmentForThreeClients(assignedTasksForClient1, assignedTasksForClient2, assignedTasksForClient3))
@@ -601,7 +598,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask23OnClient2 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
threeStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -615,8 +612,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 2)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_12, TASK_23);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_1_2, TASK_2_3);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -632,7 +629,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask34OnClient2 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
fourStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -648,8 +645,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 2)
);
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_01, TASK_23);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_12, TASK_34);
+ final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_1, TASK_2_3);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_1_2, TASK_3_4);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -661,7 +658,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask12OnClient2 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
twoStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -673,8 +670,8 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(2, 1)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_12);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Collections.singletonList(TASK_1_2);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
@@ -690,7 +687,7 @@ public class DefaultStateConstrainedBalancedAssignorTest {
final long rankForTask34OnClient2 = 0;
final int balanceFactor = 1;
- final Map<String, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor<String>().assign(
+ final Map<UUID, List<TaskId>> assignment = new DefaultStateConstrainedBalancedAssignor().assign(
fourStatefulTasksToTwoRankedClients(
rankForTask01OnClient1,
rankForTask01OnClient2,
@@ -706,194 +703,194 @@ public class DefaultStateConstrainedBalancedAssignorTest {
twoClientsToNumberOfStreamThreads(1, 4)
);
- final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_01);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_12, TASK_34, TASK_23);
+ final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_1);
+ final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_1_2, TASK_3_4, TASK_2_3);
assertThat(assignment, is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2)));
}
- private static Map<String, Integer> twoClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
- final int numberOfStreamThread2) {
+ private static Map<UUID, Integer> twoClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
+ final int numberOfStreamThread2) {
return mkMap(
- mkEntry(CLIENT_1, numberOfStreamThread1),
- mkEntry(CLIENT_2, numberOfStreamThread2)
+ mkEntry(UUID_1, numberOfStreamThread1),
+ mkEntry(UUID_2, numberOfStreamThread2)
);
}
- private static Map<String, Integer> threeClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
- final int numberOfStreamThread2,
- final int numberOfStreamThread3) {
+ private static Map<UUID, Integer> threeClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
+ final int numberOfStreamThread2,
+ final int numberOfStreamThread3) {
return mkMap(
- mkEntry(CLIENT_1, numberOfStreamThread1),
- mkEntry(CLIENT_2, numberOfStreamThread2),
- mkEntry(CLIENT_3, numberOfStreamThread3)
+ mkEntry(UUID_1, numberOfStreamThread1),
+ mkEntry(UUID_2, numberOfStreamThread2),
+ mkEntry(UUID_3, numberOfStreamThread3)
);
}
- private static SortedMap<TaskId, SortedSet<RankedClient<String>>> oneStatefulTasksToTwoRankedClients(final long rankOfClient1,
- final long rankOfClient2) {
- final SortedSet<RankedClient<String>> rankedClients01 = new TreeSet<>();
- rankedClients01.add(new RankedClient<>(CLIENT_1, rankOfClient1));
- rankedClients01.add(new RankedClient<>(CLIENT_2, rankOfClient2));
+ private static SortedMap<TaskId, SortedSet<RankedClient>> oneStatefulTasksToTwoRankedClients(final long rankOfClient1,
+ final long rankOfClient2) {
+ final SortedSet<RankedClient> rankedClients01 = new TreeSet<>();
+ rankedClients01.add(new RankedClient(UUID_1, rankOfClient1));
+ rankedClients01.add(new RankedClient(UUID_2, rankOfClient2));
return new TreeMap<>(
- mkMap(mkEntry(TASK_01, rankedClients01))
+ mkMap(mkEntry(TASK_0_1, rankedClients01))
);
}
- private static SortedMap<TaskId, SortedSet<RankedClient<String>>> twoStatefulTasksToTwoRankedClients(final long rankForTask01OnClient1,
- final long rankForTask01OnClient2,
- final long rankForTask12OnClient1,
- final long rankForTask12OnClient2) {
- final SortedSet<RankedClient<String>> rankedClients01 = new TreeSet<>();
- rankedClients01.add(new RankedClient<>(CLIENT_1, rankForTask01OnClient1));
- rankedClients01.add(new RankedClient<>(CLIENT_2, rankForTask01OnClient2));
- final SortedSet<RankedClient<String>> rankedClients12 = new TreeSet<>();
- rankedClients12.add(new RankedClient<>(CLIENT_1, rankForTask12OnClient1));
- rankedClients12.add(new RankedClient<>(CLIENT_2, rankForTask12OnClient2));
+ private static SortedMap<TaskId, SortedSet<RankedClient>> twoStatefulTasksToTwoRankedClients(final long rankForTask01OnClient1,
+ final long rankForTask01OnClient2,
+ final long rankForTask12OnClient1,
+ final long rankForTask12OnClient2) {
+ final SortedSet<RankedClient> rankedClients01 = new TreeSet<>();
+ rankedClients01.add(new RankedClient(UUID_1, rankForTask01OnClient1));
+ rankedClients01.add(new RankedClient(UUID_2, rankForTask01OnClient2));
+ final SortedSet<RankedClient> rankedClients12 = new TreeSet<>();
+ rankedClients12.add(new RankedClient(UUID_1, rankForTask12OnClient1));
+ rankedClients12.add(new RankedClient(UUID_2, rankForTask12OnClient2));
return new TreeMap<>(
mkMap(
- mkEntry(TASK_01, rankedClients01),
- mkEntry(TASK_12, rankedClients12)
+ mkEntry(TASK_0_1, rankedClients01),
+ mkEntry(TASK_1_2, rankedClients12)
)
);
}
- private static SortedMap<TaskId, SortedSet<RankedClient<String>>> threeStatefulTasksToTwoRankedClients(final long rankForTask01OnClient1,
- final long rankForTask01OnClient2,
- final long rankForTask12OnClient1,
- final long rankForTask12OnClient2,
- final long rankForTask23OnClient1,
- final long rankForTask23OnClient2) {
- final SortedSet<RankedClient<String>> rankedClients01 = new TreeSet<>();
- rankedClients01.add(new RankedClient<>(CLIENT_1, rankForTask01OnClient1));
- rankedClients01.add(new RankedClient<>(CLIENT_2, rankForTask01OnClient2));
- final SortedSet<RankedClient<String>> rankedClients12 = new TreeSet<>();
- rankedClients12.add(new RankedClient<>(CLIENT_1, rankForTask12OnClient1));
- rankedClients12.add(new RankedClient<>(CLIENT_2, rankForTask12OnClient2));
- final SortedSet<RankedClient<String>> rankedClients23 = new TreeSet<>();
- rankedClients23.add(new RankedClient<>(CLIENT_1, rankForTask23OnClient1));
- rankedClients23.add(new RankedClient<>(CLIENT_2, rankForTask23OnClient2));
+ private static SortedMap<TaskId, SortedSet<RankedClient>> threeStatefulTasksToTwoRankedClients(final long rankForTask01OnClient1,
+ final long rankForTask01OnClient2,
+ final long rankForTask12OnClient1,
+ final long rankForTask12OnClient2,
+ final long rankForTask23OnClient1,
+ final long rankForTask23OnClient2) {
+ final SortedSet<RankedClient> rankedClients01 = new TreeSet<>();
+ rankedClients01.add(new RankedClient(UUID_1, rankForTask01OnClient1));
+ rankedClients01.add(new RankedClient(UUID_2, rankForTask01OnClient2));
+ final SortedSet<RankedClient> rankedClients12 = new TreeSet<>();
+ rankedClients12.add(new RankedClient(UUID_1, rankForTask12OnClient1));
+ rankedClients12.add(new RankedClient(UUID_2, rankForTask12OnClient2));
+ final SortedSet<RankedClient> rankedClients23 = new TreeSet<>();
+ rankedClients23.add(new RankedClient(UUID_1, rankForTask23OnClient1));
+ rankedClients23.add(new RankedClient(UUID_2, rankForTask23OnClient2));
return new TreeMap<>(
mkMap(
- mkEntry(TASK_01, rankedClients01),
- mkEntry(TASK_12, rankedClients12),
- mkEntry(TASK_23, rankedClients23)
+ mkEntry(TASK_0_1, rankedClients01),
+ mkEntry(TASK_1_2, rankedClients12),
+ mkEntry(TASK_2_3, rankedClients23)
)
);
}
- private static SortedMap<TaskId, SortedSet<RankedClient<String>>> threeStatefulTasksToThreeRankedClients(final long rankForTask01OnClient1,
- final long rankForTask01OnClient2,
- final long rankForTask01OnClient3,
- final long rankForTask12OnClient1,
- final long rankForTask12OnClient2,
- final long rankForTask12OnClient3,
- final long rankForTask23OnClient1,
- final long rankForTask23OnClient2,
- final long rankForTask23OnClient3) {
- final SortedSet<RankedClient<String>> rankedClients01 = new TreeSet<>();
- rankedClients01.add(new RankedClient<>(CLIENT_1, rankForTask01OnClient1));
- rankedClients01.add(new RankedClient<>(CLIENT_2, rankForTask01OnClient2));
- rankedClients01.add(new RankedClient<>(CLIENT_3, rankForTask01OnClient3));
- final SortedSet<RankedClient<String>> rankedClients12 = new TreeSet<>();
- rankedClients12.add(new RankedClient<>(CLIENT_1, rankForTask12OnClient1));
- rankedClients12.add(new RankedClient<>(CLIENT_2, rankForTask12OnClient2));
- rankedClients12.add(new RankedClient<>(CLIENT_3, rankForTask12OnClient3));
- final SortedSet<RankedClient<String>> rankedClients23 = new TreeSet<>();
- rankedClients23.add(new RankedClient<>(CLIENT_1, rankForTask23OnClient1));
- rankedClients23.add(new RankedClient<>(CLIENT_2, rankForTask23OnClient2));
- rankedClients23.add(new RankedClient<>(CLIENT_3, rankForTask23OnClient3));
+ private static SortedMap<TaskId, SortedSet<RankedClient>> threeStatefulTasksToThreeRankedClients(final long rankForTask01OnClient1,
+ final long rankForTask01OnClient2,
+ final long rankForTask01OnClient3,
+ final long rankForTask12OnClient1,
+ final long rankForTask12OnClient2,
+ final long rankForTask12OnClient3,
+ final long rankForTask23OnClient1,
+ final long rankForTask23OnClient2,
+ final long rankForTask23OnClient3) {
+ final SortedSet<RankedClient> rankedClients01 = new TreeSet<>();
+ rankedClients01.add(new RankedClient(UUID_1, rankForTask01OnClient1));
+ rankedClients01.add(new RankedClient(UUID_2, rankForTask01OnClient2));
+ rankedClients01.add(new RankedClient(UUID_3, rankForTask01OnClient3));
+ final SortedSet<RankedClient> rankedClients12 = new TreeSet<>();
+ rankedClients12.add(new RankedClient(UUID_1, rankForTask12OnClient1));
+ rankedClients12.add(new RankedClient(UUID_2, rankForTask12OnClient2));
+ rankedClients12.add(new RankedClient(UUID_3, rankForTask12OnClient3));
+ final SortedSet<RankedClient> rankedClients23 = new TreeSet<>();
+ rankedClients23.add(new RankedClient(UUID_1, rankForTask23OnClient1));
+ rankedClients23.add(new RankedClient(UUID_2, rankForTask23OnClient2));
+ rankedClients23.add(new RankedClient(UUID_3, rankForTask23OnClient3));
return new TreeMap<>(
mkMap(
- mkEntry(TASK_01, rankedClients01),
- mkEntry(TASK_12, rankedClients12),
- mkEntry(TASK_23, rankedClients23)
+ mkEntry(TASK_0_1, rankedClients01),
+ mkEntry(TASK_1_2, rankedClients12),
+ mkEntry(TASK_2_3, rankedClients23)
)
);
}
- private static SortedMap<TaskId, SortedSet<RankedClient<String>>> fourStatefulTasksToTwoRankedClients(final long rankForTask01OnClient1,
- final long rankForTask01OnClient2,
- final long rankForTask12OnClient1,
- final long rankForTask12OnClient2,
- final long rankForTask23OnClient1,
- final long rankForTask23OnClient2,
- final long rankForTask34OnClient1,
- final long rankForTask34OnClient2) {
- final SortedSet<RankedClient<String>> rankedClients01 = new TreeSet<>();
- rankedClients01.add(new RankedClient<>(CLIENT_1, rankForTask01OnClient1));
- rankedClients01.add(new RankedClient<>(CLIENT_2, rankForTask01OnClient2));
- final SortedSet<RankedClient<String>> rankedClients12 = new TreeSet<>();
- rankedClients12.add(new RankedClient<>(CLIENT_1, rankForTask12OnClient1));
- rankedClients12.add(new RankedClient<>(CLIENT_2, rankForTask12OnClient2));
- final SortedSet<RankedClient<String>> rankedClients23 = new TreeSet<>();
- rankedClients23.add(new RankedClient<>(CLIENT_1, rankForTask23OnClient1));
- rankedClients23.add(new RankedClient<>(CLIENT_2, rankForTask23OnClient2));
- final SortedSet<RankedClient<String>> rankedClients34 = new TreeSet<>();
- rankedClients34.add(new RankedClient<>(CLIENT_1, rankForTask34OnClient1));
- rankedClients34.add(new RankedClient<>(CLIENT_2, rankForTask34OnClient2));
+ private static SortedMap<TaskId, SortedSet<RankedClient>> fourStatefulTasksToTwoRankedClients(final long rankForTask01OnClient1,
+ final long rankForTask01OnClient2,
+ final long rankForTask12OnClient1,
+ final long rankForTask12OnClient2,
+ final long rankForTask23OnClient1,
+ final long rankForTask23OnClient2,
+ final long rankForTask34OnClient1,
+ final long rankForTask34OnClient2) {
+ final SortedSet<RankedClient> rankedClients01 = new TreeSet<>();
+ rankedClients01.add(new RankedClient(UUID_1, rankForTask01OnClient1));
+ rankedClients01.add(new RankedClient(UUID_2, rankForTask01OnClient2));
+ final SortedSet<RankedClient> rankedClients12 = new TreeSet<>();
+ rankedClients12.add(new RankedClient(UUID_1, rankForTask12OnClient1));
+ rankedClients12.add(new RankedClient(UUID_2, rankForTask12OnClient2));
+ final SortedSet<RankedClient> rankedClients23 = new TreeSet<>();
+ rankedClients23.add(new RankedClient(UUID_1, rankForTask23OnClient1));
+ rankedClients23.add(new RankedClient(UUID_2, rankForTask23OnClient2));
+ final SortedSet<RankedClient> rankedClients34 = new TreeSet<>();
+ rankedClients34.add(new RankedClient(UUID_1, rankForTask34OnClient1));
+ rankedClients34.add(new RankedClient(UUID_2, rankForTask34OnClient2));
return new TreeMap<>(
mkMap(
- mkEntry(TASK_01, rankedClients01),
- mkEntry(TASK_12, rankedClients12),
- mkEntry(TASK_23, rankedClients23),
- mkEntry(TASK_34, rankedClients34)
+ mkEntry(TASK_0_1, rankedClients01),
+ mkEntry(TASK_1_2, rankedClients12),
+ mkEntry(TASK_2_3, rankedClients23),
+ mkEntry(TASK_3_4, rankedClients34)
)
);
}
- private static SortedMap<TaskId, SortedSet<RankedClient<String>>> fourStatefulTasksToThreeRankedClients(final long rankForTask01OnClient1,
- final long rankForTask01OnClient2,
- final long rankForTask01OnClient3,
- final long rankForTask12OnClient1,
- final long rankForTask12OnClient2,
- final long rankForTask12OnClient3,
- final long rankForTask23OnClient1,
- final long rankForTask23OnClient2,
- final long rankForTask23OnClient3,
- final long rankForTask34OnClient1,
- final long rankForTask34OnClient2,
- final long rankForTask34OnClient3) {
- final SortedSet<RankedClient<String>> rankedClients01 = new TreeSet<>();
- rankedClients01.add(new RankedClient<>(CLIENT_1, rankForTask01OnClient1));
- rankedClients01.add(new RankedClient<>(CLIENT_2, rankForTask01OnClient2));
- rankedClients01.add(new RankedClient<>(CLIENT_3, rankForTask01OnClient3));
- final SortedSet<RankedClient<String>> rankedClients12 = new TreeSet<>();
- rankedClients12.add(new RankedClient<>(CLIENT_1, rankForTask12OnClient1));
- rankedClients12.add(new RankedClient<>(CLIENT_2, rankForTask12OnClient2));
- rankedClients12.add(new RankedClient<>(CLIENT_3, rankForTask12OnClient3));
- final SortedSet<RankedClient<String>> rankedClients23 = new TreeSet<>();
- rankedClients23.add(new RankedClient<>(CLIENT_1, rankForTask23OnClient1));
- rankedClients23.add(new RankedClient<>(CLIENT_2, rankForTask23OnClient2));
- rankedClients23.add(new RankedClient<>(CLIENT_3, rankForTask23OnClient3));
- final SortedSet<RankedClient<String>> rankedClients34 = new TreeSet<>();
- rankedClients34.add(new RankedClient<>(CLIENT_1, rankForTask34OnClient1));
- rankedClients34.add(new RankedClient<>(CLIENT_2, rankForTask34OnClient2));
- rankedClients34.add(new RankedClient<>(CLIENT_3, rankForTask34OnClient3));
+ private static SortedMap<TaskId, SortedSet<RankedClient>> fourStatefulTasksToThreeRankedClients(final long rankForTask01OnClient1,
+ final long rankForTask01OnClient2,
+ final long rankForTask01OnClient3,
+ final long rankForTask12OnClient1,
+ final long rankForTask12OnClient2,
+ final long rankForTask12OnClient3,
+ final long rankForTask23OnClient1,
+ final long rankForTask23OnClient2,
+ final long rankForTask23OnClient3,
+ final long rankForTask34OnClient1,
+ final long rankForTask34OnClient2,
+ final long rankForTask34OnClient3) {
+ final SortedSet<RankedClient> rankedClients01 = new TreeSet<>();
+ rankedClients01.add(new RankedClient(UUID_1, rankForTask01OnClient1));
+ rankedClients01.add(new RankedClient(UUID_2, rankForTask01OnClient2));
+ rankedClients01.add(new RankedClient(UUID_3, rankForTask01OnClient3));
+ final SortedSet<RankedClient> rankedClients12 = new TreeSet<>();
+ rankedClients12.add(new RankedClient(UUID_1, rankForTask12OnClient1));
+ rankedClients12.add(new RankedClient(UUID_2, rankForTask12OnClient2));
+ rankedClients12.add(new RankedClient(UUID_3, rankForTask12OnClient3));
+ final SortedSet<RankedClient> rankedClients23 = new TreeSet<>();
+ rankedClients23.add(new RankedClient(UUID_1, rankForTask23OnClient1));
+ rankedClients23.add(new RankedClient(UUID_2, rankForTask23OnClient2));
+ rankedClients23.add(new RankedClient(UUID_3, rankForTask23OnClient3));
+ final SortedSet<RankedClient> rankedClients34 = new TreeSet<>();
+ rankedClients34.add(new RankedClient(UUID_1, rankForTask34OnClient1));
+ rankedClients34.add(new RankedClient(UUID_2, rankForTask34OnClient2));
+ rankedClients34.add(new RankedClient(UUID_3, rankForTask34OnClient3));
return new TreeMap<>(
mkMap(
- mkEntry(TASK_01, rankedClients01),
- mkEntry(TASK_12, rankedClients12),
- mkEntry(TASK_23, rankedClients23),
- mkEntry(TASK_34, rankedClients34)
+ mkEntry(TASK_0_1, rankedClients01),
+ mkEntry(TASK_1_2, rankedClients12),
+ mkEntry(TASK_2_3, rankedClients23),
+ mkEntry(TASK_3_4, rankedClients34)
)
);
}
- private static Map<String, List<TaskId>> expectedAssignmentForTwoClients(final List<TaskId> assignedTasksForClient1,
- final List<TaskId> assignedTasksForClient2) {
+ private static Map<UUID, List<TaskId>> expectedAssignmentForTwoClients(final List<TaskId> assignedTasksForClient1,
+ final List<TaskId> assignedTasksForClient2) {
return mkMap(
- mkEntry(CLIENT_1, assignedTasksForClient1),
- mkEntry(CLIENT_2, assignedTasksForClient2)
+ mkEntry(UUID_1, assignedTasksForClient1),
+ mkEntry(UUID_2, assignedTasksForClient2)
);
}
- private static Map<String, List<TaskId>> expectedAssignmentForThreeClients(final List<TaskId> assignedTasksForClient1,
- final List<TaskId> assignedTasksForClient2,
- final List<TaskId> assignedTasksForClient3) {
+ private static Map<UUID, List<TaskId>> expectedAssignmentForThreeClients(final List<TaskId> assignedTasksForClient1,
+ final List<TaskId> assignedTasksForClient2,
+ final List<TaskId> assignedTasksForClient3) {
return mkMap(
- mkEntry(CLIENT_1, assignedTasksForClient1),
- mkEntry(CLIENT_2, assignedTasksForClient2),
- mkEntry(CLIENT_3, assignedTasksForClient3)
+ mkEntry(UUID_1, assignedTasksForClient1),
+ mkEntry(UUID_2, assignedTasksForClient2),
+ mkEntry(UUID_3, assignedTasksForClient3)
);
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 5db6e36..1b00dc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -16,40 +16,41 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.mkSortedSet;
-import static org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.buildClientRankingsByTask;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
import static org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.computeBalanceFactor;
-import static org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.getMovements;
-import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.SortedSet;
+import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.Movement;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.RankedClient;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -60,33 +61,15 @@ public class HighAvailabilityTaskAssignorTest {
private int numStandbyReplicas = 0;
private long probingRebalanceInterval = 60 * 1000L;
- private Map<String, ClientState> clientStates = new HashMap<>();
+ private Map<UUID, ClientState> clientStates = new HashMap<>();
private Set<TaskId> allTasks = new HashSet<>();
private Set<TaskId> statefulTasks = new HashSet<>();
- private static final TaskId TASK_0_0 = new TaskId(0, 0);
- private static final TaskId TASK_0_1 = new TaskId(0, 1);
- private static final TaskId TASK_0_2 = new TaskId(0, 2);
- private static final TaskId TASK_0_3 = new TaskId(0, 3);
- private static final TaskId TASK_1_0 = new TaskId(1, 0);
- private static final TaskId TASK_1_1 = new TaskId(1, 1);
- private static final TaskId TASK_1_2 = new TaskId(1, 2);
- private static final TaskId TASK_1_3 = new TaskId(1, 3);
- private static final TaskId TASK_2_0 = new TaskId(2, 0);
- private static final TaskId TASK_2_1 = new TaskId(2, 1);
- private static final TaskId TASK_2_3 = new TaskId(2, 3);
-
- private static final String ID_1 = "client1";
- private static final String ID_2 = "client2";
- private static final String ID_3 = "client3";
-
private ClientState client1;
private ClientState client2;
private ClientState client3;
-
- private static final Set<TaskId> EMPTY_TASKS = emptySet();
-
- private HighAvailabilityTaskAssignor<String> taskAssignor;
+
+ private HighAvailabilityTaskAssignor taskAssignor;
private void createTaskAssignor() {
final AssignmentConfigs configs = new AssignmentConfigs(
@@ -96,7 +79,7 @@ public class HighAvailabilityTaskAssignorTest {
numStandbyReplicas,
probingRebalanceInterval
);
- taskAssignor = new HighAvailabilityTaskAssignor<>(
+ taskAssignor = new HighAvailabilityTaskAssignor(
clientStates,
allTasks,
statefulTasks,
@@ -104,216 +87,13 @@ public class HighAvailabilityTaskAssignorTest {
}
@Test
- public void shouldRankPreviousClientAboveEquallyCaughtUpClient() {
- client1 = EasyMock.createNiceMock(ClientState.class);
- client2 = EasyMock.createNiceMock(ClientState.class);
- expect(client1.lagFor(TASK_0_0)).andReturn(Task.LATEST_OFFSET);
- expect(client2.lagFor(TASK_0_0)).andReturn(0L);
- replay(client1, client2);
-
- final SortedSet<RankedClient<String>> expectedClientRanking = mkSortedSet(
- new RankedClient<>(ID_1, Task.LATEST_OFFSET),
- new RankedClient<>(ID_2, 0L)
- );
-
- final Map<String, ClientState> states = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2)
- );
-
- final Map<TaskId, SortedSet<RankedClient<String>>> statefulTasksToRankedCandidates =
- buildClientRankingsByTask(singleton(TASK_0_0), states, acceptableRecoveryLag);
-
- final SortedSet<RankedClient<String>> clientRanking = statefulTasksToRankedCandidates.get(TASK_0_0);
-
- EasyMock.verify(client1, client2);
- assertThat(clientRanking, equalTo(expectedClientRanking));
- }
-
- @Test
- public void shouldRankTaskWithUnknownOffsetSumBelowCaughtUpClientAndClientWithLargeLag() {
- client1 = EasyMock.createNiceMock(ClientState.class);
- client2 = EasyMock.createNiceMock(ClientState.class);
- client3 = EasyMock.createNiceMock(ClientState.class);
- expect(client1.lagFor(TASK_0_0)).andReturn(UNKNOWN_OFFSET_SUM);
- expect(client2.lagFor(TASK_0_0)).andReturn(50L);
- expect(client3.lagFor(TASK_0_0)).andReturn(500L);
- replay(client1, client2, client3);
-
- final SortedSet<RankedClient<String>> expectedClientRanking = mkSortedSet(
- new RankedClient<>(ID_2, 0L),
- new RankedClient<>(ID_1, 1L),
- new RankedClient<>(ID_3, 500L)
- );
-
- final Map<String, ClientState> states = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2),
- mkEntry(ID_3, client3)
- );
-
- final Map<TaskId, SortedSet<RankedClient<String>>> statefulTasksToRankedCandidates =
- buildClientRankingsByTask(singleton(TASK_0_0), states, acceptableRecoveryLag);
-
- final SortedSet<RankedClient<String>> clientRanking = statefulTasksToRankedCandidates.get(TASK_0_0);
-
- EasyMock.verify(client1, client2, client3);
- assertThat(clientRanking, equalTo(expectedClientRanking));
- }
-
- @Test
- public void shouldRankAllClientsWithinAcceptableRecoveryLagWithRank0() {
- client1 = EasyMock.createNiceMock(ClientState.class);
- client2 = EasyMock.createNiceMock(ClientState.class);
- expect(client1.lagFor(TASK_0_0)).andReturn(100L);
- expect(client2.lagFor(TASK_0_0)).andReturn(0L);
- replay(client1, client2);
-
- final SortedSet<RankedClient<String>> expectedClientRanking = mkSortedSet(
- new RankedClient<>(ID_1, 0L),
- new RankedClient<>(ID_2, 0L)
- );
-
- final Map<String, ClientState> states = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2)
- );
-
- final Map<TaskId, SortedSet<RankedClient<String>>> statefulTasksToRankedCandidates =
- buildClientRankingsByTask(singleton(TASK_0_0), states, acceptableRecoveryLag);
-
- EasyMock.verify(client1, client2);
- assertThat(statefulTasksToRankedCandidates.get(TASK_0_0), equalTo(expectedClientRanking));
- }
-
- @Test
- public void shouldRankNotCaughtUpClientsAccordingToLag() {
- client1 = EasyMock.createNiceMock(ClientState.class);
- client2 = EasyMock.createNiceMock(ClientState.class);
- client3 = EasyMock.createNiceMock(ClientState.class);
- expect(client1.lagFor(TASK_0_0)).andReturn(900L);
- expect(client2.lagFor(TASK_0_0)).andReturn(800L);
- expect(client3.lagFor(TASK_0_0)).andReturn(500L);
- replay(client1, client2, client3);
-
- final SortedSet<RankedClient<String>> expectedClientRanking = mkSortedSet(
- new RankedClient<>(ID_3, 500L),
- new RankedClient<>(ID_2, 800L),
- new RankedClient<>(ID_1, 900L)
- );
-
- final Map<String, ClientState> states = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2),
- mkEntry(ID_3, client3)
- );
-
- final Map<TaskId, SortedSet<RankedClient<String>>> statefulTasksToRankedCandidates =
- buildClientRankingsByTask(singleton(TASK_0_0), states, acceptableRecoveryLag);
-
- EasyMock.verify(client1, client2, client3);
- assertThat(statefulTasksToRankedCandidates.get(TASK_0_0), equalTo(expectedClientRanking));
- }
-
- @Test
- public void shouldReturnEmptyClientRankingsWithNoStatefulTasks() {
- client1 = EasyMock.createNiceMock(ClientState.class);
- client2 = EasyMock.createNiceMock(ClientState.class);
-
- final Map<String, ClientState> states = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2)
- );
-
- assertTrue(buildClientRankingsByTask(EMPTY_TASKS, states, acceptableRecoveryLag).isEmpty());
- }
-
- @Test
- public void shouldGetMovementsFromStateConstrainedToBalancedAssignment() {
- maxWarmupReplicas = Integer.MAX_VALUE;
- final Map<String, List<TaskId>> stateConstrainedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_2)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_0)),
- mkEntry(ID_3, asList(TASK_0_2, TASK_1_1))
- );
- final Map<String, List<TaskId>> balancedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_0)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_1)),
- mkEntry(ID_3, asList(TASK_0_2, TASK_1_2))
- );
- final Queue<Movement<String>> expectedMovements = new LinkedList<>();
- expectedMovements.add(new Movement<>(TASK_1_2, ID_1, ID_3));
- expectedMovements.add(new Movement<>(TASK_1_0, ID_2, ID_1));
- expectedMovements.add(new Movement<>(TASK_1_1, ID_3, ID_2));
-
- assertThat(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas), equalTo(expectedMovements));
- }
-
- @Test
- public void shouldOnlyGetUpToMaxWarmupReplicaMovements() {
- maxWarmupReplicas = 1;
- final Map<String, List<TaskId>> stateConstrainedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_2)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_0)),
- mkEntry(ID_3, asList(TASK_0_2, TASK_1_1))
- );
- final Map<String, List<TaskId>> balancedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_0)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_1)),
- mkEntry(ID_3, asList(TASK_0_2, TASK_1_2))
- );
- final Queue<Movement<String>> expectedMovements = new LinkedList<>();
- expectedMovements.add(new Movement<>(TASK_1_2, ID_1, ID_3));
-
- assertThat(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas), equalTo(expectedMovements));
- }
-
- @Test
- public void shouldReturnEmptyMovementsWhenPassedEmptyTaskAssignments() {
- final Map<String, List<TaskId>> stateConstrainedAssignment = mkMap(
- mkEntry(ID_1, emptyList()),
- mkEntry(ID_2, emptyList())
- );
- final Map<String, List<TaskId>> balancedAssignment = mkMap(
- mkEntry(ID_1, emptyList()),
- mkEntry(ID_2, emptyList())
- );
- assertTrue(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas).isEmpty());
- }
-
- @Test
- public void shouldReturnEmptyMovementsWhenPassedIdenticalTaskAssignments() {
- final Map<String, List<TaskId>> stateConstrainedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_0)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_1))
- );
- final Map<String, List<TaskId>> balancedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_0)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_1))
- );
- assertTrue(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas).isEmpty());
- }
-
- @Test
- public void shouldThrowIllegalStateExceptionIfAssignmentsAreOfDifferentSize() {
- final Map<String, List<TaskId>> stateConstrainedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_0_1))
- );
- final Map<String, List<TaskId>> balancedAssignment = mkMap(
- mkEntry(ID_1, asList(TASK_0_0, TASK_1_0)),
- mkEntry(ID_2, asList(TASK_0_1, TASK_1_1))
- );
- assertThrows(IllegalStateException.class, () -> getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas));
- }
-
- @Test
public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
client1 = EasyMock.createNiceMock(ClientState.class);
expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
replay(client1);
allTasks = mkSet(TASK_0_0, TASK_0_1);
- clientStates = singletonMap(ID_1, client1);
+ clientStates = singletonMap(UUID_1, client1);
createTaskAssignor();
assertFalse(taskAssignor.previousAssignmentIsValid());
@@ -327,7 +107,7 @@ public class HighAvailabilityTaskAssignorTest {
replay(client1);
allTasks = mkSet(TASK_0_0);
statefulTasks = mkSet(TASK_0_0);
- clientStates = singletonMap(ID_1, client1);
+ clientStates = singletonMap(UUID_1, client1);
numStandbyReplicas = 1;
createTaskAssignor();
@@ -350,8 +130,8 @@ public class HighAvailabilityTaskAssignorTest {
allTasks = mkSet(TASK_0_0, TASK_0_1);
statefulTasks = mkSet(TASK_0_0);
clientStates = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2)
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2)
);
createTaskAssignor();
@@ -374,8 +154,8 @@ public class HighAvailabilityTaskAssignorTest {
allTasks = mkSet(TASK_0_0, TASK_0_1);
statefulTasks = mkSet(TASK_0_0);
clientStates = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2)
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2)
);
createTaskAssignor();
@@ -389,10 +169,10 @@ public class HighAvailabilityTaskAssignorTest {
replay(client1);
allTasks = mkSet(TASK_0_0);
statefulTasks = mkSet(TASK_0_0);
- clientStates = singletonMap(ID_1, client1);
+ clientStates = singletonMap(UUID_1, client1);
createTaskAssignor();
- assertTrue(taskAssignor.taskIsCaughtUpOnClient(TASK_0_0, ID_1));
+ assertTrue(taskAssignor.taskIsCaughtUpOnClient(TASK_0_0, UUID_1));
}
@Test
@@ -401,11 +181,11 @@ public class HighAvailabilityTaskAssignorTest {
expect(client1.lagFor(TASK_0_0)).andReturn(0L);
allTasks = mkSet(TASK_0_0);
statefulTasks = mkSet(TASK_0_0);
- clientStates = singletonMap(ID_1, client1);
+ clientStates = singletonMap(UUID_1, client1);
replay(client1);
createTaskAssignor();
- assertTrue(taskAssignor.taskIsCaughtUpOnClient(TASK_0_0, ID_1));
+ assertTrue(taskAssignor.taskIsCaughtUpOnClient(TASK_0_0, UUID_1));
}
@Test
@@ -418,12 +198,12 @@ public class HighAvailabilityTaskAssignorTest {
allTasks = mkSet(TASK_0_0);
statefulTasks = mkSet(TASK_0_0);
clientStates = mkMap(
- mkEntry(ID_1, client1),
- mkEntry(ID_2, client2)
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2)
);
createTaskAssignor();
- assertFalse(taskAssignor.taskIsCaughtUpOnClient(TASK_0_0, ID_1));
+ assertFalse(taskAssignor.taskIsCaughtUpOnClient(TASK_0_0, UUID_1));
}
@Test
@@ -432,8 +212,8 @@ public class HighAvailabilityTaskAssignorTest {
client2 = EasyMock.createNiceMock(ClientState.class);
client3 = EasyMock.createNiceMock(ClientState.class);
final Set<ClientState> states = mkSet(client1, client2, client3);
- final Set<TaskId> statefulTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_2_0,
- TASK_2_1, TASK_2_3);
+ final Set<TaskId> statefulTasks =
+ mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_2_0, TASK_2_1, TASK_2_3);
expect(client1.capacity()).andReturn(1);
expect(client1.prevActiveTasks()).andReturn(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3));
@@ -454,8 +234,8 @@ public class HighAvailabilityTaskAssignorTest {
client2 = EasyMock.createNiceMock(ClientState.class);
client3 = EasyMock.createNiceMock(ClientState.class);
final Set<ClientState> states = mkSet(client1, client2, client3);
- final Set<TaskId> statefulTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_2_0,
- TASK_2_1, TASK_2_3);
+ final Set<TaskId> statefulTasks =
+ mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_2_0, TASK_2_1, TASK_2_3);
// client 1: 4 tasks per thread
expect(client1.capacity()).andReturn(1);
@@ -729,16 +509,16 @@ public class HighAvailabilityTaskAssignorTest {
assertThat(client2.standbyTaskCount(), equalTo(1));
}
- private Map<String, ClientState> getClientStatesWithOneClient() {
- return singletonMap(ID_1, client1);
+ private Map<UUID, ClientState> getClientStatesWithOneClient() {
+ return singletonMap(UUID_1, client1);
}
- private Map<String, ClientState> getClientStatesWithTwoClients() {
- return mkMap(mkEntry(ID_1, client1), mkEntry(ID_2, client2));
+ private Map<UUID, ClientState> getClientStatesWithTwoClients() {
+ return mkMap(mkEntry(UUID_1, client1), mkEntry(UUID_2, client2));
}
- private Map<String, ClientState> getClientStatesWithThreeClients() {
- return mkMap(mkEntry(ID_1, client1), mkEntry(ID_2, client2), mkEntry(ID_3, client3));
+ private Map<UUID, ClientState> getClientStatesWithThreeClients() {
+ return mkMap(mkEntry(UUID_1, client1), mkEntry(UUID_2, client2), mkEntry(UUID_3, client3));
}
private static void assertHasNoActiveTasks(final ClientState... clients) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RankedClientTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RankedClientTest.java
new file mode 100644
index 0000000..f3067d3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RankedClientTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSortedSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
+import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
+import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.UUID;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+public class RankedClientTest {
+
+ private static final long ACCEPTABLE_RECOVERY_LAG = 100L;
+
+ private ClientState client1 = EasyMock.createNiceMock(ClientState.class);
+ private ClientState client2 = EasyMock.createNiceMock(ClientState.class);
+ private ClientState client3 = EasyMock.createNiceMock(ClientState.class);
+
+ @Test
+ public void shouldRankPreviousClientAboveEquallyCaughtUpClient() {
+ expect(client1.lagFor(TASK_0_0)).andReturn(Task.LATEST_OFFSET);
+ expect(client2.lagFor(TASK_0_0)).andReturn(0L);
+ replay(client1, client2);
+
+ final SortedSet<RankedClient> expectedClientRanking = mkSortedSet(
+ new RankedClient(UUID_1, Task.LATEST_OFFSET),
+ new RankedClient(UUID_2, 0L)
+ );
+
+ final Map<UUID, ClientState> states = mkMap(
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2)
+ );
+
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedCandidates =
+ buildClientRankingsByTask(singleton(TASK_0_0), states, ACCEPTABLE_RECOVERY_LAG);
+
+ final SortedSet<RankedClient> clientRanking = statefulTasksToRankedCandidates.get(TASK_0_0);
+
+ EasyMock.verify(client1, client2);
+ assertThat(clientRanking, equalTo(expectedClientRanking));
+ }
+
+ @Test
+ public void shouldRankTaskWithUnknownOffsetSumBelowCaughtUpClientAndClientWithLargeLag() {
+ expect(client1.lagFor(TASK_0_0)).andReturn(UNKNOWN_OFFSET_SUM);
+ expect(client2.lagFor(TASK_0_0)).andReturn(50L);
+ expect(client3.lagFor(TASK_0_0)).andReturn(500L);
+ replay(client1, client2, client3);
+
+ final SortedSet<RankedClient> expectedClientRanking = mkSortedSet(
+ new RankedClient(UUID_2, 0L),
+ new RankedClient(UUID_1, 1L),
+ new RankedClient(UUID_3, 500L)
+ );
+
+ final Map<UUID, ClientState> states = mkMap(
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2),
+ mkEntry(UUID_3, client3)
+ );
+
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedCandidates =
+ buildClientRankingsByTask(singleton(TASK_0_0), states, ACCEPTABLE_RECOVERY_LAG);
+
+ final SortedSet<RankedClient> clientRanking = statefulTasksToRankedCandidates.get(TASK_0_0);
+
+ EasyMock.verify(client1, client2, client3);
+ assertThat(clientRanking, equalTo(expectedClientRanking));
+ }
+
+ @Test
+ public void shouldRankAllClientsWithinAcceptableRecoveryLagWithRank0() {
+ expect(client1.lagFor(TASK_0_0)).andReturn(100L);
+ expect(client2.lagFor(TASK_0_0)).andReturn(0L);
+ replay(client1, client2);
+
+ final SortedSet<RankedClient> expectedClientRanking = mkSortedSet(
+ new RankedClient(UUID_1, 0L),
+ new RankedClient(UUID_2, 0L)
+ );
+
+ final Map<UUID, ClientState> states = mkMap(
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2)
+ );
+
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedCandidates =
+ buildClientRankingsByTask(singleton(TASK_0_0), states, ACCEPTABLE_RECOVERY_LAG);
+
+ EasyMock.verify(client1, client2);
+ assertThat(statefulTasksToRankedCandidates.get(TASK_0_0), equalTo(expectedClientRanking));
+ }
+
+ @Test
+ public void shouldRankNotCaughtUpClientsAccordingToLag() {
+ expect(client1.lagFor(TASK_0_0)).andReturn(900L);
+ expect(client2.lagFor(TASK_0_0)).andReturn(800L);
+ expect(client3.lagFor(TASK_0_0)).andReturn(500L);
+ replay(client1, client2, client3);
+
+ final SortedSet<RankedClient> expectedClientRanking = mkSortedSet(
+ new RankedClient(UUID_3, 500L),
+ new RankedClient(UUID_2, 800L),
+ new RankedClient(UUID_1, 900L)
+ );
+
+ final Map<UUID, ClientState> states = mkMap(
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2),
+ mkEntry(UUID_3, client3)
+ );
+
+ final Map<TaskId, SortedSet<RankedClient>> statefulTasksToRankedCandidates =
+ buildClientRankingsByTask(singleton(TASK_0_0), states, ACCEPTABLE_RECOVERY_LAG);
+
+ EasyMock.verify(client1, client2, client3);
+ assertThat(statefulTasksToRankedCandidates.get(TASK_0_0), equalTo(expectedClientRanking));
+ }
+
+ @Test
+ public void shouldReturnEmptyClientRankingsWithNoStatefulTasks() {
+ final Map<UUID, ClientState> states = mkMap(
+ mkEntry(UUID_1, client1),
+ mkEntry(UUID_2, client2)
+ );
+
+ assertTrue(buildClientRankingsByTask(emptySet(), states, ACCEPTABLE_RECOVERY_LAG).isEmpty());
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 9162941..d241a57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.UUID;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,6 +32,28 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_4;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_5;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_6;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_4;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsIterableContaining.hasItem;
@@ -41,302 +63,281 @@ import static org.junit.Assert.assertTrue;
public class StickyTaskAssignorTest {
- private final TaskId task00 = new TaskId(0, 0);
- private final TaskId task01 = new TaskId(0, 1);
- private final TaskId task02 = new TaskId(0, 2);
- private final TaskId task03 = new TaskId(0, 3);
- private final TaskId task04 = new TaskId(0, 4);
- private final TaskId task05 = new TaskId(0, 5);
+ private final List<Integer> expectedTopicGroupIds = asList(1, 2);
- private final TaskId task10 = new TaskId(1, 0);
- private final TaskId task11 = new TaskId(1, 1);
- private final TaskId task12 = new TaskId(1, 2);
- private final TaskId task20 = new TaskId(2, 0);
- private final TaskId task21 = new TaskId(2, 1);
- private final TaskId task22 = new TaskId(2, 2);
-
- private final List<Integer> expectedTopicGroupIds = Arrays.asList(1, 2);
-
- private final Map<Integer, ClientState> clients = new TreeMap<>();
- private final Integer p1 = 1;
- private final Integer p2 = 2;
- private final Integer p3 = 3;
- private final Integer p4 = 4;
+ private final Map<UUID, ClientState> clients = new TreeMap<>();
@Test
public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() {
- createClient(p1, 1);
- createClient(p2, 1);
- createClient(p3, 1);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
+ createClient(UUID_3, 1);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- for (final Integer processId : clients.keySet()) {
+ for (final UUID processId : clients.keySet()) {
assertThat(clients.get(processId).activeTaskCount(), equalTo(1));
}
}
@Test
public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() {
- createClient(p1, 2);
- createClient(p2, 2);
- createClient(p3, 2);
+ createClient(UUID_1, 2);
+ createClient(UUID_2, 2);
+ createClient(UUID_3, 2);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task11, task22, task20, task21, task12);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_1_0, TASK_1_1, TASK_2_2, TASK_2_0, TASK_2_1, TASK_1_2);
taskAssignor.assign();
assertActiveTaskTopicGroupIdsEvenlyDistributed();
}
@Test
public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() {
- createClient(p1, 2);
- createClient(p2, 2);
- createClient(p3, 2);
+ createClient(UUID_1, 2);
+ createClient(UUID_2, 2);
+ createClient(UUID_3, 2);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(1, task20, task11, task12, task10, task21, task22);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_2_0, TASK_1_1, TASK_1_2, TASK_1_0, TASK_2_1, TASK_2_2);
taskAssignor.assign();
assertActiveTaskTopicGroupIdsEvenlyDistributed();
}
@Test
public void shouldNotMigrateActiveTaskToOtherProcess() {
- createClientWithPreviousActiveTasks(p1, 1, task00);
- createClientWithPreviousActiveTasks(p2, 1, task01);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_1);
- final StickyTaskAssignor firstAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor firstAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
firstAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), hasItems(task00));
- assertThat(clients.get(p2).activeTasks(), hasItems(task01));
- assertThat(allActiveTasks(), equalTo(Arrays.asList(task00, task01, task02)));
+ assertThat(clients.get(UUID_1).activeTasks(), hasItems(TASK_0_0));
+ assertThat(clients.get(UUID_2).activeTasks(), hasItems(TASK_0_1));
+ assertThat(allActiveTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2)));
clients.clear();
// flip the previous active tasks assignment around.
- createClientWithPreviousActiveTasks(p1, 1, task01);
- createClientWithPreviousActiveTasks(p2, 1, task02);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_1);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_2);
- final StickyTaskAssignor secondAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor secondAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
secondAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), hasItems(task01));
- assertThat(clients.get(p2).activeTasks(), hasItems(task02));
- assertThat(allActiveTasks(), equalTo(Arrays.asList(task00, task01, task02)));
+ assertThat(clients.get(UUID_1).activeTasks(), hasItems(TASK_0_1));
+ assertThat(clients.get(UUID_2).activeTasks(), hasItems(TASK_0_2));
+ assertThat(allActiveTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2)));
}
@Test
public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() {
- createClientWithPreviousActiveTasks(p1, 1, task00, task02);
- createClientWithPreviousActiveTasks(p2, 1, task01);
- createClient(p3, 1);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_2);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_1);
+ createClient(UUID_3, 1);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p2).activeTasks(), equalTo(Collections.singleton(task01)));
- assertThat(clients.get(p1).activeTasks().size(), equalTo(1));
- assertThat(clients.get(p3).activeTasks().size(), equalTo(1));
- assertThat(allActiveTasks(), equalTo(Arrays.asList(task00, task01, task02)));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(Collections.singleton(TASK_0_1)));
+ assertThat(clients.get(UUID_1).activeTasks().size(), equalTo(1));
+ assertThat(clients.get(UUID_3).activeTasks().size(), equalTo(1));
+ assertThat(allActiveTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2)));
}
@Test
public void shouldAssignBasedOnCapacity() {
- createClient(p1, 1);
- createClient(p2, 2);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 2);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks().size(), equalTo(1));
- assertThat(clients.get(p2).activeTasks().size(), equalTo(2));
+ assertThat(clients.get(UUID_1).activeTasks().size(), equalTo(1));
+ assertThat(clients.get(UUID_2).activeTasks().size(), equalTo(2));
}
@Test
public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_0_4, TASK_0_5, TASK_1_0);
- createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task03,
- task04, task05, task10);
-
- createClient(p2, 1);
+ createClient(UUID_2, 1);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task00, task01, task02, task03, task04, task05);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_1_0, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_0_4, TASK_0_5);
- final Set<TaskId> expectedClientITasks = new HashSet<>(Arrays.asList(task00, task01, task10, task05));
- final Set<TaskId> expectedClientIITasks = new HashSet<>(Arrays.asList(task02, task03, task04));
+ final Set<TaskId> expectedClientITasks = new HashSet<>(asList(TASK_0_0, TASK_0_1, TASK_1_0, TASK_0_5));
+ final Set<TaskId> expectedClientIITasks = new HashSet<>(asList(TASK_0_2, TASK_0_3, TASK_0_4));
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), equalTo(expectedClientITasks));
- assertThat(clients.get(p2).activeTasks(), equalTo(expectedClientIITasks));
+ assertThat(clients.get(UUID_1).activeTasks(), equalTo(expectedClientITasks));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(expectedClientIITasks));
}
@Test
public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
- final int p5 = 5;
- createClientWithPreviousActiveTasks(p1, 1, task00);
- createClientWithPreviousActiveTasks(p2, 1, task02);
- createClientWithPreviousActiveTasks(p3, 1, task01);
- createClient(p4, 1);
- createClient(p5, 1);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_2);
+ createClientWithPreviousActiveTasks(UUID_3, 1, TASK_0_1);
+ createClient(UUID_4, 1);
+ createClient(UUID_5, 1);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), equalTo(Collections.singleton(task00)));
- assertThat(clients.get(p2).activeTasks(), equalTo(Collections.singleton(task02)));
- assertThat(clients.get(p3).activeTasks(), equalTo(Collections.singleton(task01)));
+ assertThat(clients.get(UUID_1).activeTasks(), equalTo(Collections.singleton(TASK_0_0)));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(Collections.singleton(TASK_0_2)));
+ assertThat(clients.get(UUID_3).activeTasks(), equalTo(Collections.singleton(TASK_0_1)));
// change up the assignment and make sure it is still sticky
clients.clear();
- createClient(p1, 1);
- createClientWithPreviousActiveTasks(p2, 1, task00);
- createClient(p3, 1);
- createClientWithPreviousActiveTasks(p4, 1, task02);
- createClientWithPreviousActiveTasks(p5, 1, task01);
+ createClient(UUID_1, 1);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_0);
+ createClient(UUID_3, 1);
+ createClientWithPreviousActiveTasks(UUID_4, 1, TASK_0_2);
+ createClientWithPreviousActiveTasks(UUID_5, 1, TASK_0_1);
- final StickyTaskAssignor secondAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor secondAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
secondAssignor.assign();
- assertThat(clients.get(p2).activeTasks(), equalTo(Collections.singleton(task00)));
- assertThat(clients.get(p4).activeTasks(), equalTo(Collections.singleton(task02)));
- assertThat(clients.get(p5).activeTasks(), equalTo(Collections.singleton(task01)));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(Collections.singleton(TASK_0_0)));
+ assertThat(clients.get(UUID_4).activeTasks(), equalTo(Collections.singleton(TASK_0_2)));
+ assertThat(clients.get(UUID_5).activeTasks(), equalTo(Collections.singleton(TASK_0_1)));
}
@Test
public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
- final ClientState client1 = createClient(p1, 1);
- client1.addPreviousStandbyTasks(Utils.mkSet(task02));
- final ClientState client2 = createClient(p2, 1);
- client2.addPreviousStandbyTasks(Utils.mkSet(task01));
- final ClientState client3 = createClient(p3, 1);
- client3.addPreviousStandbyTasks(Utils.mkSet(task00));
+ final ClientState client1 = createClient(UUID_1, 1);
+ client1.addPreviousStandbyTasks(Utils.mkSet(TASK_0_2));
+ final ClientState client2 = createClient(UUID_2, 1);
+ client2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1));
+ final ClientState client3 = createClient(UUID_3, 1);
+ client3.addPreviousStandbyTasks(Utils.mkSet(TASK_0_0));
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), equalTo(Collections.singleton(task02)));
- assertThat(clients.get(p2).activeTasks(), equalTo(Collections.singleton(task01)));
- assertThat(clients.get(p3).activeTasks(), equalTo(Collections.singleton(task00)));
+ assertThat(clients.get(UUID_1).activeTasks(), equalTo(Collections.singleton(TASK_0_2)));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(Collections.singleton(TASK_0_1)));
+ assertThat(clients.get(UUID_3).activeTasks(), equalTo(Collections.singleton(TASK_0_0)));
}
@Test
public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00);
- c1.addPreviousStandbyTasks(Utils.mkSet(task01));
- final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01));
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0);
+ c1.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1));
+ final ClientState c2 = createClientWithPreviousActiveTasks(UUID_2, 2, TASK_0_2);
+ c2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1));
- final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), equalTo(Collections.singleton(task00)));
- assertThat(clients.get(p2).activeTasks(), equalTo(Utils.mkSet(task02, task01)));
+ assertThat(clients.get(UUID_1).activeTasks(), equalTo(Collections.singleton(TASK_0_0)));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(Utils.mkSet(TASK_0_2, TASK_0_1)));
}
@Test
public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssingedTo() {
- createClientWithPreviousActiveTasks(p1, 1, task00);
- createClientWithPreviousActiveTasks(p2, 1, task01);
- createClientWithPreviousActiveTasks(p3, 1, task02);
- createClientWithPreviousActiveTasks(p4, 1, task03);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_1);
+ createClientWithPreviousActiveTasks(UUID_3, 1, TASK_0_2);
+ createClientWithPreviousActiveTasks(UUID_4, 1, TASK_0_3);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(1, task00, task01, task02, task03);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
taskAssignor.assign();
- assertThat(clients.get(p1).standbyTasks(), not(hasItems(task00)));
- assertTrue(clients.get(p1).standbyTasks().size() <= 2);
- assertThat(clients.get(p2).standbyTasks(), not(hasItems(task01)));
- assertTrue(clients.get(p2).standbyTasks().size() <= 2);
- assertThat(clients.get(p3).standbyTasks(), not(hasItems(task02)));
- assertTrue(clients.get(p3).standbyTasks().size() <= 2);
- assertThat(clients.get(p4).standbyTasks(), not(hasItems(task03)));
- assertTrue(clients.get(p4).standbyTasks().size() <= 2);
+ assertThat(clients.get(UUID_1).standbyTasks(), not(hasItems(TASK_0_0)));
+ assertTrue(clients.get(UUID_1).standbyTasks().size() <= 2);
+ assertThat(clients.get(UUID_2).standbyTasks(), not(hasItems(TASK_0_1)));
+ assertTrue(clients.get(UUID_2).standbyTasks().size() <= 2);
+ assertThat(clients.get(UUID_3).standbyTasks(), not(hasItems(TASK_0_2)));
+ assertTrue(clients.get(UUID_3).standbyTasks().size() <= 2);
+ assertThat(clients.get(UUID_4).standbyTasks(), not(hasItems(TASK_0_3)));
+ assertTrue(clients.get(UUID_4).standbyTasks().size() <= 2);
int nonEmptyStandbyTaskCount = 0;
- for (final Integer client : clients.keySet()) {
+ for (final UUID client : clients.keySet()) {
nonEmptyStandbyTaskCount += clients.get(client).standbyTasks().isEmpty() ? 0 : 1;
}
assertTrue(nonEmptyStandbyTaskCount >= 3);
- assertThat(allStandbyTasks(), equalTo(Arrays.asList(task00, task01, task02, task03)));
+ assertThat(allStandbyTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
}
@Test
public void shouldAssignMultipleReplicasOfStandbyTask() {
- createClientWithPreviousActiveTasks(p1, 1, task00);
- createClientWithPreviousActiveTasks(p2, 1, task01);
- createClientWithPreviousActiveTasks(p3, 1, task02);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_1);
+ createClientWithPreviousActiveTasks(UUID_3, 1, TASK_0_2);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(2, task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(2, TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p1).standbyTasks(), equalTo(Utils.mkSet(task01, task02)));
- assertThat(clients.get(p2).standbyTasks(), equalTo(Utils.mkSet(task02, task00)));
- assertThat(clients.get(p3).standbyTasks(), equalTo(Utils.mkSet(task00, task01)));
+ assertThat(clients.get(UUID_1).standbyTasks(), equalTo(Utils.mkSet(TASK_0_1, TASK_0_2)));
+ assertThat(clients.get(UUID_2).standbyTasks(), equalTo(Utils.mkSet(TASK_0_2, TASK_0_0)));
+ assertThat(clients.get(UUID_3).standbyTasks(), equalTo(Utils.mkSet(TASK_0_0, TASK_0_1)));
}
@Test
public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() {
- createClient(p1, 1);
- final StickyTaskAssignor taskAssignor = createTaskAssignor(1, task00);
+ createClient(UUID_1, 1);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0);
taskAssignor.assign();
- assertThat(clients.get(p1).standbyTasks().size(), equalTo(0));
+ assertThat(clients.get(UUID_1).standbyTasks().size(), equalTo(0));
}
@Test
public void shouldAssignActiveAndStandbyTasks() {
- createClient(p1, 1);
- createClient(p2, 1);
- createClient(p3, 1);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
+ createClient(UUID_3, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(1, task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(allActiveTasks(), equalTo(Arrays.asList(task00, task01, task02)));
- assertThat(allStandbyTasks(), equalTo(Arrays.asList(task00, task01, task02)));
+ assertThat(allActiveTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2)));
+ assertThat(allStandbyTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2)));
}
@Test
public void shouldAssignAtLeastOneTaskToEachClientIfPossible() {
- createClient(p1, 3);
- createClient(p2, 1);
- createClient(p3, 1);
+ createClient(UUID_1, 3);
+ createClient(UUID_2, 1);
+ createClient(UUID_3, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(clients.get(p1).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p2).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p3).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_1).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_2).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_3).assignedTaskCount(), equalTo(1));
}
@Test
public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() {
- createClient(p1, 1);
- createClient(p2, 1);
- createClient(p3, 1);
- createClient(p4, 1);
- createClient(5, 1);
- createClient(6, 1);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
+ createClient(UUID_3, 1);
+ createClient(UUID_4, 1);
+ createClient(UUID_5, 1);
+ createClient(UUID_6, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(allActiveTasks(), equalTo(Arrays.asList(task00, task01, task02)));
+ assertThat(allActiveTasks(), equalTo(asList(TASK_0_0, TASK_0_1, TASK_0_2)));
}
@Test
public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() {
- createClient(p1, 1);
- createClient(p2, 1);
- createClient(p3, 1);
- createClient(p4, 1);
- createClient(5, 1);
- createClient(6, 1);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
+ createClient(UUID_3, 1);
+ createClient(UUID_4, 1);
+ createClient(UUID_5, 1);
+ createClient(UUID_6, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(1, task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
for (final ClientState clientState : clients.values()) {
@@ -346,12 +347,12 @@ public class StickyTaskAssignorTest {
@Test
public void shouldAssignMoreTasksToClientWithMoreCapacity() {
- createClient(p2, 2);
- createClient(p1, 1);
+ createClient(UUID_2, 2);
+ createClient(UUID_1, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00,
- task01,
- task02,
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0,
+ TASK_0_1,
+ TASK_0_2,
new TaskId(1, 0),
new TaskId(1, 1),
new TaskId(1, 2),
@@ -363,16 +364,16 @@ public class StickyTaskAssignorTest {
new TaskId(3, 2));
taskAssignor.assign();
- assertThat(clients.get(p2).assignedTaskCount(), equalTo(8));
- assertThat(clients.get(p1).assignedTaskCount(), equalTo(4));
+ assertThat(clients.get(UUID_2).assignedTaskCount(), equalTo(8));
+ assertThat(clients.get(UUID_1).assignedTaskCount(), equalTo(4));
}
@Test
public void shouldEvenlyDistributeByTaskIdAndPartition() {
- createClient(p1, 4);
- createClient(p2, 4);
- createClient(p3, 4);
- createClient(p4, 4);
+ createClient(UUID_1, 4);
+ createClient(UUID_2, 4);
+ createClient(UUID_3, 4);
+ createClient(UUID_4, 4);
final List<TaskId> taskIds = new ArrayList<>();
final TaskId[] taskIdArray = new TaskId[16];
@@ -386,7 +387,7 @@ public class StickyTaskAssignorTest {
Collections.shuffle(taskIds);
taskIds.toArray(taskIdArray);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(taskIdArray);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(taskIdArray);
taskAssignor.assign();
Collections.sort(taskIds);
@@ -395,29 +396,30 @@ public class StickyTaskAssignorTest {
final Set<TaskId> expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14);
final Set<TaskId> expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15);
- final Map<Integer, Set<TaskId>> sortedAssignments = sortClientAssignments(clients);
+ final Map<UUID, Set<TaskId>> sortedAssignments = sortClientAssignments(clients);
- assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment));
- assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment));
- assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment));
- assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment));
+ assertThat(sortedAssignments.get(UUID_1), equalTo(expectedClientOneAssignment));
+ assertThat(sortedAssignments.get(UUID_2), equalTo(expectedClientTwoAssignment));
+ assertThat(sortedAssignments.get(UUID_3), equalTo(expectedClientThreeAssignment));
+ assertThat(sortedAssignments.get(UUID_4), equalTo(expectedClientFourAssignment));
}
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
- createClient(p1, 1);
- createClient(p2, 1);
- createClient(p3, 1);
- createClient(p4, 1);
+ final List<UUID> allUUIDs = asList(UUID_1, UUID_2, UUID_3, UUID_4);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
+ createClient(UUID_3, 1);
+ createClient(UUID_4, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(1, task00, task02, task01, task03);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0, TASK_0_2, TASK_0_1, TASK_0_3);
taskAssignor.assign();
- for (int i = p1; i <= p4; i++) {
- final Set<TaskId> taskIds = clients.get(i).assignedTasks();
- for (int j = p1; j <= p4; j++) {
- if (j != i) {
- assertThat("clients shouldn't have same task assignment", clients.get(j).assignedTasks(),
+ for (final UUID uuid : allUUIDs) {
+ final Set<TaskId> taskIds = clients.get(uuid).assignedTasks();
+ for (final UUID otherUUID : allUUIDs) {
+ if (!uuid.equals(otherUUID)) {
+ assertThat("clients shouldn't have same task assignment", clients.get(otherUUID).assignedTasks(),
not(equalTo(taskIds)));
}
}
@@ -427,19 +429,20 @@ public class StickyTaskAssignorTest {
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
- createClientWithPreviousActiveTasks(p1, 1, task01, task02);
- createClientWithPreviousActiveTasks(p2, 1, task03);
- createClientWithPreviousActiveTasks(p3, 1, task00);
- createClient(p4, 1);
+ final List<UUID> allUUIDs = asList(UUID_1, UUID_2, UUID_3);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_1, TASK_0_2);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_3);
+ createClientWithPreviousActiveTasks(UUID_3, 1, TASK_0_0);
+ createClient(UUID_4, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(1, task00, task02, task01, task03);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0, TASK_0_2, TASK_0_1, TASK_0_3);
taskAssignor.assign();
- for (int i = p1; i <= p4; i++) {
- final Set<TaskId> taskIds = clients.get(i).assignedTasks();
- for (int j = p1; j <= p4; j++) {
- if (j != i) {
- assertThat("clients shouldn't have same task assignment", clients.get(j).assignedTasks(),
+ for (final UUID uuid : allUUIDs) {
+ final Set<TaskId> taskIds = clients.get(uuid).assignedTasks();
+ for (final UUID otherUUID : allUUIDs) {
+ if (!uuid.equals(otherUUID)) {
+ assertThat("clients shouldn't have same task assignment", clients.get(otherUUID).assignedTasks(),
not(equalTo(taskIds)));
}
}
@@ -449,22 +452,24 @@ public class StickyTaskAssignorTest {
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02);
- c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
- final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02));
+ final List<UUID> allUUIDs = asList(UUID_1, UUID_2, UUID_3, UUID_4);
- createClient(p3, 1);
- createClient(p4, 1);
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_1, TASK_0_2);
+ c1.addPreviousStandbyTasks(Utils.mkSet(TASK_0_3, TASK_0_0));
+ final ClientState c2 = createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_3, TASK_0_0);
+ c2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1, TASK_0_2));
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(1, task00, task02, task01, task03);
+ createClient(UUID_3, 1);
+ createClient(UUID_4, 1);
+
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(1, TASK_0_0, TASK_0_2, TASK_0_1, TASK_0_3);
taskAssignor.assign();
- for (int i = p1; i <= p4; i++) {
- final Set<TaskId> taskIds = clients.get(i).assignedTasks();
- for (int j = p1; j <= p4; j++) {
- if (j != i) {
- assertThat("clients shouldn't have same task assignment", clients.get(j).assignedTasks(),
+ for (final UUID uuid : allUUIDs) {
+ final Set<TaskId> taskIds = clients.get(uuid).assignedTasks();
+ for (final UUID otherUUID : allUUIDs) {
+ if (!uuid.equals(otherUUID)) {
+ assertThat("clients shouldn't have same task assignment", clients.get(otherUUID).assignedTasks(),
not(equalTo(taskIds)));
}
}
@@ -474,227 +479,208 @@ public class StickyTaskAssignorTest {
@Test
public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
- createClientWithPreviousActiveTasks(p3, 1, task00, task01, task02, task03);
- createClient(p1, 1);
- createClient(p2, 1);
- createClient(p4, 1);
+ createClientWithPreviousActiveTasks(UUID_3, 1, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
+ createClient(UUID_4, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task02, task01, task03);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_2, TASK_0_1, TASK_0_3);
taskAssignor.assign();
- assertThat(clients.get(p1).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p2).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p3).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p4).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_1).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_2).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_3).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_4).assignedTaskCount(), equalTo(1));
}
@Test
public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() {
- createClientWithPreviousActiveTasks(p3, 1, task00, task01, task02, task03);
- createClient(p1, 1);
- createClient(p2, 1);
+ createClientWithPreviousActiveTasks(UUID_3, 1, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+ createClient(UUID_1, 1);
+ createClient(UUID_2, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task02, task01, task03);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_2, TASK_0_1, TASK_0_3);
taskAssignor.assign();
- assertThat(clients.get(p3).assignedTaskCount(), equalTo(2));
- assertThat(clients.get(p1).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p2).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_3).assignedTaskCount(), equalTo(2));
+ assertThat(clients.get(UUID_1).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_2).assignedTaskCount(), equalTo(1));
}
@Test
public void shouldRebalanceTasksToClientsBasedOnCapacity() {
- createClientWithPreviousActiveTasks(p2, 1, task00, task03, task02);
- createClient(p3, 2);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task02, task03);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_0, TASK_0_3, TASK_0_2);
+ createClient(UUID_3, 2);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_2, TASK_0_3);
taskAssignor.assign();
- assertThat(clients.get(p2).assignedTaskCount(), equalTo(1));
- assertThat(clients.get(p3).assignedTaskCount(), equalTo(2));
+ assertThat(clients.get(UUID_2).assignedTaskCount(), equalTo(1));
+ assertThat(clients.get(UUID_3).assignedTaskCount(), equalTo(2));
}
@Test
public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() {
- final Set<TaskId> p1PrevTasks = Utils.mkSet(task00, task02);
- final Set<TaskId> p2PrevTasks = Utils.mkSet(task01, task03);
+ final Set<TaskId> p1PrevTasks = Utils.mkSet(TASK_0_0, TASK_0_2);
+ final Set<TaskId> p2PrevTasks = Utils.mkSet(TASK_0_1, TASK_0_3);
- createClientWithPreviousActiveTasks(p1, 1, task00, task02);
- createClientWithPreviousActiveTasks(p2, 1, task01, task03);
- createClientWithPreviousActiveTasks(p3, 1);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_2);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_1, TASK_0_3);
+ createClientWithPreviousActiveTasks(UUID_3, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task02, task01, task03);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_2, TASK_0_1, TASK_0_3);
taskAssignor.assign();
- final Set<TaskId> p3ActiveTasks = clients.get(p3).activeTasks();
+ final Set<TaskId> p3ActiveTasks = clients.get(UUID_3).activeTasks();
assertThat(p3ActiveTasks.size(), equalTo(1));
if (p1PrevTasks.removeAll(p3ActiveTasks)) {
- assertThat(clients.get(p2).activeTasks(), equalTo(p2PrevTasks));
+ assertThat(clients.get(UUID_2).activeTasks(), equalTo(p2PrevTasks));
} else {
- assertThat(clients.get(p1).activeTasks(), equalTo(p1PrevTasks));
+ assertThat(clients.get(UUID_1).activeTasks(), equalTo(p1PrevTasks));
}
}
@Test
public void shouldNotMoveAnyTasksWhenNewTasksAdded() {
- createClientWithPreviousActiveTasks(p1, 1, task00, task01);
- createClientWithPreviousActiveTasks(p2, 1, task02, task03);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_1);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_2, TASK_0_3);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task03, task01, task04, task02, task00, task05);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_3, TASK_0_1, TASK_0_4, TASK_0_2, TASK_0_0, TASK_0_5);
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), hasItems(task00, task01));
- assertThat(clients.get(p2).activeTasks(), hasItems(task02, task03));
+ assertThat(clients.get(UUID_1).activeTasks(), hasItems(TASK_0_0, TASK_0_1));
+ assertThat(clients.get(UUID_2).activeTasks(), hasItems(TASK_0_2, TASK_0_3));
}
@Test
public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() {
- createClientWithPreviousActiveTasks(p1, 1, task02, task01);
- createClientWithPreviousActiveTasks(p2, 1, task00, task03);
- createClient(p3, 1);
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_2, TASK_0_1);
+ createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_0, TASK_0_3);
+ createClient(UUID_3, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task03, task01, task04, task02, task00, task05);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_3, TASK_0_1, TASK_0_4, TASK_0_2, TASK_0_0, TASK_0_5);
taskAssignor.assign();
- assertThat(clients.get(p1).activeTasks(), hasItems(task02, task01));
- assertThat(clients.get(p2).activeTasks(), hasItems(task00, task03));
- assertThat(clients.get(p3).activeTasks(), hasItems(task04, task05));
+ assertThat(clients.get(UUID_1).activeTasks(), hasItems(TASK_0_2, TASK_0_1));
+ assertThat(clients.get(UUID_2).activeTasks(), hasItems(TASK_0_0, TASK_0_3));
+ assertThat(clients.get(UUID_3).activeTasks(), hasItems(TASK_0_4, TASK_0_5));
}
@Test
public void shouldAssignTasksNotPreviouslyActiveToNewClient() {
- final TaskId task10 = new TaskId(0, 10);
- final TaskId task11 = new TaskId(0, 11);
- final TaskId task12 = new TaskId(1, 2);
- final TaskId task13 = new TaskId(1, 3);
- final TaskId task20 = new TaskId(2, 0);
- final TaskId task21 = new TaskId(2, 1);
- final TaskId task22 = new TaskId(2, 2);
- final TaskId task23 = new TaskId(2, 3);
-
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
- c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
- final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
- final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23);
- c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_1, TASK_1_2, TASK_1_3);
+ c1.addPreviousStandbyTasks(Utils.mkSet(TASK_0_0, TASK_1_1, TASK_2_0, TASK_2_1, TASK_2_3));
+ final ClientState c2 = createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_0, TASK_1_1, TASK_2_2);
+ c2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1, TASK_1_0, TASK_0_2, TASK_2_0, TASK_0_3, TASK_1_2, TASK_2_1, TASK_1_3, TASK_2_3));
+ final ClientState c3 = createClientWithPreviousActiveTasks(UUID_3, 1, TASK_2_0, TASK_2_1, TASK_2_3);
+ c3.addPreviousStandbyTasks(Utils.mkSet(TASK_0_2, TASK_1_2));
- final ClientState newClient = createClient(p4, 1);
- newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23));
+ final ClientState newClient = createClient(UUID_4, 1);
+ newClient.addPreviousStandbyTasks(Utils.mkSet(TASK_0_0, TASK_1_0, TASK_0_1, TASK_0_2, TASK_1_1, TASK_2_0, TASK_0_3, TASK_1_2, TASK_2_1, TASK_1_3, TASK_2_2, TASK_2_3));
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_1_0, TASK_0_1, TASK_0_2, TASK_1_1, TASK_2_0, TASK_0_3, TASK_1_2, TASK_2_1, TASK_1_3, TASK_2_2, TASK_2_3);
taskAssignor.assign();
- assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, task13)));
- assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, task22)));
- assertThat(c3.activeTasks(), equalTo(Utils.mkSet(task20, task21, task23)));
- assertThat(newClient.activeTasks(), equalTo(Utils.mkSet(task02, task03, task10)));
+ assertThat(c1.activeTasks(), equalTo(Utils.mkSet(TASK_0_1, TASK_1_2, TASK_1_3)));
+ assertThat(c2.activeTasks(), equalTo(Utils.mkSet(TASK_0_0, TASK_1_1, TASK_2_2)));
+ assertThat(c3.activeTasks(), equalTo(Utils.mkSet(TASK_2_0, TASK_2_1, TASK_2_3)));
+ assertThat(newClient.activeTasks(), equalTo(Utils.mkSet(TASK_0_2, TASK_0_3, TASK_1_0)));
}
@Test
public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() {
- final TaskId task10 = new TaskId(0, 10);
- final TaskId task11 = new TaskId(0, 11);
- final TaskId task12 = new TaskId(1, 2);
- final TaskId task13 = new TaskId(1, 3);
- final TaskId task20 = new TaskId(2, 0);
- final TaskId task21 = new TaskId(2, 1);
- final TaskId task22 = new TaskId(2, 2);
- final TaskId task23 = new TaskId(2, 3);
-
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
- c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
- final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_1, TASK_1_2, TASK_1_3);
+ c1.addPreviousStandbyTasks(Utils.mkSet(TASK_0_0, TASK_1_1, TASK_2_0, TASK_2_1, TASK_2_3));
+ final ClientState c2 = createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_0, TASK_1_1, TASK_2_2);
+ c2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_1, TASK_1_0, TASK_0_2, TASK_2_0, TASK_0_3, TASK_1_2, TASK_2_1, TASK_1_3, TASK_2_3));
- final ClientState bounce1 = createClient(p3, 1);
- bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
+ final ClientState bounce1 = createClient(UUID_3, 1);
+ bounce1.addPreviousStandbyTasks(Utils.mkSet(TASK_2_0, TASK_2_1, TASK_2_3));
- final ClientState bounce2 = createClient(p4, 1);
- bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
+ final ClientState bounce2 = createClient(UUID_4, 1);
+ bounce2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_2, TASK_0_3, TASK_1_0));
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_1_0, TASK_0_1, TASK_0_2, TASK_1_1, TASK_2_0, TASK_0_3, TASK_1_2, TASK_2_1, TASK_1_3, TASK_2_2, TASK_2_3);
taskAssignor.assign();
- assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, task13)));
- assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, task22)));
- assertThat(bounce1.activeTasks(), equalTo(Utils.mkSet(task20, task21, task23)));
- assertThat(bounce2.activeTasks(), equalTo(Utils.mkSet(task02, task03, task10)));
+ assertThat(c1.activeTasks(), equalTo(Utils.mkSet(TASK_0_1, TASK_1_2, TASK_1_3)));
+ assertThat(c2.activeTasks(), equalTo(Utils.mkSet(TASK_0_0, TASK_1_1, TASK_2_2)));
+ assertThat(bounce1.activeTasks(), equalTo(Utils.mkSet(TASK_2_0, TASK_2_1, TASK_2_3)));
+ assertThat(bounce2.activeTasks(), equalTo(Utils.mkSet(TASK_0_2, TASK_0_3, TASK_1_0)));
}
@Test
public void shouldAssignTasksToNewClient() {
- createClientWithPreviousActiveTasks(p1, 1, task01, task02);
- createClient(p2, 1);
- createTaskAssignor(task01, task02).assign();
- assertThat(clients.get(p1).activeTaskCount(), equalTo(1));
+ createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_1, TASK_0_2);
+ createClient(UUID_2, 1);
+ createTaskAssignor(TASK_0_1, TASK_0_2).assign();
+ assertThat(clients.get(UUID_1).activeTaskCount(), equalTo(1));
}
@Test
public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() {
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02);
- final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task04, task05);
- final ClientState newClient = createClient(p3, 1);
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_1, TASK_0_2);
+ final ClientState c2 = createClientWithPreviousActiveTasks(UUID_2, 1, TASK_0_3, TASK_0_4, TASK_0_5);
+ final ClientState newClient = createClient(UUID_3, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_0_4, TASK_0_5);
taskAssignor.assign();
- assertThat(c1.activeTasks(), not(hasItem(task03)));
- assertThat(c1.activeTasks(), not(hasItem(task04)));
- assertThat(c1.activeTasks(), not(hasItem(task05)));
+ assertThat(c1.activeTasks(), not(hasItem(TASK_0_3)));
+ assertThat(c1.activeTasks(), not(hasItem(TASK_0_4)));
+ assertThat(c1.activeTasks(), not(hasItem(TASK_0_5)));
assertThat(c1.activeTaskCount(), equalTo(2));
- assertThat(c2.activeTasks(), not(hasItems(task00)));
- assertThat(c2.activeTasks(), not(hasItems(task01)));
- assertThat(c2.activeTasks(), not(hasItems(task02)));
+ assertThat(c2.activeTasks(), not(hasItems(TASK_0_0)));
+ assertThat(c2.activeTasks(), not(hasItems(TASK_0_1)));
+ assertThat(c2.activeTasks(), not(hasItems(TASK_0_2)));
assertThat(c2.activeTaskCount(), equalTo(2));
assertThat(newClient.activeTaskCount(), equalTo(2));
}
@Test
public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() {
- final TaskId task06 = new TaskId(0, 6);
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06);
- final ClientState c2 = createClient(p2, 1);
- c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
- final ClientState newClient = createClient(p3, 1);
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_6);
+ final ClientState c2 = createClient(UUID_2, 1);
+ c2.addPreviousStandbyTasks(Utils.mkSet(TASK_0_3, TASK_0_4, TASK_0_5));
+ final ClientState newClient = createClient(UUID_3, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_0_4, TASK_0_5, TASK_0_6);
taskAssignor.assign();
- assertThat(c1.activeTasks(), not(hasItem(task03)));
- assertThat(c1.activeTasks(), not(hasItem(task04)));
- assertThat(c1.activeTasks(), not(hasItem(task05)));
+ assertThat(c1.activeTasks(), not(hasItem(TASK_0_3)));
+ assertThat(c1.activeTasks(), not(hasItem(TASK_0_4)));
+ assertThat(c1.activeTasks(), not(hasItem(TASK_0_5)));
assertThat(c1.activeTaskCount(), equalTo(3));
- assertThat(c2.activeTasks(), not(hasItems(task00)));
- assertThat(c2.activeTasks(), not(hasItems(task01)));
- assertThat(c2.activeTasks(), not(hasItems(task02)));
+ assertThat(c2.activeTasks(), not(hasItems(TASK_0_0)));
+ assertThat(c2.activeTasks(), not(hasItems(TASK_0_1)));
+ assertThat(c2.activeTasks(), not(hasItems(TASK_0_2)));
assertThat(c2.activeTaskCount(), equalTo(2));
assertThat(newClient.activeTaskCount(), equalTo(2));
}
@Test
public void shouldViolateBalanceToPreserveActiveTaskStickiness() {
- final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02);
- final ClientState c2 = createClient(p2, 1);
+ final ClientState c1 = createClientWithPreviousActiveTasks(UUID_1, 1, TASK_0_0, TASK_0_1, TASK_0_2);
+ final ClientState c2 = createClient(UUID_2, 1);
- final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(0, true, task00, task01, task02);
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(0, true, TASK_0_0, TASK_0_1, TASK_0_2);
taskAssignor.assign();
- assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task00, task01, task02)));
+ assertThat(c1.activeTasks(), equalTo(Utils.mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
assertTrue(c2.activeTasks().isEmpty());
}
- private StickyTaskAssignor<Integer> createTaskAssignor(final TaskId... tasks) {
+ private StickyTaskAssignor createTaskAssignor(final TaskId... tasks) {
return createTaskAssignor(0, false, tasks);
}
- private StickyTaskAssignor<Integer> createTaskAssignor(final int numStandbys, final TaskId... tasks) {
+ private StickyTaskAssignor createTaskAssignor(final int numStandbys, final TaskId... tasks) {
return createTaskAssignor(numStandbys, false, tasks);
}
- private StickyTaskAssignor<Integer> createTaskAssignor(final int numStandbys,
- final boolean mustPreserveActiveTaskAssignment,
- final TaskId... tasks) {
- final List<TaskId> taskIds = Arrays.asList(tasks);
+ private StickyTaskAssignor createTaskAssignor(final int numStandbys,
+ final boolean mustPreserveActiveTaskAssignment,
+ final TaskId... tasks) {
+ final List<TaskId> taskIds = asList(tasks);
Collections.shuffle(taskIds);
- return new StickyTaskAssignor<>(
+ return new StickyTaskAssignor(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
@@ -721,11 +707,11 @@ public class StickyTaskAssignorTest {
return tasks;
}
- private ClientState createClient(final Integer processId, final int capacity) {
+ private ClientState createClient(final UUID processId, final int capacity) {
return createClientWithPreviousActiveTasks(processId, capacity);
}
- private ClientState createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) {
+ private ClientState createClientWithPreviousActiveTasks(final UUID processId, final int capacity, final TaskId... taskIds) {
final ClientState clientState = new ClientState(capacity);
clientState.addPreviousActiveTasks(Utils.mkSet(taskIds));
clients.put(processId, clientState);
@@ -733,7 +719,7 @@ public class StickyTaskAssignorTest {
}
private void assertActiveTaskTopicGroupIdsEvenlyDistributed() {
- for (final Map.Entry<Integer, ClientState> clientStateEntry : clients.entrySet()) {
+ for (final Map.Entry<UUID, ClientState> clientStateEntry : clients.entrySet()) {
final List<Integer> topicGroupIds = new ArrayList<>();
final Set<TaskId> activeTasks = clientStateEntry.getValue().activeTasks();
for (final TaskId activeTask : activeTasks) {
@@ -744,9 +730,9 @@ public class StickyTaskAssignorTest {
}
}
- private Map<Integer, Set<TaskId>> sortClientAssignments(final Map<Integer, ClientState> clients) {
- final Map<Integer, Set<TaskId>> sortedAssignments = new HashMap<>();
- for (final Map.Entry<Integer, ClientState> entry : clients.entrySet()) {
+ private Map<UUID, Set<TaskId>> sortClientAssignments(final Map<UUID, ClientState> clients) {
+ final Map<UUID, Set<TaskId>> sortedAssignments = new HashMap<>();
+ for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) {
final Set<TaskId> sorted = new TreeSet<>(entry.getValue().activeTasks());
sortedAssignments.put(entry.getKey(), sorted);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 07bb085..b76ce59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -25,10 +25,15 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import java.util.UUID;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_2_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.MIN_VERSION_OFFSET_SUM_SUBSCRIPTION;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
@@ -38,20 +43,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class SubscriptionInfoTest {
- private final UUID processId = UUID.randomUUID();
private static final Set<TaskId> ACTIVE_TASKS = new HashSet<>(Arrays.asList(
- new TaskId(0, 0),
- new TaskId(0, 1),
- new TaskId(1, 0)));
+ TASK_0_0,
+ TASK_0_1,
+ TASK_1_0));
private static final Set<TaskId> STANDBY_TASKS = new HashSet<>(Arrays.asList(
- new TaskId(1, 1),
- new TaskId(2, 0)));
+ TASK_1_1,
+ TASK_2_0));
private static final Map<TaskId, Long> TASK_OFFSET_SUMS = mkMap(
- mkEntry(new TaskId(0, 0), Task.LATEST_OFFSET),
- mkEntry(new TaskId(0, 1), Task.LATEST_OFFSET),
- mkEntry(new TaskId(1, 0), Task.LATEST_OFFSET),
- mkEntry(new TaskId(1, 1), 0L),
- mkEntry(new TaskId(2, 0), 10L)
+ mkEntry(TASK_0_0, Task.LATEST_OFFSET),
+ mkEntry(TASK_0_1, Task.LATEST_OFFSET),
+ mkEntry(TASK_1_0, Task.LATEST_OFFSET),
+ mkEntry(TASK_1_1, 0L),
+ mkEntry(TASK_2_0, 10L)
);
private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
@@ -61,7 +65,7 @@ public class SubscriptionInfoTest {
new SubscriptionInfo(
0,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
"localhost:80",
TASK_OFFSET_SUMS
);
@@ -72,7 +76,7 @@ public class SubscriptionInfoTest {
new SubscriptionInfo(
LATEST_SUPPORTED_VERSION + 1,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
"localhost:80",
TASK_OFFSET_SUMS
);
@@ -83,14 +87,14 @@ public class SubscriptionInfoTest {
final SubscriptionInfo info = new SubscriptionInfo(
1,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
IGNORED_USER_ENDPOINT,
TASK_OFFSET_SUMS
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(1, decoded.version());
assertEquals(SubscriptionInfo.UNKNOWN, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertNull(decoded.userEndPoint());
@@ -101,7 +105,7 @@ public class SubscriptionInfoTest {
final SubscriptionInfo info = new SubscriptionInfo(
1,
1234,
- processId,
+ UUID_1,
"ignoreme",
TASK_OFFSET_SUMS
);
@@ -110,7 +114,7 @@ public class SubscriptionInfoTest {
final LegacySubscriptionInfoSerde decoded = LegacySubscriptionInfoSerde.decode(buffer);
assertEquals(1, decoded.version());
assertEquals(SubscriptionInfo.UNKNOWN, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertNull(decoded.userEndPoint());
@@ -121,7 +125,7 @@ public class SubscriptionInfoTest {
final LegacySubscriptionInfoSerde info = new LegacySubscriptionInfoSerde(
1,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
ACTIVE_TASKS,
STANDBY_TASKS,
"localhost:80"
@@ -131,7 +135,7 @@ public class SubscriptionInfoTest {
final SubscriptionInfo decoded = SubscriptionInfo.decode(buffer);
assertEquals(1, decoded.version());
assertEquals(SubscriptionInfo.UNKNOWN, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertNull(decoded.userEndPoint());
@@ -142,14 +146,14 @@ public class SubscriptionInfoTest {
final SubscriptionInfo info = new SubscriptionInfo(
2,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
"localhost:80",
TASK_OFFSET_SUMS
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(2, decoded.version());
assertEquals(SubscriptionInfo.UNKNOWN, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertEquals("localhost:80", decoded.userEndPoint());
@@ -160,7 +164,7 @@ public class SubscriptionInfoTest {
final SubscriptionInfo info = new SubscriptionInfo(
2,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
"localhost:80",
TASK_OFFSET_SUMS
);
@@ -169,7 +173,7 @@ public class SubscriptionInfoTest {
final LegacySubscriptionInfoSerde decoded = LegacySubscriptionInfoSerde.decode(buffer);
assertEquals(2, decoded.version());
assertEquals(SubscriptionInfo.UNKNOWN, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertEquals("localhost:80", decoded.userEndPoint());
@@ -180,7 +184,7 @@ public class SubscriptionInfoTest {
final LegacySubscriptionInfoSerde info = new LegacySubscriptionInfoSerde(
2,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
ACTIVE_TASKS,
STANDBY_TASKS,
"localhost:80"
@@ -190,7 +194,7 @@ public class SubscriptionInfoTest {
final SubscriptionInfo decoded = SubscriptionInfo.decode(buffer);
assertEquals(2, decoded.version());
assertEquals(SubscriptionInfo.UNKNOWN, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertEquals("localhost:80", decoded.userEndPoint());
@@ -202,14 +206,14 @@ public class SubscriptionInfoTest {
final SubscriptionInfo info = new SubscriptionInfo(
version,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
"localhost:80",
TASK_OFFSET_SUMS
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(version, decoded.version());
assertEquals(LATEST_SUPPORTED_VERSION, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertEquals("localhost:80", decoded.userEndPoint());
@@ -222,7 +226,7 @@ public class SubscriptionInfoTest {
final SubscriptionInfo info = new SubscriptionInfo(
version,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
"localhost:80",
TASK_OFFSET_SUMS
);
@@ -231,7 +235,7 @@ public class SubscriptionInfoTest {
final LegacySubscriptionInfoSerde decoded = LegacySubscriptionInfoSerde.decode(buffer);
assertEquals(version, decoded.version());
assertEquals(LATEST_SUPPORTED_VERSION, decoded.latestSupportedVersion());
- assertEquals(processId, decoded.processId());
+ assertEquals(UUID_1, decoded.processId());
assertEquals(ACTIVE_TASKS, decoded.prevTasks());
assertEquals(STANDBY_TASKS, decoded.standbyTasks());
assertEquals("localhost:80", decoded.userEndPoint());
@@ -244,7 +248,7 @@ public class SubscriptionInfoTest {
final LegacySubscriptionInfoSerde info = new LegacySubscriptionInfoSerde(
version,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
ACTIVE_TASKS,
STANDBY_TASKS,
"localhost:80"
@@ -255,7 +259,7 @@ public class SubscriptionInfoTest {
final String message = "for version: " + version;
assertEquals(message, version, decoded.version());
assertEquals(message, LATEST_SUPPORTED_VERSION, decoded.latestSupportedVersion());
- assertEquals(message, processId, decoded.processId());
+ assertEquals(message, UUID_1, decoded.processId());
assertEquals(message, ACTIVE_TASKS, decoded.prevTasks());
assertEquals(message, STANDBY_TASKS, decoded.standbyTasks());
assertEquals(message, "localhost:80", decoded.userEndPoint());
@@ -265,7 +269,7 @@ public class SubscriptionInfoTest {
@Test
public void shouldEncodeAndDecodeVersion5() {
final SubscriptionInfo info =
- new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, processId, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
assertEquals(info, SubscriptionInfo.decode(info.encode()));
}
@@ -282,23 +286,23 @@ public class SubscriptionInfoTest {
final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
final SubscriptionInfo info =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
final SubscriptionInfo expectedInfo =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeVersion7() {
final SubscriptionInfo info =
- new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, processId, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@Test
public void shouldConvertTaskOffsetSumMapToTaskSets() {
final SubscriptionInfo info =
- new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, processId, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
assertThat(info.prevTasks(), is(ACTIVE_TASKS));
assertThat(info.standbyTasks(), is(STANDBY_TASKS));
}
@@ -307,7 +311,7 @@ public class SubscriptionInfoTest {
public void shouldReturnTaskOffsetSumsMapForDecodedSubscription() {
final SubscriptionInfo info = SubscriptionInfo.decode(
new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION,
- LATEST_SUPPORTED_VERSION, processId,
+ LATEST_SUPPORTED_VERSION, UUID_1,
"localhost:80",
TASK_OFFSET_SUMS)
.encode());
@@ -328,7 +332,7 @@ public class SubscriptionInfoTest {
new LegacySubscriptionInfoSerde(
SubscriptionInfo.MIN_VERSION_OFFSET_SUM_SUBSCRIPTION - 1,
LATEST_SUPPORTED_VERSION,
- processId,
+ UUID_1,
ACTIVE_TASKS,
STANDBY_TASKS,
"localhost:80")
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
new file mode 100644
index 0000000..b205b19
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.getMovements;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Test;
+
+public class TaskMovementTest {
+
+ @Test
+ public void shouldGetMovementsFromStateConstrainedToBalancedAssignment() {
+ final int maxWarmupReplicas = Integer.MAX_VALUE;
+ final Map<UUID, List<TaskId>> stateConstrainedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_2)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_0)),
+ mkEntry(UUID_3, asList(TASK_0_2, TASK_1_1))
+ );
+ final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
+ mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
+ );
+ final Queue<TaskMovement> expectedMovements = new LinkedList<>();
+ expectedMovements.add(new TaskMovement(TASK_1_2, UUID_1, UUID_3));
+ expectedMovements.add(new TaskMovement(TASK_1_0, UUID_2, UUID_1));
+ expectedMovements.add(new TaskMovement(TASK_1_1, UUID_3, UUID_2));
+
+ assertThat(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas), equalTo(expectedMovements));
+ }
+
+ @Test
+ public void shouldOnlyGetUpToMaxWarmupReplicaMovements() {
+ final int maxWarmupReplicas = 1;
+ final Map<UUID, List<TaskId>> stateConstrainedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_2)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_0)),
+ mkEntry(UUID_3, asList(TASK_0_2, TASK_1_1))
+ );
+ final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
+ mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
+ );
+ final Queue<TaskMovement> expectedMovements = new LinkedList<>();
+ expectedMovements.add(new TaskMovement(TASK_1_2, UUID_1, UUID_3));
+
+ assertThat(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas), equalTo(expectedMovements));
+ }
+
+ @Test
+ public void shouldReturnEmptyMovementsWhenPassedEmptyTaskAssignments() {
+ final int maxWarmupReplicas = 2;
+ final Map<UUID, List<TaskId>> stateConstrainedAssignment = mkMap(
+ mkEntry(UUID_1, emptyList()),
+ mkEntry(UUID_2, emptyList())
+ );
+ final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
+ mkEntry(UUID_1, emptyList()),
+ mkEntry(UUID_2, emptyList())
+ );
+ assertTrue(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas).isEmpty());
+ }
+
+ @Test
+ public void shouldReturnEmptyMovementsWhenPassedIdenticalTaskAssignments() {
+ final int maxWarmupReplicas = 2;
+ final Map<UUID, List<TaskId>> stateConstrainedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1))
+ );
+ final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1))
+ );
+ assertTrue(getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas).isEmpty());
+ }
+
+ @Test
+ public void shouldThrowIllegalStateExceptionIfAssignmentsAreOfDifferentSize() {
+ final int maxWarmupReplicas = 2;
+
+ final Map<UUID, List<TaskId>> stateConstrainedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_0_1))
+ );
+ final Map<UUID, List<TaskId>> balancedAssignment = mkMap(
+ mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
+ mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1))
+ );
+ assertThrows(IllegalStateException.class, () -> getMovements(stateConstrainedAssignment, balancedAssignment, maxWarmupReplicas));
+ }
+}