You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/08/17 18:13:40 UTC
[kafka] branch trunk updated: KAFKA-10199: Remove tasks from state updater on revocation (#12520)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation (#12520)
b47c4d8598 is described below
commit b47c4d859805068de6a8fe8de3bda5e7a21132e2
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Aug 17 20:13:34 2022 +0200
KAFKA-10199: Remove tasks from state updater on revocation (#12520)
Removes active tasks from the state updater when all of their input partitions are revoked during a rebalance, and marks the removed tasks as pending to be closed.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../streams/processor/internals/TaskManager.java | 16 +++++
.../processor/internals/TaskManagerTest.java | 81 ++++++++++++++++++++++
2 files changed, 97 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bba28a3f3..bab05a5184 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -757,6 +757,8 @@ public class TaskManager {
}
}
+ removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
+
if (!remainingRevokedPartitions.isEmpty()) {
log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
+ "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
@@ -842,6 +844,20 @@ public class TaskManager {
}
}
+ /**
+ * Pulls restoring active tasks out of the state updater when every one of their input partitions
+ * is contained in {@code remainingRevokedPartitions}. Each such task is registered as pending to
+ * close, removal is requested from the state updater, and its input partitions are dropped from
+ * {@code remainingRevokedPartitions} so the caller only sees partitions no task accounted for.
+ * Standby tasks are never touched. Does nothing when {@code stateUpdater} is null.
+ *
+ * @param remainingRevokedPartitions revoked partitions not yet matched to a task; mutated in place
+ */
+ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
+ if (stateUpdater == null) {
+ return;
+ }
+ for (final Task candidate : stateUpdater.getTasks()) {
+ if (!candidate.isActive()) {
+ continue; // standby tasks stay in the state updater
+ }
+ if (!remainingRevokedPartitions.containsAll(candidate.inputPartitions())) {
+ continue; // only some (or none) of this task's input partitions are revoked
+ }
+ tasks.addPendingTaskToClose(candidate.id());
+ stateUpdater.remove(candidate.id());
+ // These partitions are now accounted for; the caller logs whatever is still left over.
+ remainingRevokedPartitions.removeAll(candidate.inputPartitions());
+ }
+ }
+
private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
for (final Task task : tasksToPrepare) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 133541bfac..ff52ad5ae9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -751,6 +751,87 @@ public class TaskManagerTest {
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
+ @Test
+ public void shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+ // A restoring active stateful task whose input partitions will all be revoked.
+ final StreamTask restoringTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ final TaskManager taskManager = setupForRevocation(mkSet(restoringTask), mkSet(restoringTask));
+
+ // Revocation must request removal of the task from the state updater ...
+ taskManager.handleRevocation(taskId00Partitions);
+ Mockito.verify(stateUpdater).remove(restoringTask.id());
+
+ // ... and, with the task stubbed as drained from the state updater, it must be closed cleanly.
+ taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+ Mockito.verify(restoringTask).closeClean();
+ }
+
+ @Test
+ public void shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+ // Two restoring active tasks whose input partitions are all revoked in one call must both be removed and closed.
+ final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId01Partitions).build();
+ final TaskManager taskManager = setupForRevocation(mkSet(task1, task2), mkSet(task1, task2));
+
+ taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
+
+ Mockito.verify(stateUpdater).remove(task1.id());
+ Mockito.verify(stateUpdater).remove(task2.id());
+
+ taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+ Mockito.verify(task1).closeClean();
+ Mockito.verify(task2).closeClean();
+ }
+
+ @Test
+ public void shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+ // The restoring task owns taskId00's partitions, but only taskId01's partitions get revoked.
+ final StreamTask unaffectedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ final TaskManager taskManager = setupForRevocation(mkSet(unaffectedTask), Collections.emptySet());
+
+ taskManager.handleRevocation(taskId01Partitions);
+ Mockito.verify(stateUpdater, never()).remove(unaffectedTask.id());
+
+ // Nothing is stubbed as drained from the state updater, so the task must remain open.
+ taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+ Mockito.verify(unaffectedTask, never()).closeClean();
+ }
+
+ @Test
+ public void shouldNotRemoveStandbyTaskFromStateUpdaterOnRevocation() {
+ // A standby task whose input partitions are revoked must still be left in the state updater.
+ final StandbyTask standby = standbyTask(taskId00, taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ final TaskManager taskManager = setupForRevocation(mkSet(standby), Collections.emptySet());
+
+ taskManager.handleRevocation(taskId00Partitions);
+ Mockito.verify(stateUpdater, never()).remove(standby.id());
+
+ // No removal is stubbed as drained, so the standby must not be closed either.
+ taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+ Mockito.verify(standby, never()).closeClean();
+ }
+
+ /**
+ * Builds an at-least-once task manager via {@code setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true)}
+ * and prepares the mocks used by the revocation tests.
+ *
+ * @param tasksInUpdater tasks the mocked state updater reports from {@code getTasks()}
+ * @param drainedRemovedTasks tasks the mocked state updater reports from {@code drainRemovedTasks()}
+ * @return the task manager under test
+ */
+ private TaskManager setupForRevocation(final Set<Task> tasksInUpdater,
+ final Set<Task> drainedRemovedTasks) {
+ final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
+ // Mockito stubs on the state updater.
+ when(stateUpdater.getTasks()).thenReturn(tasksInUpdater);
+ when(stateUpdater.drainRemovedTasks()).thenReturn(drainedRemovedTasks);
+ // EasyMock expectations on the consumer; they must be recorded before replay(consumer).
+ expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+ consumer.resume(anyObject());
+ expectLastCall().anyTimes();
+ replay(consumer);
+ return manager;
+ }
+
@Test
public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);