You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/12 23:22:46 UTC

[kafka] branch trunk updated: KAFKA-10144: clean up corrupted standby tasks before attempting a commit (#8849)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03ed08d  KAFKA-10144: clean up corrupted standby tasks before attempting a commit (#8849)
03ed08d is described below

commit 03ed08d0d17a10ca4f96c8cc0a8694834ae01e6d
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Jun 12 16:21:57 2020 -0700

    KAFKA-10144: clean up corrupted standby tasks before attempting a commit (#8849)
    
    We need to make sure that corrupted standby tasks are actually cleaned up upon a TaskCorruptedException. However due to the commit prior to invoking handleCorruption, it's possible to throw a TaskMigratedException before actually cleaning up any of the corrupted tasks.
    
    This is fine for active tasks since handleLostAll will finish up the job, but it does nothing with standby tasks. We should make sure that standby tasks are handled before attempting to commit (which we can do, since we don't need to commit anything for the corrupted standbys)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../streams/processor/internals/StandbyTask.java   |   4 +-
 .../streams/processor/internals/StreamTask.java    |   2 +-
 .../streams/processor/internals/StreamThread.java  |   8 --
 .../streams/processor/internals/TaskManager.java   |  34 ++++++-
 .../integration/StandbyTaskEOSIntegrationTest.java |   5 +-
 .../processor/internals/StreamThreadTest.java      |  78 +--------------
 .../processor/internals/TaskManagerTest.java       | 106 ++++++++++++++++++++-
 8 files changed, 146 insertions(+), 95 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index bd558f2..4e206d8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -214,9 +214,7 @@
               files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/>
 
     <suppress checks="JavaNCSS"
-              files="KStreamKStreamJoinTest.java"/>
-    <suppress checks="JavaNCSS"
-              files="SmokeTestDriver.java"/>
+              files="(KStreamKStreamJoinTest|SmokeTestDriver|TaskManagerTest).java"/>
 
     <suppress checks="NPathComplexity"
               files="EosTestDriver|KStreamKStreamJoinTest.java|RelationalSmokeTest.java|SmokeTestDriver.java|KStreamKStreamLeftJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 9c80fd7..1b069d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -137,7 +137,7 @@ public class StandbyTask extends AbstractTask implements Task {
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
         if (state() == State.RUNNING || state() == State.SUSPENDED) {
             stateMgr.flush();
-            log.info("Task ready for committing");
+            log.debug("Prepared task for committing");
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
         }
@@ -152,7 +152,7 @@ public class StandbyTask extends AbstractTask implements Task {
             // and the state current offset would be used to checkpoint
             stateMgr.checkpoint(Collections.emptyMap());
             offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
-            log.info("Finalized commit");
+            log.debug("Finalized commit");
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3925acc..7f08643 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -443,7 +443,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
 
-        log.debug("Committed");
+        log.debug("Finalized commit");
     }
 
     private Map<TopicPartition, Long> extractPartitionTimes() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3a57cc6..91c483c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -557,14 +557,6 @@ public class StreamThread extends Thread {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                              "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
-                    taskManager.commit(
-                        taskManager.tasks()
-                            .values()
-                            .stream()
-                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                            .collect(Collectors.toSet())
-                    );
                     taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 460796e..689be9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -153,10 +153,38 @@ public class TaskManager {
         rebalanceInProgress = false;
     }
 
-    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
-        for (final Map.Entry<TaskId, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
-            final TaskId taskId = entry.getKey();
+    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) throws TaskMigratedException {
+        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
+        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
+
+        for (final Map.Entry<TaskId, Collection<TopicPartition>> taskEntry : tasksWithChangelogs.entrySet()) {
+            final TaskId taskId = taskEntry.getKey();
             final Task task = tasks.get(taskId);
+            if (task.isActive()) {
+                corruptedActiveTasks.put(task, taskEntry.getValue());
+            } else {
+                corruptedStandbyTasks.put(task, taskEntry.getValue());
+            }
+        }
+
+        // Make sure to clean up any corrupted standby tasks in their entirety before committing
+        // since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
+        closeAndRevive(corruptedStandbyTasks);
+
+        commit(tasks()
+                   .values()
+                   .stream()
+                   .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                   .filter(t -> !tasksWithChangelogs.containsKey(t.id()))
+                   .collect(Collectors.toSet())
+        );
+
+        closeAndRevive(corruptedActiveTasks);
+    }
+
+    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
+        for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
+            final Task task = entry.getKey();
 
             // mark corrupted partitions to not be checkpointed, and then close the task as dirty
             final Collection<TopicPartition> corruptedPartitions = entry.getValue();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 8fa3913..bab5c9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -48,7 +48,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@@ -92,7 +91,7 @@ public class StandbyTaskEOSIntegrationTest {
     }
 
     @Test
-    public void surviveWithOneTaskAsStandby() throws ExecutionException, InterruptedException, IOException {
+    public void surviveWithOneTaskAsStandby() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             inputTopic,
             Collections.singletonList(
@@ -112,8 +111,6 @@ public class StandbyTaskEOSIntegrationTest {
             final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
             final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
         ) {
-
-
             streamInstanceOne.start();
 
             streamInstanceTwo.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 91c3797..6b15cfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -100,7 +100,6 @@ import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
-import static java.util.Collections.singleton;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -1915,7 +1914,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
+    public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final Task task1 = mock(Task.class);
@@ -1932,14 +1931,7 @@ public class StreamThreadTest {
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        expect(taskManager.tasks()).andReturn(mkMap(
-            mkEntry(taskId1, task1),
-            mkEntry(taskId2, task2)
-        )).anyTimes();
-        expect(taskManager.commit(singleton(task2))).andReturn(0);
-
-        taskManager.handleCorruption(singletonMap(taskId1, emptySet()));
-        expectLastCall();
+        taskManager.handleCorruption(corruptedTasksWithChangelogs);
 
         EasyMock.replay(task1, task2, taskManager);
 
@@ -1991,12 +1983,9 @@ public class StreamThreadTest {
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        expect(taskManager.tasks()).andReturn(mkMap(
-            mkEntry(taskId1, task1),
-            mkEntry(taskId2, task2)
-        )).anyTimes();
-        expect(taskManager.commit(singleton(task2))).andThrow(new TaskMigratedException("Task migrated",
-            new RuntimeException("non-corrupted task migrated")));
+        taskManager.handleCorruption(corruptedTasksWithChangelogs);
+        expectLastCall().andThrow(new TaskMigratedException("Task migrated",
+                                                            new RuntimeException("non-corrupted task migrated")));
 
         taskManager.handleLostAll();
         expectLastCall();
@@ -2088,63 +2077,6 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldNotCommitNonRunningNonCorruptedTasksOnTaskCorruptedException() {
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
-        final Task task1 = mock(Task.class);
-        final Task task2 = mock(Task.class);
-        final TaskId taskId1 = new TaskId(0, 0);
-        final TaskId taskId2 = new TaskId(0, 2);
-
-        final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
-            mkEntry(taskId1, emptySet())
-        );
-
-        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
-        expect(task1.id()).andReturn(taskId1).anyTimes();
-        expect(task2.state()).andReturn(Task.State.CREATED).anyTimes();
-        expect(task2.id()).andReturn(taskId2).anyTimes();
-
-        expect(taskManager.tasks()).andReturn(mkMap(
-            mkEntry(taskId1, task1),
-            mkEntry(taskId2, task2)
-        )).anyTimes();
-        // expect not to try and commit task2, even though it's not corrupted, because it's not running.
-        expect(taskManager.commit(emptySet())).andReturn(0);
-
-        EasyMock.replay(task1, task2, taskManager);
-
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
-        final StreamThread thread = new StreamThread(
-            mockTime,
-            config,
-            null,
-            consumer,
-            consumer,
-            null,
-            null,
-            taskManager,
-            streamsMetrics,
-            internalTopologyBuilder,
-            CLIENT_ID,
-            new LogContext(""),
-            new AtomicInteger(),
-            new AtomicLong(Long.MAX_VALUE)
-        ) {
-            @Override
-            void runOnce() {
-                setState(State.PENDING_SHUTDOWN);
-                throw new TaskCorruptedException(corruptedTasksWithChangelogs);
-            }
-        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
-
-        thread.setState(StreamThread.State.STARTING);
-        thread.runLoop();
-
-        verify(taskManager);
-    }
-
-    @Test
     public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() {
         shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8be3c21..76136b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -99,6 +99,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -630,6 +631,108 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
+        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
+        stateManager.markChangelogAsCorrupted(taskId00Partitions);
+        replay(stateManager);
+
+        final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+
+        // `handleAssignment`
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
+            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(), is(true));
+
+        assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
+        nonCorruptedTask.setCommitNeeded();
+
+        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+
+        assertTrue(nonCorruptedTask.commitPrepared);
+    }
+
+    @Test
+    public void shouldNotCommitNonRunningNonCorruptedTasks() {
+        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
+        stateManager.markChangelogAsCorrupted(taskId00Partitions);
+        replay(stateManager);
+
+        final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+
+        nonRunningNonCorruptedTask.setCommitNeeded();
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+
+        // `handleAssignment`
+        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
+            .andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
+        expectLastCall().anyTimes();
+
+        replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
+
+        taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
+
+        verify(activeTaskCreator);
+        assertFalse(nonRunningNonCorruptedTask.commitPrepared);
+    }
+
+    @Test
+    public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
+        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
+        stateManager.markChangelogAsCorrupted(taskId00Partitions);
+        replay(stateManager);
+
+        final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
+        final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+                throw new TaskMigratedException("You dropped out of the group!", new RuntimeException());
+            }
+        };
+
+        // handleAssignment
+        expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
+        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
+
+        taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
+        assertThat(taskManager.tryToCompleteRestoration(), is(true));
+
+        // make sure this will be committed and throw
+        assertThat(runningNonCorruptedActive.state(), is(Task.State.RUNNING));
+        assertThat(corruptedStandby.state(), is(Task.State.RUNNING));
+
+        runningNonCorruptedActive.setCommitNeeded();
+
+        assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
+
+        assertThat(corruptedStandby.state(), is(Task.State.CREATED));
+    }
+
+    @Test
     public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false);
 
@@ -2567,7 +2670,8 @@ public class TaskManagerTest {
     }
 
     private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer,
-                                                   final ChangelogReader changeLogReader, final boolean changeLogUpdateRequired) {
+                                                   final ChangelogReader changeLogReader,
+                                                   final boolean changeLogUpdateRequired) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
         expect(consumer.assignment()).andReturn(assignment);
         consumer.resume(assignment);