You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/14 07:04:01 UTC

[incubator-seatunnel] branch dev updated: [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery (#3092)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c8fc31023 [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery (#3092)
c8fc31023 is described below

commit c8fc3102378f4ca4d561c533e0560e0a867100ba
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Fri Oct 14 15:03:55 2022 +0800

    [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery (#3092)
    
    * [feature][engine][checkpoint] The checkpoint coordinator supports fault-tolerant recovery
    
    * [engine][checkpoint] add CheckpointManager Test case
---
 .../core/checkpoint/CheckpointIDCounter.java       |  6 +-
 .../server/checkpoint/CheckpointCoordinator.java   | 18 +++--
 .../server/checkpoint/CheckpointManager.java       | 57 ++++++++++++++--
 .../engine/server/checkpoint/CheckpointPlan.java   |  6 --
 ...IDCounter.java => IMapCheckpointIDCounter.java} | 40 +++++++-----
 .../checkpoint/StandaloneCheckpointIDCounter.java  |  4 +-
 .../server/checkpoint/CheckpointManagerTest.java   | 76 ++++++++++++++++++++++
 .../storage/api/AbstractCheckpointStorage.java     |  2 +-
 .../checkpoint/storage/api/CheckpointStorage.java  | 11 ++++
 .../checkpoint/storage/hdfs/HdfsStorage.java       | 20 ++++++
 .../storage/localfile/LocalFileStorage.java        | 28 +++++++-
 11 files changed, 224 insertions(+), 44 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
index cbf728380..7bdc228fc 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
@@ -18,14 +18,14 @@
 
 package org.apache.seatunnel.engine.core.checkpoint;
 
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
 
 import java.util.concurrent.CompletableFuture;
 
 /** A checkpoint ID counter. */
 public interface CheckpointIDCounter {
 
-    int INITIAL_CHECKPOINT_ID = 1;
+    long INITIAL_CHECKPOINT_ID = 1;
 
     /** Starts the {@link CheckpointIDCounter} service down. */
     void start() throws Exception;
@@ -38,7 +38,7 @@ public interface CheckpointIDCounter {
      *
      * @return The {@code CompletableFuture} holding the result of the shutdown operation.
      */
-    CompletableFuture<Void> shutdown(JobStatus jobStatus);
+    CompletableFuture<Void> shutdown(PipelineState jobStatus);
 
     /**
      * Atomically increments the current checkpoint ID.
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 6525e1beb..2a1ef88d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,8 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import lombok.Getter;
+import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +83,7 @@ public class CheckpointCoordinator {
 
     private final CheckpointStorageConfiguration storageConfig;
 
+    @Getter
     private final CheckpointIDCounter checkpointIdCounter;
 
     private final transient Serializer serializer;
@@ -100,7 +103,7 @@ public class CheckpointCoordinator {
 
     private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
-    private volatile CompletedCheckpoint latestCompletedCheckpoint;
+    private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
 
     private final CheckpointCoordinatorConfiguration coordinatorConfig;
 
@@ -116,13 +119,15 @@ public class CheckpointCoordinator {
     /** Flag marking the coordinator as shut down (not accepting any messages any more). */
     private volatile boolean shutdown;
 
+    @SneakyThrows
     public CheckpointCoordinator(CheckpointManager manager,
                                  CheckpointStorage checkpointStorage,
                                  CheckpointStorageConfiguration storageConfig,
                                  long jobId,
                                  CheckpointPlan plan,
-                                 CheckpointCoordinatorConfiguration coordinatorConfig) {
-
+                                 CheckpointCoordinatorConfiguration coordinatorConfig,
+                                 CheckpointIDCounter checkpointIdCounter,
+                                 PipelineState pipelineState) {
         this.checkpointManager = manager;
         this.checkpointStorage = checkpointStorage;
         this.storageConfig = storageConfig;
@@ -130,7 +135,6 @@ public class CheckpointCoordinator {
         this.pipelineId = plan.getPipelineId();
         this.plan = plan;
         this.coordinatorConfig = coordinatorConfig;
-        this.latestCompletedCheckpoint = plan.getRestoredCheckpoint();
         this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints();
         this.pendingCheckpoints = new ConcurrentHashMap<>();
         this.completedCheckpoints = new ArrayDeque<>(storageConfig.getMaxRetainedCheckpoints() + 1);
@@ -144,8 +148,10 @@ public class CheckpointCoordinator {
         this.serializer = new ProtoStuffSerializer();
         this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks());
         this.pipelineTaskStatus = new ConcurrentHashMap<>();
-        // TODO: IDCounter SPI
-        this.checkpointIdCounter = new StandaloneCheckpointIDCounter();
+        this.checkpointIdCounter = checkpointIdCounter;
+        if (pipelineState != null) {
+            this.latestCompletedCheckpoint = serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class);
+        }
     }
 
     public int getPipelineId() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index b62e1a454..5286c1aae 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -18,13 +18,19 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.job.Job;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
+import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.Task;
@@ -36,12 +42,14 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -78,14 +86,30 @@ public class CheckpointManager {
         this.subtaskWithAddresses = new HashMap<>();
         this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, storageConfig.getStorage())
             .create(new HashMap<>());
+        IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
         this.coordinatorMap = checkpointPlanMap.values().parallelStream()
-            .map(plan -> new CheckpointCoordinator(this,
-                checkpointStorage,
-                storageConfig,
-                jobId,
-                plan,
-                coordinatorConfig)
-            ).collect(Collectors.toMap(CheckpointCoordinator::getPipelineId, Function.identity()));
+            .map(plan -> {
+                IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
+                try {
+                    idCounter.start();
+                    // TODO: support savepoint
+                    PipelineState pipelineState = null;
+                    if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
+                        pipelineState = checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()), String.valueOf(idCounter.get() - 1));
+                    }
+                    return new CheckpointCoordinator(this,
+                        checkpointStorage,
+                        storageConfig,
+                        jobId,
+                        plan,
+                        coordinatorConfig,
+                        idCounter,
+                        pipelineState);
+                } catch (Exception e) {
+                    ExceptionUtil.sneakyThrow(e);
+                }
+                throw new RuntimeException("Never throw here.");
+            }).collect(Collectors.toMap(CheckpointCoordinator::getPipelineId, Function.identity()));
     }
 
     /**
@@ -147,6 +171,25 @@ public class CheckpointManager {
         }
     }
 
+    /**
+     * Called by the JobMaster.
+     * <br> Listen to the {@link org.apache.seatunnel.engine.core.job.PipelineState} of the {@link Pipeline}, which is used to shut down the running {@link CheckpointIDCounter} at the end of the pipeline.
+     */
+    public CompletableFuture<Void> listenPipeline(int pipelineId, org.apache.seatunnel.engine.core.job.PipelineState pipelineStatus) {
+        return getCheckpointCoordinator(pipelineId).getCheckpointIdCounter().shutdown(pipelineStatus);
+    }
+
+    /**
+     * Called by the JobMaster.
+     * <br> Listen to the {@link JobStatus} of the {@link Job}; when the status is {@code FINISHED}, {@code checkpointStorage.deleteCheckpoint} is called with this job's id.
+     */
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+        if (jobStatus == JobStatus.FINISHED) {
+            checkpointStorage.deleteCheckpoint(jobId + "");
+        }
+        return CompletableFuture.completedFuture(null); // the returned future is already completed
+    }
+
     /**
      * Called by the JobMaster.
      * <br> Returns whether the pipeline has completed; No need to deploy/restore the {@link SubPlan} if the pipeline has been completed;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index c2f7ab1f5..0dd1c0dc5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -49,11 +49,6 @@ public class CheckpointPlan {
      */
     private final Set<TaskLocation> startingSubtasks;
 
