You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/01 02:15:49 UTC

[kafka] branch trunk updated: KAFKA-6145: Remove check to reuse previous assignment (#8590)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5217d6  KAFKA-6145: Remove check to reuse previous assignment (#8590)
e5217d6 is described below

commit e5217d6cb5eaebaf13bd4e1f98c21ad174d8afb3
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Fri May 1 04:15:00 2020 +0200

    KAFKA-6145: Remove check to reuse previous assignment (#8590)
    
    Since reusing the previous assignment cannot guarantee that the
    correct number of standby tasks is assigned, and reusing it is
    only a micro-optimization, the check is removed to keep the
    algorithm correct and simple.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <vv...@apache.org>
---
 .../internals/assignment/AssignmentUtils.java      | 41 ------------
 .../assignment/HighAvailabilityTaskAssignor.java   | 77 ----------------------
 .../internals/assignment/TaskMovement.java         | 13 +++-
 .../internals/assignment/AssignmentUtilsTest.java  | 57 ----------------
 .../HighAvailabilityTaskAssignorTest.java          | 20 ------
 5 files changed, 11 insertions(+), 197 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtils.java
deleted file mode 100644
index b88c24b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.streams.processor.TaskId;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-
-final class AssignmentUtils {
-
-    private AssignmentUtils() {}
-
-    /**
-     * @return true if this client is caught-up for this task, or the task has no caught-up clients
-     */
-    static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final TaskId task,
-                                                                  final UUID client,
-                                                                  final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
-        final Set<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
-        return caughtUpClients == null || caughtUpClients.contains(client);
-    }
-
-
-}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 3253c26..5dbf099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -22,7 +22,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -34,7 +33,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
 import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
 import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
 import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
@@ -80,11 +78,6 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
         tasksToCaughtUpClients = tasksToCaughtUpClients(statefulTasksToRankedCandidates);
 
 
-        if (shouldUsePreviousAssignment()) {
-            assignPreviousTasksToClientStates();
-            return false;
-        }
-
         final Map<TaskId, Integer> tasksToRemainingStandbys =
             statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
 
@@ -161,50 +154,6 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
     }
 
     /**
-     * @return true iff all active tasks with caught-up client are assigned to one of them, and all tasks are assigned
-     */
-    boolean previousAssignmentIsValid() {
-        final Set<TaskId> unassignedActiveTasks = new HashSet<>(allTasks);
-        final Map<TaskId, Integer> unassignedStandbyTasks =
-            configs.numStandbyReplicas == 0 ?
-                Collections.emptyMap() :
-                new HashMap<>(statefulTasksToRankedCandidates.keySet().stream()
-                                  .collect(Collectors.toMap(task -> task, task -> configs.numStandbyReplicas)));
-
-        for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
-            final UUID client = clientEntry.getKey();
-            final ClientState state = clientEntry.getValue();
-            final Set<TaskId> prevActiveTasks = state.prevActiveTasks();
-
-            // Verify that this client was caught-up on all stateful active tasks
-            for (final TaskId activeTask : prevActiveTasks) {
-                if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(activeTask, client, tasksToCaughtUpClients)) {
-                    return false;
-                }
-            }
-            if (!unassignedActiveTasks.containsAll(prevActiveTasks)) {
-                return false;
-            }
-            unassignedActiveTasks.removeAll(prevActiveTasks);
-
-            for (final TaskId task : state.prevStandbyTasks()) {
-                final Integer remainingStandbys = unassignedStandbyTasks.get(task);
-                if (remainingStandbys != null) {
-                    if (remainingStandbys == 1) {
-                        unassignedStandbyTasks.remove(task);
-                    } else {
-                        unassignedStandbyTasks.put(task, remainingStandbys - 1);
-                    }
-                } else {
-                    return false;
-                }
-            }
-
-        }
-        return unassignedActiveTasks.isEmpty() && unassignedStandbyTasks.isEmpty();
-    }
-
-    /**
      * Compute the balance factor as the difference in stateful active task count per thread between the most and
      * least loaded clients
      */
