You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/25 01:58:30 UTC

[kafka] branch trunk updated: KAFKA-10198: guard against recycling dirty state (#8924)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3348fc4  KAFKA-10198: guard against recycling dirty state (#8924)
3348fc4 is described below

commit 3348fc49d8824155e737b866f633e14684da5fe9
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Wed Jun 24 18:57:38 2020 -0700

    KAFKA-10198: guard against recycling dirty state (#8924)
    
    We just needed to add the dirty-state check that StreamTask#closeClean performs to closeAndRecycleState as well. I also renamed closeAndRecycleState to closeCleanAndRecycleState to drive this point home: a task must be clean (no pending uncommitted data) before its state can be recycled.
    
    This should be cherry-picked back to the 2.6 branch.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/ActiveTaskCreator.java     |  2 +-
 .../streams/processor/internals/StandbyTask.java   |  2 +-
 .../processor/internals/StandbyTaskCreator.java    |  2 +-
 .../streams/processor/internals/StreamTask.java    | 21 +++++++-----
 .../kafka/streams/processor/internals/Task.java    |  2 +-
 .../processor/internals/StandbyTaskTest.java       |  6 ++--
 .../processor/internals/StreamTaskTest.java        | 39 ++++++++++++++++++----
 .../processor/internals/TaskManagerTest.java       |  2 +-
 8 files changed, 54 insertions(+), 22 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 0a1f47e..012ff20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -184,7 +184,7 @@ class ActiveTaskCreator {
         final ProcessorStateManager stateManager = standbyTask.stateMgr;
         final LogContext logContext = getLogContext(standbyTask.id);
 
-        standbyTask.closeAndRecycleState();
+        standbyTask.closeCleanAndRecycleState();
         stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
 
         return createActiveTask(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index ffd09f1..1aa68bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -189,7 +189,7 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public void closeAndRecycleState() {
+    public void closeCleanAndRecycleState() {
         streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         if (state() == State.SUSPENDED) {
             stateMgr.recycle();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 443db8e..b5f2b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -112,7 +112,7 @@ class StandbyTaskCreator {
         final InternalProcessorContext context = streamTask.processorContext();
         final ProcessorStateManager stateManager = streamTask.stateMgr;
 
-        streamTask.closeAndRecycleState();
+        streamTask.closeCleanAndRecycleState();
         stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id()));
 
         return createStandbyTask(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6e8bf40..4b27436 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -463,6 +463,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeClean() {
+        validateClean();
         streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         close(true);
         log.info("Closed clean");
@@ -482,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     }
 
     @Override
-    public void closeAndRecycleState() {
+    public void closeCleanAndRecycleState() {
+        validateClean();
         streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         switch (state()) {
             case SUSPENDED:
@@ -515,17 +517,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         stateMgr.checkpoint(checkpointableOffsets());
     }
 
-    /**
-     * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
-     */
-    private void close(final boolean clean) {
-        if (clean && commitNeeded) {
-            // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
-            // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
+    private void validateClean() {
+        // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
+        // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
+        if (commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                           + " commit and should close as dirty instead");
             throw new TaskMigratedException("Tried to close dirty task as clean");
         }
+    }
+
+    /**
+     * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
+     */
+    private void close(final boolean clean) {
         switch (state()) {
             case SUSPENDED:
                 // first close state manager (which is idempotent) then close the record collector
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 0200870..103c231 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -153,7 +153,7 @@ public interface Task {
     /**
      * Attempt a clean close but do not close the underlying state
      */
-    void closeAndRecycleState();
+    void closeCleanAndRecycleState();
 
     /**
      * Revive a closed task to a created one; should never throw an exception
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 82f33c4..f98d630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -502,13 +502,13 @@ public class StandbyTaskTest {
         EasyMock.replay(stateManager);
 
         task = createStandbyTask();
-        assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
+        assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
 
         task.initializeIfNeeded();
-        assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
+        assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
 
         task.suspend();
-        task.closeAndRecycleState(); // SUSPENDED
+        task.closeCleanAndRecycleState(); // SUSPENDED
 
         // Currently, there are no metrics registered for standby tasks.
         // This is a regression test so that, if we add some, we will be sure to deregister them.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index c6ffe74..9607470 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
@@ -1752,7 +1753,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldUnregisterMetricsInCloseAndRecycle() {
+    public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
@@ -1761,7 +1762,7 @@ public class StreamTaskTest {
 
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
-        task.closeAndRecycleState();
+        task.closeCleanAndRecycleState();
         assertThat(getTaskMetrics(), empty());
     }
 
@@ -1799,22 +1800,48 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldThrowIfCleanClosingDirtyTask() {
+        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task.initializeIfNeeded();
+        task.completeRestoration();
+
+        task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
+        task.process(0L);
+        assertTrue(task.commitNeeded());
+
+        assertThrows(TaskMigratedException.class, () -> task.closeClean());
+    }
+
+    @Test
+    public void shouldThrowIfRecyclingDirtyTask() {
+        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
+        task.initializeIfNeeded();
+        task.completeRestoration();
+
+        task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
+        task.process(0L);
+        assertTrue(task.commitNeeded());
+
+        assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState());
+    }
+
+    @Test
     public void shouldOnlyRecycleSuspendedTasks() {
         stateManager.recycle();
         recordCollector.closeClean();
         EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig(false, "100"), true);
-        assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
+        assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED
 
         task.initializeIfNeeded();
-        assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING
+        assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING
 
         task.completeRestoration();
-        assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
+        assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
 
         task.suspend();
-        task.closeAndRecycleState(); // SUSPENDED
+        task.closeCleanAndRecycleState(); // SUSPENDED
 
         EasyMock.verify(stateManager, recordCollector);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fcfbb1f..23166d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2723,7 +2723,7 @@ public class TaskManagerTest {
         }
 
         @Override
-        public void closeAndRecycleState() {
+        public void closeCleanAndRecycleState() {
             transitionTo(State.CLOSED);
         }