You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/09/02 10:16:43 UTC

[kafka] branch 3.3 updated: HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 387bcf4d80 HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585)
387bcf4d80 is described below

commit 387bcf4d80779270a99be9e056ddc0ae3a369044
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Sep 2 03:14:34 2022 -0700

    HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585)
    
    Based on a patch submitted to the confluentinc fork & then abandoned. Needed some updates and minor expansion but more or less just re-applied the changes proposed in confluentinc#697.
    
    Original PR has a very detailed justification for these changes but the tl;dr of it is that apparently the PriorityQueue's iterator does not actually guarantee to return elements in priority order.
    
    Reviewer: Luke Chen <sh...@gmail.com>
---
 .../streams/processor/internals/assignment/TaskMovement.java  |  6 ++++--
 .../processor/internals/assignment/TaskMovementTest.java      | 11 +++--------
 2 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
index 38e64276ba..ec0fa5e11e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -107,7 +107,8 @@ final class TaskMovement {
 
         final int movementsNeeded = taskMovements.size();
 
-        for (final TaskMovement movement : taskMovements) {
+        while (!taskMovements.isEmpty()) {
+            final TaskMovement movement = taskMovements.poll();
             // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
             // caught up client.
             final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
@@ -157,7 +158,8 @@ final class TaskMovement {
 
         int movementsNeeded = 0;
 
-        for (final TaskMovement movement : taskMovements) {
+        while (!taskMovements.isEmpty()) {
+            final TaskMovement movement = taskMovements.poll();
             final Function<UUID, Boolean> eligibleClientPredicate =
                     clientId -> !clientStates.get(clientId).hasAssignedTask(movement.task);
             UUID sourceClient = caughtUpClientsByTaskLoad.poll(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
index baf6d18496..a337deb801 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
@@ -244,15 +244,10 @@ public class TaskMovementTest {
         assertThat(client2, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_2)));
         assertThat(client3, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_1)));
 
-        // we should only assign one warmup, but it could be either one that needs to be migrated.
+        // we should only assign one warmup, and the task movement should have the highest priority
         assertThat(client1, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
-        try {
-            assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_1)));
-            assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
-        } catch (final AssertionError ignored) {
-            assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
-            assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_2)));
-        }
+        assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_1)));
+        assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
     }
 
     @Test