You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/01 02:15:49 UTC
[kafka] branch trunk updated: KAFKA-6145: Remove check to reuse previous assignment (#8590)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e5217d6 KAFKA-6145: Remove check to reuse previous assignment (#8590)
e5217d6 is described below
commit e5217d6cb5eaebaf13bd4e1f98c21ad174d8afb3
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Fri May 1 04:15:00 2020 +0200
KAFKA-6145: Remove check to reuse previous assignment (#8590)
Since we cannot guarantee that the correct number of standby
tasks is reassigned when the previous assignment is reused, and
reusing it is merely a micro-optimization, the check that reuses
the previous assignment is removed to keep the algorithm correct and simple.
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <vv...@apache.org>
---
.../internals/assignment/AssignmentUtils.java | 41 ------------
.../assignment/HighAvailabilityTaskAssignor.java | 77 ----------------------
.../internals/assignment/TaskMovement.java | 13 +++-
.../internals/assignment/AssignmentUtilsTest.java | 57 ----------------
.../HighAvailabilityTaskAssignorTest.java | 20 ------
5 files changed, 11 insertions(+), 197 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtils.java
deleted file mode 100644
index b88c24b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.streams.processor.TaskId;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-
-final class AssignmentUtils {
-
- private AssignmentUtils() {}
-
- /**
- * @return true if this client is caught-up for this task, or the task has no caught-up clients
- */
- static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final TaskId task,
- final UUID client,
- final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
- final Set<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
- return caughtUpClients == null || caughtUpClients.contains(client);
- }
-
-
-}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 3253c26..5dbf099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -22,7 +22,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -34,7 +33,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
@@ -80,11 +78,6 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
tasksToCaughtUpClients = tasksToCaughtUpClients(statefulTasksToRankedCandidates);
- if (shouldUsePreviousAssignment()) {
- assignPreviousTasksToClientStates();
- return false;
- }
-
final Map<TaskId, Integer> tasksToRemainingStandbys =
statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
@@ -161,50 +154,6 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
}
/**
- * @return true iff all active tasks with caught-up client are assigned to one of them, and all tasks are assigned
- */
- boolean previousAssignmentIsValid() {
- final Set<TaskId> unassignedActiveTasks = new HashSet<>(allTasks);
- final Map<TaskId, Integer> unassignedStandbyTasks =
- configs.numStandbyReplicas == 0 ?
- Collections.emptyMap() :
- new HashMap<>(statefulTasksToRankedCandidates.keySet().stream()
- .collect(Collectors.toMap(task -> task, task -> configs.numStandbyReplicas)));
-
- for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
- final UUID client = clientEntry.getKey();
- final ClientState state = clientEntry.getValue();
- final Set<TaskId> prevActiveTasks = state.prevActiveTasks();
-
- // Verify that this client was caught-up on all stateful active tasks
- for (final TaskId activeTask : prevActiveTasks) {
- if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(activeTask, client, tasksToCaughtUpClients)) {
- return false;
- }
- }
- if (!unassignedActiveTasks.containsAll(prevActiveTasks)) {
- return false;
- }
- unassignedActiveTasks.removeAll(prevActiveTasks);
-
- for (final TaskId task : state.prevStandbyTasks()) {
- final Integer remainingStandbys = unassignedStandbyTasks.get(task);
- if (remainingStandbys != null) {
- if (remainingStandbys == 1) {
- unassignedStandbyTasks.remove(task);
- } else {
- unassignedStandbyTasks.put(task, remainingStandbys - 1);
- }
- } else {
- return false;
- }
- }
-
- }
- return unassignedActiveTasks.isEmpty() && unassignedStandbyTasks.isEmpty();
- }
-
- /**
* Compute the balance factor as the difference in stateful active task count per thread between the most and
* least loaded clients
*/
@@ -227,30 +176,4 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
return maxActiveStatefulTasksPerThreadCount - minActiveStatefulTasksPerThreadCount;
}
-
- /**
- * Determines whether to use the new proposed assignment or just return the group's previous assignment. The
- * previous assignment will be chosen and returned iff all of the following are true:
- * 1) it satisfies the state constraint, ie all tasks with caught up clients are assigned to one of those clients
- * 2) it satisfies the balance factor
- * 3) there are no unassigned tasks (eg due to a client that dropped out of the group)
- * 4) there are no warmup tasks
- */
- private boolean shouldUsePreviousAssignment() {
- if (previousAssignmentIsValid()) {
- final int previousAssignmentBalanceFactor =
- computeBalanceFactor(clientStates.values(), statefulTasks);
- return previousAssignmentBalanceFactor <= configs.balanceFactor;
- } else {
- return false;
- }
- }
-
- private void assignPreviousTasksToClientStates() {
- for (final ClientState clientState : clientStates.values()) {
- clientState.assignActiveTasks(clientState.prevActiveTasks());
- clientState.assignStandbyTasks(clientState.prevStandbyTasks());
- }
- }
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
index 1bc2751..003fa36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
@@ -42,6 +41,16 @@ class TaskMovement {
}
/**
+ * @return true if this client is caught-up for this task, or the task has no caught-up clients
+ */
+ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final TaskId task,
+ final UUID client,
+ final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
+ final Set<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
+ return caughtUpClients == null || caughtUpClients.contains(client);
+ }
+
+ /**
* @return whether any warmup replicas were assigned
*/
static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtilsTest.java
deleted file mode 100644
index 0644b50..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtilsTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import static java.util.Collections.emptyMap;
-import static org.apache.kafka.common.utils.Utils.mkSortedSet;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
-
-public class AssignmentUtilsTest {
-
- @Test
- public void shouldReturnTrueIfTaskHasNoCaughtUpClients() {
- assertTrue(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(TASK_0_0, UUID_1, emptyMap()));
- }
-
- @Test
- public void shouldReturnTrueIfTaskIsCaughtUpOnClient() {
- final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
- tasksToCaughtUpClients.put(TASK_0_0, mkSortedSet(UUID_1));
-
- assertTrue(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(TASK_0_0, UUID_1, tasksToCaughtUpClients));
- }
-
- @Test
- public void shouldReturnFalseIfTaskWasNotCaughtUpOnClientButCaughtUpClientsExist() {
- final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
- tasksToCaughtUpClients.put(TASK_0_0, mkSortedSet(UUID_2));
-
- assertFalse(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(TASK_0_0, UUID_1, tasksToCaughtUpClients));
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 17d7c17..d3fcbbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -131,26 +131,6 @@ public class HighAvailabilityTaskAssignorTest {
}
@Test
- public void shouldReusePreviousAssignmentIfItIsAlreadyBalanced() {
- final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1);
- final Set<TaskId> statefulTasks = mkSet(TASK_0_0);
- final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 0L), 1);
- final ClientState client2 =
- new ClientState(singleton(TASK_0_1), emptySet(), mkMap(mkEntry(TASK_0_0, 0L), mkEntry(TASK_0_1, 0L)), 1);
- final Map<UUID, ClientState> clientStates = mkMap(
- mkEntry(UUID_1, client1),
- mkEntry(UUID_2, client2)
- );
-
- final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
-
- assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_0)));
- assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_1)));
- assertThat(probingRebalanceNeeded, is(false));
- }
-
- @Test
public void shouldComputeBalanceFactorAsDifferenceBetweenMostAndLeastLoadedClients() {
final ClientState client1 = EasyMock.createNiceMock(ClientState.class);
final ClientState client2 = EasyMock.createNiceMock(ClientState.class);