-    /**
-     * Restored completed checkpoint.
-     */
-    private final CompletedCheckpoint restoredCheckpoint;
-
     /**
      * All actions in this pipeline.
      * <br> key: the action id;
@@ -64,7 +59,6 @@ public class CheckpointPlan {
     public static final class Builder {
         private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
         private final Set<TaskLocation> startingSubtasks = new HashSet<>();
-        private CompletedCheckpoint restoredCheckpoint;
         private final Map<Long, Integer> pipelineActions = new HashMap<>();
 
         private Builder() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index e656fca4f..c98a8a69a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -18,45 +18,49 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+
+import com.hazelcast.map.IMap;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
 
-public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
+public class IMapCheckpointIDCounter implements CheckpointIDCounter { // keeps one pipeline's checkpoint id as an entry of a Hazelcast IMap, keyed by pipeline id
+    private final Integer pipelineId;
+    private final IMap<Integer, Long> checkpointIdMap;
 
-    private final AtomicLong checkpointIdCounter = new AtomicLong(INITIAL_CHECKPOINT_ID);
+    public IMapCheckpointIDCounter(Integer pipelineId,
+                                   IMap<Integer, Long> checkpointIdMap) {
+        this.pipelineId = pipelineId;
+        this.checkpointIdMap = checkpointIdMap;
+    }
 
     @Override
     public void start() throws Exception {
+        checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID); // seeds the entry with the initial id; an id that is already present is kept
     }
 
     @Override
-    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+    public CompletableFuture<Void> shutdown(PipelineState pipelineStatus) {
+        if (pipelineStatus.isEndState()) { // only an end state removes the entry; otherwise the stored id stays in the map
+            checkpointIdMap.remove(pipelineId);
+        }
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public long getAndIncrement() throws Exception {
-        return checkpointIdCounter.getAndIncrement();
+        Long currentId = checkpointIdMap.get(pipelineId); // NOTE(review): get followed by put is two separate map calls, not one atomic step - presumably a single coordinator updates each pipeline's entry; confirm
+        checkpointIdMap.put(pipelineId, currentId + 1); // unboxing currentId fails with a NullPointerException if the entry is missing (start() inserts it)
+        return currentId; // the id as it was before the increment
     }
 
     @Override
     public long get() {
-        return checkpointIdCounter.get();
+        return checkpointIdMap.get(pipelineId); // auto-unboxed Long: throws NullPointerException when the map has no entry for this pipeline
     }
 
     @Override
-    public void setCount(long newCount) {
-        checkpointIdCounter.set(newCount);
-    }
-
-    /**
-     * Returns the last checkpoint ID (current - 1).
-     *
-     * @return Last checkpoint ID.
-     */
-    public long getLast() {
-        return checkpointIdCounter.get() - 1;
+    public void setCount(long newId) throws Exception {
+        checkpointIdMap.put(pipelineId, newId); // overwrites whatever id is currently stored for this pipeline
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
index e656fca4f..d1ad67a87 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
-import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -32,7 +32,7 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
     }
 
     @Override
-    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+    public CompletableFuture<Void> shutdown(PipelineState pipelineStatus) {
         return CompletableFuture.completedFuture(null);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
new file mode 100644
index 000000000..1801def2b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
+import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+
+import com.hazelcast.map.IMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@DisabledOnOs(OS.WINDOWS)
+public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
+
+    @Test
+    public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException { // pre-populates checkpoint storage and the checkpoint-id IMap, builds a CheckpointManager on top, then checks restore and cleanup
+        long jobId = (long) (Math.random() * 1000000L); // random job id; presumably to avoid clashing with data left by other runs
+        CheckpointStorage checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class,
+                CheckpointStorageConfiguration.builder().build().getStorage())
+            .create(new HashMap<>());
+        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobId, 1, 1,
+            Instant.now().toEpochMilli(),
+            CheckpointType.COMPLETED_POINT_TYPE,
+            Instant.now().toEpochMilli(),
+            new HashMap<>(),
+            new HashMap<>());
+        checkpointStorage.storeCheckPoint(PipelineState.builder().jobId(jobId + "").pipelineId(1).checkpointId(1)
+            .states(new ProtoStuffSerializer().serialize(completedCheckpoint)).build()); // the serialized CompletedCheckpoint is what the coordinator constructor deserializes again
+        IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap("checkpoint-id-" + jobId); // same map name the CheckpointManager constructor builds with String.format("checkpoint-id-%d", jobId)
+        checkpointIdMap.put(1, 2L); // counter at 2 (not the initial 1), so the manager constructor loads checkpoint 2 - 1 = 1 from storage
+        Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+        planMap.put(1, CheckpointPlan.builder().pipelineId(1).build());
+        CheckpointManager checkpointManager = new CheckpointManager(
+            jobId,
+            nodeEngine,
+            planMap,
+            CheckpointCoordinatorConfiguration.builder().build(),
+            CheckpointStorageConfiguration.builder().build());
+        Assertions.assertTrue(checkpointManager.isCompletedPipeline(1)); // presumably true because the restored checkpoint has COMPLETED_POINT_TYPE - confirm against isCompletedPipeline
+        CompletableFuture<Void> future = checkpointManager.listenPipeline(1, org.apache.seatunnel.engine.core.job.PipelineState.FINISHED);
+        future.join();
+        Assertions.assertNull(checkpointIdMap.get(1)); // expects the pipeline's entry to be gone from the id map
+        future = checkpointManager.shutdown(JobStatus.FINISHED);
+        future.join();
+        Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty()); // expects no checkpoints to remain in storage for this job
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index 9dd791d49..3e14ea44d 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -168,7 +168,7 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
      * @return the checkpoint id of the file.
      */
     public String getCheckpointIdByFileName(String fileName) {
-        return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_CHECKPOINT_ID_INDEX];
+        return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
index 814df74c1..7478f5362 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
@@ -95,6 +95,17 @@ public interface CheckpointStorage {
      */
     void deleteCheckpoint(String jobId);
 
+    /**
+     * get checkpoint state
+     *
+     * @param jobId job id
+     * @param pipelineId pipeline id
+     * @param checkpointId checkpoint id
+     * @return checkpoint state
+     * @throws CheckpointStorageException get checkpoint failed
+     */
+    PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException;
+
     /**
      * Delete the checkpoint data.
      *
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index cd9190e9b..9ac3393e2 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -206,6 +206,26 @@ public class HdfsStorage extends AbstractCheckpointStorage {
         }
     }
 
+    @Override
+    public PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException { // returns the state read from the first file whose name matches both ids
+        String path = getStorageParentDirectory() + jobId; // the job's files are listed from <storage parent directory> + jobId
+        List<String> fileNames = getFileNames(path);
+        if (fileNames.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+        }
+        for (String fileName : fileNames) {
+            if (pipelineId.equals(getPipelineIdByFileName(fileName)) && // pipeline id and checkpoint id are both taken from the file name
+                checkpointId.equals(getCheckpointIdByFileName(fileName))) {
+                try {
+                    return readPipelineState(fileName, jobId);
+                } catch (Exception e) {
+                    log.error("Failed to get checkpoint {} for job {}, pipeline {}", checkpointId, jobId, pipelineId, e); // the failure is only logged; the loop moves on to the next file
+                }
+            }
+        }
+        throw new CheckpointStorageException(String.format("No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", jobId, pipelineId, checkpointId)); // reached when no file matched, or when every matching file failed to read
+    }
+
     @Override
     public void deleteCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
         String path = getStorageParentDirectory() + jobId;
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
index 1396b6a75..bc6f0c7e1 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
@@ -105,9 +105,14 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
 
     @Override
     public List<PipelineState> getAllCheckpoints(String jobId) throws CheckpointStorageException {
+        File filePath = new File(getStorageParentDirectory() + jobId);
+        if (!filePath.exists()) {
+            return new ArrayList<>();
+        }
+
         Collection<File> fileList;
         try {
-            fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, true);
+            fileList = FileUtils.listFiles(filePath, FILE_EXTENSIONS, true);
         } catch (Exception e) {
             throw new CheckpointStorageException("Failed to get all checkpoints for job " + jobId, e);
         }
@@ -216,6 +221,27 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
         }
     }
 
+    @Override
+    public PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException { // returns the state deserialized from the first file whose name matches both ids
+        Collection<File> fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, false); // non-recursive listing of <storage parent directory> + jobId
+        if (fileList.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job " + jobId);
+        }
+        for (File file : fileList) {
+            String fileName = file.getName();
+            if (pipelineId.equals(getPipelineIdByFileName(fileName)) && // pipeline id and checkpoint id are both taken from the file name
+                checkpointId.equals(getCheckpointIdByFileName(fileName))) {
+                try {
+                    byte[] data = FileUtils.readFileToByteArray(file);
+                    return deserializeCheckPointData(data);
+                } catch (Exception e) {
+                    log.error("Failed to get checkpoint {} for job {}, pipeline {}", checkpointId, jobId, pipelineId, e); // this is a read path, so the message says "get"; the failure is only logged and the loop moves on
+                }
+            }
+        }
+        throw new CheckpointStorageException(String.format("No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", jobId, pipelineId, checkpointId)); // reached when no file matched, or when every matching file failed to read
+    }
+
     @Override
     public void deleteCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException {
         Collection<File> fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, false);