@@ -227,30 +176,4 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
 
         return maxActiveStatefulTasksPerThreadCount - minActiveStatefulTasksPerThreadCount;
     }
-
-    /**
-     * Determines whether to use the new proposed assignment or just return the group's previous assignment. The
-     * previous assignment will be chosen and returned iff all of the following are true:
-     *   1) it satisfies the state constraint, ie all tasks with caught up clients are assigned to one of those clients
-     *   2) it satisfies the balance factor
-     *   3) there are no unassigned tasks (eg due to a client that dropped out of the group)
-     *   4) there are no warmup tasks
-     */
-    private boolean shouldUsePreviousAssignment() {
-        if (previousAssignmentIsValid()) {
-            final int previousAssignmentBalanceFactor =
-                computeBalanceFactor(clientStates.values(), statefulTasks);
-            return previousAssignmentBalanceFactor <= configs.balanceFactor;
-        } else {
-            return false;
-        }
-    }
-
-    private void assignPreviousTasksToClientStates() {
-        for (final ClientState clientState : clientStates.values()) {
-            clientState.assignActiveTasks(clientState.prevActiveTasks());
-            clientState.assignStandbyTasks(clientState.prevStandbyTasks());
-        }
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
index 1bc2751..003fa36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -42,6 +41,16 @@ class TaskMovement {
     }
 
     /**
+     * @return true if this client is caught-up for this task, or the task has no caught-up clients
+     */
+    private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final TaskId task,
+                                                                          final UUID client,
+                                                                          final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
+        final Set<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
+        return caughtUpClients == null || caughtUpClients.contains(client);
+    }
+
+    /**
      * @return whether any warmup replicas were assigned
      */
     static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtilsTest.java
deleted file mode 100644
index 0644b50..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentUtilsTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import static java.util.Collections.emptyMap;
-import static org.apache.kafka.common.utils.Utils.mkSortedSet;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
-
-public class AssignmentUtilsTest {
-
-    @Test
-    public void shouldReturnTrueIfTaskHasNoCaughtUpClients() {
-        assertTrue(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(TASK_0_0, UUID_1, emptyMap()));
-    }
-
-    @Test
-    public void shouldReturnTrueIfTaskIsCaughtUpOnClient() {
-        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
-        tasksToCaughtUpClients.put(TASK_0_0, mkSortedSet(UUID_1));
-
-        assertTrue(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(TASK_0_0, UUID_1, tasksToCaughtUpClients));
-    }
-
-    @Test
-    public void shouldReturnFalseIfTaskWasNotCaughtUpOnClientButCaughtUpClientsExist() {
-        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = new HashMap<>();
-        tasksToCaughtUpClients.put(TASK_0_0, mkSortedSet(UUID_2));
-
-        assertFalse(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(TASK_0_0, UUID_1, tasksToCaughtUpClients));
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 17d7c17..d3fcbbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -131,26 +131,6 @@ public class HighAvailabilityTaskAssignorTest {
     }
 
     @Test
-    public void shouldReusePreviousAssignmentIfItIsAlreadyBalanced() {
-        final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1);
-        final Set<TaskId> statefulTasks = mkSet(TASK_0_0);
-        final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 0L), 1);
-        final ClientState client2 =
-            new ClientState(singleton(TASK_0_1), emptySet(), mkMap(mkEntry(TASK_0_0, 0L), mkEntry(TASK_0_1, 0L)), 1);
-        final Map<UUID, ClientState> clientStates = mkMap(
-            mkEntry(UUID_1, client1),
-            mkEntry(UUID_2, client2)
-        );
-
-        final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
-
-        assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_0)));
-        assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_1)));
-        assertThat(probingRebalanceNeeded, is(false));
-    }
-
-    @Test
     public void shouldComputeBalanceFactorAsDifferenceBetweenMostAndLeastLoadedClients() {
         final ClientState client1 = EasyMock.createNiceMock(ClientState.class);
         final ClientState client2 = EasyMock.createNiceMock(ClientState.class);