Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/07 00:34:48 UTC

[kafka] branch trunk updated: KAFKA-10097: Internalize checkpoint data (#8820)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d4ef46c  KAFKA-10097: Internalize checkpoint data (#8820)
d4ef46c is described below

commit d4ef46c69dbc525f55a6d9ae5c9ae6da795f98fb
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sat Jun 6 17:34:03 2020 -0700

    KAFKA-10097: Internalize checkpoint data (#8820)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/processor/internals/StandbyTask.java   |  8 +---
 .../streams/processor/internals/StreamTask.java    | 54 ++++++++++++----------
 .../kafka/streams/processor/internals/Task.java    |  4 +-
 .../streams/processor/internals/TaskManager.java   | 33 ++++++-------
 .../processor/internals/StandbyTaskTest.java       |  9 ++--
 .../processor/internals/StreamTaskTest.java        | 26 +++++------
 .../processor/internals/TaskManagerTest.java       | 28 ++++++-----
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +-
 8 files changed, 80 insertions(+), 86 deletions(-)
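
For reference, the net effect on callers of the Task close API, condensed from the TaskManager and TopologyTestDriver hunks below (illustrative snippet only, not part of the patch):

    // Before this change: the checkpoint map was returned by prepareCloseClean()
    // and had to be passed back into closeClean().
    final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
    task.closeClean(checkpoint);

    // After this change: the task tracks its checkpoint internally, so a clean
    // close is just the two no-argument calls.
    task.prepareCloseClean();
    task.closeClean();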

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 8cba911..41598d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -153,12 +152,10 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public Map<TopicPartition, Long> prepareCloseClean() {
+    public void prepareCloseClean() {
         prepareClose(true);
 
         log.info("Prepared clean close");
-
-        return Collections.emptyMap();
     }
 
     @Override
@@ -199,8 +196,7 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public void closeClean(final Map<TopicPartition, Long> checkpoint) {
-        Objects.requireNonNull(checkpoint);
+    public void closeClean() {
         close(true);
 
         log.info("Closed clean");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d045144..b23a1a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -107,6 +107,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
 
+    private Map<TopicPartition, Long> checkpoint = null;
+
     public StreamTask(final TaskId id,
                       final Set<TopicPartition> partitions,
                       final ProcessorTopology topology,
@@ -465,17 +467,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     }
 
     @Override
-    public Map<TopicPartition, Long> prepareCloseClean() {
-        final Map<TopicPartition, Long> checkpoint = prepareClose(true);
+    public void prepareCloseClean() {
+        prepareClose(true);
 
         log.info("Prepared clean close");
-
-        return checkpoint;
     }
 
     @Override
-    public void closeClean(final Map<TopicPartition, Long> checkpoint) {
-        close(true, checkpoint);
+    public void closeClean() {
+        close(true);
 
         log.info("Closed clean");
     }
@@ -489,7 +489,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeDirty() {
-        close(false, null);
+
+        close(false);
 
         log.info("Closed dirty");
     }
@@ -505,11 +506,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeAndRecycleState() {
-        final Map<TopicPartition, Long> checkpoint = prepareClose(true);
+        prepareClose(true);
+
+        writeCheckpointIfNeed();
 
-        if (checkpoint != null) {
-            stateMgr.checkpoint(checkpoint);
-        }
         switch (state()) {
             case CREATED:
             case RUNNING:
@@ -546,14 +546,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      *                 otherwise, just close open resources
      * @throws TaskMigratedException if the task producer got fenced (EOS)
      */
-    private Map<TopicPartition, Long> prepareClose(final boolean clean) {
-        final Map<TopicPartition, Long> checkpoint;
+    private void prepareClose(final boolean clean) {
+        // Reset any previously scheduled checkpoint.
+        checkpoint = null;
 
         switch (state()) {
             case CREATED:
                 // the task is created and not initialized, just re-write the checkpoint file
-                checkpoint = Collections.emptyMap();
-
+                scheduleCheckpoint(emptyMap());
                 break;
 
             case RUNNING:
@@ -562,9 +562,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 if (clean) {
                     stateMgr.flush();
                     recordCollector.flush();
-                    checkpoint = checkpointableOffsets();
+                    scheduleCheckpoint(checkpointableOffsets());
                 } else {
-                    checkpoint = null; // `null` indicates to not write a checkpoint
                     executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
                 }
 
@@ -572,22 +571,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             case RESTORING:
                 executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-                checkpoint = Collections.emptyMap();
+                scheduleCheckpoint(emptyMap());
 
                 break;
 
             case SUSPENDED:
             case CLOSED:
                 // not need to checkpoint, since when suspending we've already committed the state
-                checkpoint = null; // `null` indicates to not write a checkpoint
-
                 break;
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id);
         }
+    }
+
+    private void scheduleCheckpoint(final Map<TopicPartition, Long> checkpoint) {
+        this.checkpoint = checkpoint;
+    }
 
-        return checkpoint;
+    private void writeCheckpointIfNeed() {
+        if (checkpoint != null) {
+            stateMgr.checkpoint(checkpoint);
+            checkpoint = null;
+        }
     }
 
     /**
@@ -598,9 +604,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      *  3. finally release the state manager lock
      * </pre>
      */
-    private void close(final boolean clean, final Map<TopicPartition, Long> checkpoint) {
-        if (clean && checkpoint != null) {
-            executeAndMaybeSwallow(clean, () -> stateMgr.checkpoint(checkpoint), "state manager checkpoint", log);
+    private void close(final boolean clean) {
+        if (clean) {
+            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
         }
 
         switch (state()) {
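
For reference, the checkpoint handling that StreamTask now keeps internal, condensed from the hunks above (illustrative snippet only; the surrounding class and state-machine handling are omitted):

    // `null` means: do not write a checkpoint (as before, when prepareClose() returned null).
    private Map<TopicPartition, Long> checkpoint = null;

    // prepareClose() schedules a checkpoint instead of returning it to the caller ...
    private void scheduleCheckpoint(final Map<TopicPartition, Long> checkpoint) {
        this.checkpoint = checkpoint;
    }

    // ... and close(true) / closeAndRecycleState() write it via this helper.
    private void writeCheckpointIfNeed() {
        if (checkpoint != null) {
            stateMgr.checkpoint(checkpoint);
            checkpoint = null;
        }
    }
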
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index eee290a..9283e86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -145,12 +145,12 @@ public interface Task {
      *
      * @throws StreamsException fatal error, should close the thread
      */
-    Map<TopicPartition, Long> prepareCloseClean();
+    void prepareCloseClean();
 
     /**
      * Must be idempotent.
      */
-    void closeClean(final Map<TopicPartition, Long> checkpoint);
+    void closeClean();
 
     /**
      * Prepare to close a task that we may not own. Discard any uncommitted progress and close the task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5c55093..12361d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -190,7 +190,7 @@ public class TaskManager {
         // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Map<Task, Map<TopicPartition, Long>> checkpointPerTask = new HashMap<>();
+        final Set<Task> tasksToClose = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
         final Set<Task> additionalTasksForCommitting = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
@@ -210,11 +210,11 @@ public class TaskManager {
                 tasksToRecycle.add(task);
             } else {
                 try {
-                    final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
+                    task.prepareCloseClean();
                     final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task
                         .committableOffsetsAndMetadata();
 
-                    checkpointPerTask.put(task, checkpoint);
+                    tasksToClose.add(task);
                     if (!committableOffsets.isEmpty()) {
                         consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                     }
@@ -250,20 +250,17 @@ public class TaskManager {
                 log.error("Failed to batch commit tasks, " +
                     "will close all tasks involved in this commit as dirty by the end", e);
                 dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(checkpointPerTask.keySet());
+                dirtyTasks.addAll(tasksToClose);
 
-                checkpointPerTask.clear();
+                tasksToClose.clear();
                 // Just add first taskId to re-throw by the end.
                 taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
             }
         }
 
-        for (final Map.Entry<Task, Map<TopicPartition, Long>> taskAndCheckpoint : checkpointPerTask.entrySet()) {
-            final Task task = taskAndCheckpoint.getKey();
-            final Map<TopicPartition, Long> checkpoint = taskAndCheckpoint.getValue();
-
+        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task, checkpoint);
+                completeTaskCloseClean(task);
                 cleanUpTaskProducer(task, taskCloseExceptions);
                 tasks.remove(task.id());
             } catch (final RuntimeException e) {
@@ -631,9 +628,9 @@ public class TaskManager {
         task.closeDirty();
     }
 
-    private void completeTaskCloseClean(final Task task, final Map<TopicPartition, Long> checkpoint) {
+    private void completeTaskCloseClean(final Task task) {
         cleanupTask(task);
-        task.closeClean(checkpoint);
+        task.closeClean();
     }
 
     // Note: this MUST be called *before* actually closing the task
@@ -652,16 +649,16 @@ public class TaskManager {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Map<Task, Map<TopicPartition, Long>> checkpointPerTask = new HashMap<>();
+        final Set<Task> tasksToClose = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
         for (final Task task : tasks.values()) {
             if (clean) {
                 try {
-                    final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
+                    task.prepareCloseClean();
                     final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.committableOffsetsAndMetadata();
 
-                    checkpointPerTask.put(task, checkpoint);
+                    tasksToClose.add(task);
                     if (!committableOffsets.isEmpty()) {
                         consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                     }
@@ -681,11 +678,9 @@ public class TaskManager {
             commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
         }
 
-        for (final Map.Entry<Task, Map<TopicPartition, Long>> taskAndCheckpoint : checkpointPerTask.entrySet()) {
-            final Task task = taskAndCheckpoint.getKey();
-            final Map<TopicPartition, Long> checkpoint = taskAndCheckpoint.getValue();
+        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task, checkpoint);
+                completeTaskCloseClean(task);
             } catch (final RuntimeException e) {
                 firstException.compareAndSet(null, e);
                 closeTaskDirty(task);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f868de4..04ef951 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -52,7 +52,6 @@ import org.junit.runner.RunWith;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Map;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -270,8 +269,8 @@ public class StandbyTaskTest {
 
         task = createStandbyTask();
         task.initializeIfNeeded();
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
@@ -323,8 +322,8 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
 
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        assertThrows(RuntimeException.class, () -> task.closeClean(checkpoint));
+        task.prepareCloseClean();
+        assertThrows(RuntimeException.class, () -> task.closeClean());
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a4431a2..27135f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1567,8 +1567,8 @@ public class StreamTaskTest {
 
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
 
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
         assertFalse(source1.initialized);
@@ -1642,8 +1642,8 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
 
         task.initializeIfNeeded();
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
@@ -1668,8 +1668,8 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
@@ -1696,8 +1696,8 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration();
 
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        assertThrows(ProcessorStateException.class, () -> task.closeClean(checkpoint));
+        task.prepareCloseClean();
+        assertThrows(ProcessorStateException.class, () -> task.closeClean());
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
@@ -1760,8 +1760,8 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
         task.initializeIfNeeded();
 
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        assertThrows(ProcessorStateException.class, () -> task.closeClean(checkpoint));
+        task.prepareCloseClean();
+        assertThrows(ProcessorStateException.class, () -> task.closeClean());
 
         assertEquals(Task.State.RESTORING, task.state());
 
@@ -1795,11 +1795,11 @@ public class StreamTaskTest {
 
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
 
-        final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         // close calls are idempotent since we are already in closed
-        task.closeClean(checkpoint);
+        task.closeClean();
         task.closeDirty();
 
         EasyMock.reset(stateManager);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7f31d7e..200d841 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -604,7 +604,7 @@ public class TaskManagerTest {
 
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new RuntimeException("oops");
             }
         };
@@ -1115,7 +1115,7 @@ public class TaskManagerTest {
         final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
         final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("migrated", new RuntimeException("cause"));
             }
 
@@ -1133,7 +1133,7 @@ public class TaskManagerTest {
         };
         final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new RuntimeException("oops");
             }
 
@@ -1451,13 +1451,13 @@ public class TaskManagerTest {
         };
         final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("migrated", new RuntimeException("cause"));
             }
         };
         final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new RuntimeException("oops");
             }
         };
@@ -2273,14 +2273,14 @@ public class TaskManagerTest {
     public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
         final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t1 close exception", new RuntimeException());
             }
         };
 
         final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t2 close exception", new RuntimeException());
             }
         };
@@ -2303,14 +2303,14 @@ public class TaskManagerTest {
     public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
         final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t1 close exception", new RuntimeException());
             }
         };
 
         final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
             }
         };
@@ -2332,14 +2332,14 @@ public class TaskManagerTest {
     public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
         final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t1 close exception", new RuntimeException());
             }
         };
 
         final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
             @Override
-            public Map<TopicPartition, Long> prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new KafkaException("Kaboom for t2!", new RuntimeException());
             }
         };
@@ -2703,15 +2703,13 @@ public class TaskManagerTest {
         }
 
         @Override
-        public Map<TopicPartition, Long> prepareCloseClean() {
-            return Collections.emptyMap();
-        }
+        public void prepareCloseClean() {}
 
         @Override
         public void prepareCloseDirty() {}
 
         @Override
-        public void closeClean(final Map<TopicPartition, Long> checkpoint) {
+        public void closeClean() {
             transitionTo(State.CLOSED);
         }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index db31012..3416ec0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1180,8 +1180,8 @@ public class TopologyTestDriver implements Closeable {
      */
     public void close() {
         if (task != null) {
-            final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
-            task.closeClean(checkpoint);
+            task.prepareCloseClean();
+            task.closeClean();
         }
         if (globalStateTask != null) {
             try {