You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/11/01 00:15:53 UTC
[kafka] branch 2.4 updated: KAFKA-8972 (2.4 blocker): bug fix for
restoring task (#7617)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 90e4e1e KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)
90e4e1e is described below
commit 90e4e1e61614d1c55c61b56861eac0a1d715485b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Oct 31 17:14:29 2019 -0700
KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)
This is a typo bug which is due to calling a wrong map.
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/AssignedStreamsTasks.java | 2 +-
.../internals/AssignedStreamsTasksTest.java | 143 +++++++++++++++++++++
2 files changed, 144 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 63dac25..161714e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -281,7 +281,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs));
} else if (restoring.containsKey(id)) {
log.debug("Closing the zombie restoring stream task {}.", id);
- firstException.compareAndSet(null, closeRestoring(true, created.get(id), lostTaskChangelogs));
+ firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 68ca9bd..42dc58b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.function.ThrowingRunnable;
@@ -49,6 +50,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
@@ -517,6 +519,147 @@ public class AssignedStreamsTasksTest {
assignedTasks.shutdown(true);
}
+ @Test
+ public void shouldClearZombieCreatedTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ task.close(false, true);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.addNewTask(task);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.created.keySet();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return clearingPartitions;
+ }
+ }.createTaskAndClear();
+ }
+
+ @Test
+ public void shouldClearZombieRestoringTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+ task.closeStateManager(false);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.addTaskToRestoring(task);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.restoringTaskIds();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return clearingPartitions;
+ }
+ }.createTaskAndClear();
+ }
+
+ @Test
+ public void shouldClearZombieRunningTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ task.initializeTopology();
+ EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+ task.close(false, true);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.transitionToRunning(task);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.runningTaskIds();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return clearingPartitions;
+ }
+ }.createTaskAndClear();
+ }
+
+ @Test
+ public void shouldClearZombieSuspendedTasks() {
+ new TaskTestSuite() {
+ @Override
+ public void additionalSetup(final StreamTask task) {
+ task.initializeTopology();
+ EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+ task.suspend();
+ task.closeSuspended(false, null);
+ }
+
+ @Override
+ public void action(final StreamTask task) {
+ assignedTasks.transitionToRunning(task);
+ final List<TopicPartition> revokedChangelogs = new ArrayList<>();
+ final List<TaskId> ids = Collections.singletonList(task.id());
+ assignedTasks.suspendOrCloseTasks(new HashSet<>(ids), revokedChangelogs);
+ assertEquals(clearingPartitions, revokedChangelogs);
+ }
+
+ @Override
+ public Set<TaskId> taskIds() {
+ return assignedTasks.suspendedTaskIds();
+ }
+
+ @Override
+ public List<TopicPartition> expectedLostChangelogs() {
+ return Collections.emptyList();
+ }
+ }.createTaskAndClear();
+ }
+
+ abstract class TaskTestSuite {
+
+ TaskId clearingTaskId = new TaskId(0, 0);
+ List<TopicPartition> clearingPartitions = Collections.singletonList(new TopicPartition("topic", 0));
+
+ abstract void additionalSetup(final StreamTask task);
+
+ abstract void action(final StreamTask task);
+
+ abstract Set<TaskId> taskIds();
+
+ abstract List<TopicPartition> expectedLostChangelogs();
+
+ void createTaskAndClear() {
+ final StreamTask task = EasyMock.createMock(StreamTask.class);
+ EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
+ EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
+ additionalSetup(task);
+ EasyMock.replay(task);
+
+ action(task);
+ final List<TopicPartition> changelogs = new ArrayList<>();
+ final Set<TaskId> ids = new HashSet<>(Collections.singleton(task.id()));
+ assertEquals(ids, taskIds());
+
+ assignedTasks.closeZombieTasks(ids, changelogs);
+ assertEquals(Collections.emptySet(), taskIds());
+ assertEquals(expectedLostChangelogs(), changelogs);
+ }
+ }
+
private void addAndInitTask() {
assignedTasks.addNewTask(t1);
assignedTasks.initializeNewTasks();