You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/09/02 10:16:43 UTC
[kafka] branch 3.3 updated: HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 387bcf4d80 HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585)
387bcf4d80 is described below
commit 387bcf4d80779270a99be9e056ddc0ae3a369044
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Sep 2 03:14:34 2022 -0700
HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585)
Based on a patch that was submitted to the confluentinc fork and then abandoned. It needed some updates and a minor expansion, but this commit more or less just re-applies the changes proposed in confluentinc#697.
The original PR has a very detailed justification for these changes, but the tl;dr is that the PriorityQueue's iterator is not guaranteed to return elements in priority order, so the queue has to be drained with poll() in order to process the task movements by priority.
Reviewer: Luke Chen <sh...@gmail.com>
---
.../streams/processor/internals/assignment/TaskMovement.java | 6 ++++--
.../processor/internals/assignment/TaskMovementTest.java | 11 +++--------
2 files changed, 7 insertions(+), 10 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
index 38e64276ba..ec0fa5e11e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
@@ -107,7 +107,8 @@ final class TaskMovement {
final int movementsNeeded = taskMovements.size();
- for (final TaskMovement movement : taskMovements) {
+ while (!taskMovements.isEmpty()) {
+ final TaskMovement movement = taskMovements.poll();
// Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
// caught up client.
final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
@@ -157,7 +158,8 @@ final class TaskMovement {
int movementsNeeded = 0;
- for (final TaskMovement movement : taskMovements) {
+ while (!taskMovements.isEmpty()) {
+ final TaskMovement movement = taskMovements.poll();
final Function<UUID, Boolean> eligibleClientPredicate =
clientId -> !clientStates.get(clientId).hasAssignedTask(movement.task);
UUID sourceClient = caughtUpClientsByTaskLoad.poll(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
index baf6d18496..a337deb801 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
@@ -244,15 +244,10 @@ public class TaskMovementTest {
assertThat(client2, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_2)));
assertThat(client3, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_1)));
- // we should only assign one warmup, but it could be either one that needs to be migrated.
+ // we should only assign one warmup, and the task movement should have the highest priority
assertThat(client1, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
- try {
- assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_1)));
- assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
- } catch (final AssertionError ignored) {
- assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
- assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_2)));
- }
+ assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_1)));
+ assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet()));
}
@Test