You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/11/01 00:15:53 UTC

[kafka] branch 2.4 updated: KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 90e4e1e  KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)
90e4e1e is described below

commit 90e4e1e61614d1c55c61b56861eac0a1d715485b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Oct 31 17:14:29 2019 -0700

    KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)
    
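    This fixes a typo bug: when closing a zombie restoring task, the task was
    looked up in the wrong map (`created` instead of `restoring`). Unit tests
    are added that cover closing zombie tasks in the created, restoring,
    running and suspended states.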
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/AssignedStreamsTasks.java  |   2 +-
 .../internals/AssignedStreamsTasksTest.java        | 143 +++++++++++++++++++++
 2 files changed, 144 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 63dac25..161714e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -281,7 +281,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
                 firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs));
             } else if (restoring.containsKey(id)) {
                 log.debug("Closing the zombie restoring stream task {}.", id);
-                firstException.compareAndSet(null, closeRestoring(true, created.get(id), lostTaskChangelogs));
+                firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), lostTaskChangelogs));
             } else if (running.containsKey(id)) {
                 log.debug("Closing the zombie running stream task {}.", id);
                 firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 68ca9bd..42dc58b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.junit.function.ThrowingRunnable;
@@ -49,6 +50,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
@@ -517,6 +519,147 @@ public class AssignedStreamsTasksTest {
         assignedTasks.shutdown(true);
     }
 
+    @Test
+    public void shouldClearZombieCreatedTasks() {
+        new TaskTestSuite() {
+            @Override
+            public void additionalSetup(final StreamTask task) {
+                task.close(false, true);
+            }
+
+            @Override
+            public void action(final StreamTask task) {
+                assignedTasks.addNewTask(task);
+            }
+
+            @Override
+            public Set<TaskId> taskIds() {
+                return assignedTasks.created.keySet();
+            }
+
+            @Override
+            public List<TopicPartition> expectedLostChangelogs() {
+                return clearingPartitions;
+            }
+        }.createTaskAndClear();
+    }
+
+    @Test
+    public void shouldClearZombieRestoringTasks() {
+        new TaskTestSuite() {
+            @Override
+            public void additionalSetup(final StreamTask task) {
+                EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+                task.closeStateManager(false);
+            }
+
+            @Override
+            public void action(final StreamTask task) {
+                assignedTasks.addTaskToRestoring(task);
+            }
+
+            @Override
+            public Set<TaskId> taskIds() {
+                return assignedTasks.restoringTaskIds();
+            }
+
+            @Override
+            public List<TopicPartition> expectedLostChangelogs() {
+                return clearingPartitions;
+            }
+        }.createTaskAndClear();
+    }
+
+    @Test
+    public void shouldClearZombieRunningTasks() {
+        new TaskTestSuite() {
+            @Override
+            public void additionalSetup(final StreamTask task) {
+                task.initializeTopology();
+                EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+                task.close(false, true);
+            }
+
+            @Override
+            public void action(final StreamTask task) {
+                assignedTasks.transitionToRunning(task);
+            }
+
+            @Override
+            public Set<TaskId> taskIds() {
+                return assignedTasks.runningTaskIds();
+            }
+
+            @Override
+            public List<TopicPartition> expectedLostChangelogs() {
+                return clearingPartitions;
+            }
+        }.createTaskAndClear();
+    }
+
+    @Test
+    public void shouldClearZombieSuspendedTasks() {
+        new TaskTestSuite() {
+            @Override
+            public void additionalSetup(final StreamTask task) {
+                task.initializeTopology();
+                EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
+                task.suspend();
+                task.closeSuspended(false, null);
+            }
+
+            @Override
+            public void action(final StreamTask task) {
+                assignedTasks.transitionToRunning(task);
+                final List<TopicPartition> revokedChangelogs = new ArrayList<>();
+                final List<TaskId> ids = Collections.singletonList(task.id());
+                assignedTasks.suspendOrCloseTasks(new HashSet<>(ids), revokedChangelogs);
+                assertEquals(clearingPartitions, revokedChangelogs);
+            }
+
+            @Override
+            public Set<TaskId> taskIds() {
+                return assignedTasks.suspendedTaskIds();
+            }
+
+            @Override
+            public List<TopicPartition> expectedLostChangelogs() {
+                return Collections.emptyList();
+            }
+        }.createTaskAndClear();
+    }
+
+    abstract class TaskTestSuite {
+
+        TaskId clearingTaskId = new TaskId(0, 0);
+        List<TopicPartition> clearingPartitions = Collections.singletonList(new TopicPartition("topic", 0));
+
+        abstract void additionalSetup(final StreamTask task);
+
+        abstract void action(final StreamTask task);
+
+        abstract Set<TaskId> taskIds();
+
+        abstract List<TopicPartition> expectedLostChangelogs();
+
+        void createTaskAndClear() {
+            final StreamTask task = EasyMock.createMock(StreamTask.class);
+            EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
+            EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
+            additionalSetup(task);
+            EasyMock.replay(task);
+
+            action(task);
+            final List<TopicPartition> changelogs = new ArrayList<>();
+            final Set<TaskId> ids = new HashSet<>(Collections.singleton(task.id()));
+            assertEquals(ids, taskIds());
+
+            assignedTasks.closeZombieTasks(ids, changelogs);
+            assertEquals(Collections.emptySet(), taskIds());
+            assertEquals(expectedLostChangelogs(), changelogs);
+        }
+    }
+
     private void addAndInitTask() {
         assignedTasks.addNewTask(t1);
         assignedTasks.initializeNewTasks();