Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/26 02:51:20 UTC

[incubator-seatunnel] branch st-engine updated: [New-Engine]Support Hdfs Storage and async storage (#2485)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 72e66272d [New-Engine]Support Hdfs Storage and async storage (#2485)
72e66272d is described below

commit 72e66272d5ab54d8d8d54fba8a4a1193c94ba3d9
Author: Kirs <ki...@apache.org>
AuthorDate: Fri Aug 26 10:51:15 2022 +0800

    [New-Engine]Support Hdfs Storage and async storage (#2485)
    
    * [New-Engine] Checkpoint storage
    Support local file storage plugin
    Support ProtoStuff serializer
    Support loading checkpoint storage via SPI
    
    * Improve the getLatest method
    
    * Refactor some methods

    * Refactor storage instance initialization

    * [New-Engine] Support checkpoint storage
    
    * [New-Engine]Support Hdfs Storage and async storage
    Support config storage namespace
    Support Windows local file storage
    Support async submit task
    
    * fix license header
    
    * fix unit tests
    
    * fix some log and improve name split method
    
    * fix some log and improve name split method
    
    * fix code style
    
    * fix code style
    
    * remove hflush
---
 .../operation/source/SourceRegisterOperation.java  |   1 -
 .../storage/api/AbstractCheckpointStorage.java     | 119 +++++++++-
 .../checkpoint/storage/api/CheckpointStorage.java  |   8 +
 .../storage/api/CheckpointStorageFactory.java      |   4 +-
 .../storage/common/ProtoStuffSerializer.java       |  10 +-
 .../storage/common/StorageThreadFactory.java       |  50 ++++
 .../StorageConstants.java}                         |  26 +-
 .../pom.xml                                        |  25 +-
 .../checkpoint/storage/hdfs/HdfsConstants.java}    |  27 +--
 .../checkpoint/storage/hdfs/HdfsStorage.java       | 261 +++++++++++++++++++++
 .../storage/hdfs/HdfsStorageFactory.java}          |  16 +-
 .../checkpoint-storage-local-file/pom.xml          |  13 +-
 .../storage/localfile/LocalFileStorage.java        | 104 ++++----
 .../storage/localfile/LocalFileStorageFactory.java |   2 +-
 .../storage/localfile/LocalFileStorageTest.java    |   4 +-
 .../checkpoint-storage-plugins/pom.xml             |   6 +
 16 files changed, 542 insertions(+), 134 deletions(-)
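
The patch wires checkpoint storage behind an SPI: CheckpointStorageFactory implementations are discovered via @AutoService/ServiceLoader, created from a plain config map (which may carry the new storageNameSpace key), and can now persist state asynchronously. A minimal usage sketch against the APIs added below; the "hdfs"/"localfile" plugin names and the config keys come from this patch, while the concrete values and the wrapper class are illustrative only:

    import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
    import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
    import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.ServiceLoader;

    public final class CheckpointStorageUsageSketch {

        /** Looks up a storage plugin by name via SPI and stores the given state asynchronously. */
        public static void storeAsync(PipelineState state) throws Exception {
            Map<String, String> config = new HashMap<>();
            config.put("storageNameSpace", "/seatunnel/checkpoint/");   // StorageConstants.STORAGE_NAME_SPACE
            config.put("fs.defaultFS", "hdfs://namenode:8020");         // HdfsConstants.HDFS_DEF_FS_NAME, value illustrative

            for (CheckpointStorageFactory factory : ServiceLoader.load(CheckpointStorageFactory.class)) {
                if ("hdfs".equals(factory.name())) {                    // "localfile" is the other plugin in this repo
                    CheckpointStorage storage = factory.create(config);
                    storage.asyncStoreCheckPoint(state);                // returns immediately; failures are logged by the pool task
                    return;
                }
            }
            throw new IllegalStateException("no checkpoint storage plugin named 'hdfs' on the classpath");
        }
    }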

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 63e9ab4fb..1f89f1f9d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -63,7 +63,6 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
             return null;
         }, new RetryUtils.RetryMaterial(RETRY_TIME, true,
             exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
-
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index e7774e3cf..acf607b7e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -23,12 +23,26 @@ package org.apache.seatunnel.engine.checkpoint.storage.api;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
 import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
+import org.apache.seatunnel.engine.checkpoint.storage.common.StorageThreadFactory;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
+@Slf4j
 public abstract class AbstractCheckpointStorage implements CheckpointStorage {
 
     /**
@@ -36,23 +50,31 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
      */
     private final Serializer serializer = new ProtoStuffSerializer();
 
+    public static final String DEFAULT_CHECKPOINT_FILE_PATH_SPLIT = "/";
+
     /**
      * storage root directory
+     * if not set, use default value
      */
-    private static final String CHECKPOINT_DEFAULT_FILE_DIR = "/tmp/seatunnel/checkpoint/";
+    private String storageNameSpace = "/seatunnel/checkpoint/";
 
     public static final String FILE_NAME_SPLIT = "-";
 
-    public static final int FILE_NAME_PIPELINE_ID_INDEX = 1;
+    public static final int FILE_NAME_PIPELINE_ID_INDEX = 2;
 
     public static final int FILE_SORT_ID_INDEX = 0;
 
-    public static final String FILE_FORMAT = "txt";
+    public static final int FILE_NAME_RANDOM_RANGE = 1000;
 
-    /**
-     * record order
-     */
-    private final AtomicLong counter = new AtomicLong(0);
+    public static final String FILE_FORMAT = "ser";
+
+    private volatile ExecutorService executorService;
+
+    private static final int DEFAULT_THREAD_POOL_MIN_SIZE = Runtime.getRuntime().availableProcessors() * 2 + 1;
+
+    private static final int DEFAULT_THREAD_POOL_MAX_SIZE = Runtime.getRuntime().availableProcessors() * 4 + 1;
+
+    private static final int DEFAULT_THREAD_POOL_QUENE_SIZE = 1024;
 
     /**
      * init storage instance
@@ -65,11 +87,11 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
     public abstract void initStorage(Map<String, String> configuration) throws CheckpointStorageException;
 
     public String getStorageParentDirectory() {
-        return CHECKPOINT_DEFAULT_FILE_DIR;
+        return storageNameSpace;
     }
 
     public String getCheckPointName(PipelineState state) {
-        return counter.incrementAndGet() + FILE_NAME_SPLIT + state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." + FILE_FORMAT;
+        return System.nanoTime() + FILE_NAME_SPLIT + ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE) + FILE_NAME_SPLIT + state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." + FILE_FORMAT;
     }
 
     public byte[] serializeCheckPointData(PipelineState state) throws IOException {
@@ -79,4 +101,83 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
     public PipelineState deserializeCheckPointData(byte[] data) throws IOException {
         return serializer.deserialize(data, PipelineState.class);
     }
+
+    public void setStorageNameSpace(String storageNameSpace) {
+        if (storageNameSpace != null) {
+            this.storageNameSpace = storageNameSpace;
+        }
+    }
+
+    public Set<String> getLatestPipelineNames(List<String> fileNames) {
+        Map<String, String> latestPipelineMap = new HashMap<>();
+        fileNames.forEach(fileName -> {
+            String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+            long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
+            String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
+            if (latestPipelineMap.containsKey(filePipelineId)) {
+                long oldVersion = Long.parseLong(latestPipelineMap.get(filePipelineId).split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
+                if (fileVersion > oldVersion) {
+                    latestPipelineMap.put(filePipelineId, fileName);
+                }
+            } else {
+                latestPipelineMap.put(filePipelineId, fileName);
+            }
+        });
+        Set<String> latestPipelines = new HashSet<>(latestPipelineMap.size());
+        latestPipelineMap.forEach((pipelineId, fileName) -> latestPipelines.add(fileName));
+        return latestPipelines;
+    }
+
+    /**
+     * get latest checkpoint file name
+     *
+     * @param fileNames file names
+     * @return latest checkpoint file name
+     */
+    public String getLatestCheckpointFileNameByJobIdAndPipelineId(List<String> fileNames, String pipelineId) {
+        AtomicReference<String> latestFileName = new AtomicReference<>();
+        AtomicLong latestVersion = new AtomicLong();
+        fileNames.forEach(fileName -> {
+            String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+            long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
+            String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
+            if (pipelineId.equals(filePipelineId) && fileVersion > latestVersion.get()) {
+                latestVersion.set(fileVersion);
+                latestFileName.set(fileName);
+            }
+        });
+        return latestFileName.get();
+    }
+
+    /**
+     * get the latest checkpoint file name
+     *
+     * @param fileName file names. note: file name cannot contain parent path
+     * @return latest checkpoint file name
+     */
+    public String getPipelineIdByFileName(String fileName) {
+        return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+    }
+
+    @Override
+    public void asyncStoreCheckPoint(PipelineState state) {
+        initExecutor();
+        this.executorService.submit(() -> {
+            try {
+                storeCheckPoint(state);
+            } catch (Exception e) {
+                log.error(String.format("store checkpoint failed, job id : %s, pipeline id : %d", state.getJobId(), state.getPipelineId()), e);
+            }
+        });
+    }
+
+    private void initExecutor() {
+        if (null == this.executorService || this.executorService.isShutdown()) {
+            synchronized (this) {
+                if (null == this.executorService || this.executorService.isShutdown()) {
+                    this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_MIN_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(DEFAULT_THREAD_POOL_QUENE_SIZE), new StorageThreadFactory());
+                }
+            }
+        }
+    }
 }
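
With the new naming scheme, checkpoint files are named <nanoTime>-<random>-<pipelineId>-<checkpointId>.ser, which is why FILE_NAME_PIPELINE_ID_INDEX moved from 1 to 2 and the extension changed from txt to ser. A small sketch of how such a name splits (the concrete name is made up for illustration):

    public class CheckpointFileNameSketch {
        public static void main(String[] args) {
            // Illustrative name in the new format: <nanoTime>-<random>-<pipelineId>-<checkpointId>.ser
            String fileName = "1661482275123456-42-7-15.ser";
            String[] segments = fileName.split("-");        // FILE_NAME_SPLIT
            long sortKey = Long.parseLong(segments[0]);     // FILE_SORT_ID_INDEX = 0, used to pick the latest checkpoint
            String pipelineId = segments[2];                // FILE_NAME_PIPELINE_ID_INDEX = 2 (was 1 before this change)
            System.out.println(sortKey + " -> pipeline " + pipelineId);
        }
    }
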
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
index c9fb732a6..2d38c6b36 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
@@ -35,6 +35,14 @@ public interface CheckpointStorage {
      */
     String storeCheckPoint(PipelineState state) throws CheckpointStorageException;
 
+    /**
+     * async save checkpoint to storage
+     *
+     * @param state PipelineState
+     * @throws CheckpointStorageException if save checkpoint failed
+     */
+    void asyncStoreCheckPoint(PipelineState state) throws CheckpointStorageException;
+
     /**
      * get all checkpoint from storage
      * if no data found, return empty list
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
index 0723b5d84..73cb777b8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
@@ -20,6 +20,8 @@
 
 package org.apache.seatunnel.engine.checkpoint.storage.api;
 
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
 import java.util.Map;
 
 /**
@@ -43,5 +45,5 @@ public interface CheckpointStorageFactory {
      *                      value: "fs.defaultFS"
      *                      return storage plugin instance
      */
-    CheckpointStorage create(Map<String, String> configuration);
+    CheckpointStorage create(Map<String, String> configuration) throws CheckpointStorageException;
 }
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java
index 32288ece5..5893e0c64 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java
@@ -30,11 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class ProtoStuffSerializer implements Serializer {
 
-    /**
-     * Consider configurable
-     */
-    private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(1024);
-
     /**
      * At the moment it looks like we only have one Schema.
      */
@@ -48,12 +43,13 @@ public class ProtoStuffSerializer implements Serializer {
     @Override
     public <T> byte[] serialize(T obj) {
         Class<T> clazz = (Class<T>) obj.getClass();
+        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
         Schema<T> schema = getSchema(clazz);
         byte[] data;
         try {
-            data = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
+            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
         } finally {
-            BUFFER.clear();
+            buffer.clear();
         }
         return data;
     }
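
The motivation for dropping the static LinkedBuffer is thread safety: LinkedBuffer is not safe for concurrent use, so parallel serialize calls sharing one buffer could corrupt each other's output, and the async storage path introduced above makes such concurrency likely. Allocating per call is the simplest fix; a ThreadLocal buffer is one possible variant that keeps buffer reuse without sharing. A sketch only, not part of this patch:

    import io.protostuff.LinkedBuffer;
    import io.protostuff.ProtostuffIOUtil;
    import io.protostuff.Schema;
    import io.protostuff.runtime.RuntimeSchema;

    public class ThreadLocalBufferSerializer {

        // One buffer per thread: avoids re-allocating on every call without sharing across threads.
        private static final ThreadLocal<LinkedBuffer> BUFFER =
                ThreadLocal.withInitial(() -> LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));

        @SuppressWarnings("unchecked")
        public <T> byte[] serialize(T obj) {
            Schema<T> schema = RuntimeSchema.getSchema((Class<T>) obj.getClass());
            LinkedBuffer buffer = BUFFER.get();
            try {
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } finally {
                buffer.clear();   // must be cleared after use, as in the patched serializer
            }
        }
    }
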
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/StorageThreadFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/StorageThreadFactory.java
new file mode 100644
index 000000000..76cbb2435
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/StorageThreadFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StorageThreadFactory implements ThreadFactory {
+    private final AtomicInteger poolNumber = new AtomicInteger(1);
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+    private final String namePrefix;
+
+    public StorageThreadFactory() {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        namePrefix = "StorageThread-" + poolNumber.getAndIncrement() + "-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
+        if (thread.isDaemon()) {
+            thread.setDaemon(false);
+        }
+        if (thread.getPriority() != Thread.NORM_PRIORITY) {
+            thread.setPriority(Thread.NORM_PRIORITY);
+        }
+        return thread;
+    }
+}
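
StorageThreadFactory mirrors the JDK's default thread factory but gives the async checkpoint pool a recognizable name prefix, which helps when reading thread dumps. A minimal usage sketch (pool sizes here are illustrative; the real values are computed in AbstractCheckpointStorage#initExecutor):

    import org.apache.seatunnel.engine.checkpoint.storage.common.StorageThreadFactory;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class StorageThreadFactoryDemo {
        public static void main(String[] args) {
            ExecutorService pool = new ThreadPoolExecutor(
                    2, 4, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(16),
                    new StorageThreadFactory());
            // Prints a name like "StorageThread-1-thread-1"
            pool.submit(() -> System.out.println(Thread.currentThread().getName()));
            pool.shutdown();
        }
    }
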
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/constants/StorageConstants.java
similarity index 52%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/constants/StorageConstants.java
index 0723b5d84..7ce1957e5 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/constants/StorageConstants.java
@@ -18,30 +18,12 @@
  *
  */
 
-package org.apache.seatunnel.engine.checkpoint.storage.api;
+package org.apache.seatunnel.engine.checkpoint.storage.constants;
 
-import java.util.Map;
-
-/**
- * All checkpoint storage plugins need to implement it
- */
-public interface CheckpointStorageFactory {
-
-    /**
-     * Returns the name of the storage plugin
-     */
-    String name();
+public class StorageConstants {
 
     /**
-     * create storage plugin instance
-     *
-     * @param configuration storage system config params
-     *                      key: storage system config key
-     *                      value: storage system config value
-     *                      e.g.
-     *                      key: "FS_DEFAULT_NAME_KEY"
-     *                      value: "fs.defaultFS"
-     *                      return storage plugin instance
+     * The name of the configuration property that specifies the name of the file system.
      */
-    CheckpointStorage create(Map<String, String> configuration);
+    public static final String STORAGE_NAME_SPACE = "storageNameSpace";
 }
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
similarity index 70%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
index e6a13657e..1a67ea683 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
@@ -28,21 +28,24 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>checkpoint-storage-local-file</artifactId>
+    <artifactId>checkpoint-storage-hdfs</artifactId>
     <dependencies>
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>checkpoint-storage-api</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <version>${flink-shaded-hadoop-2.version}</version>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <!--test-->
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
         </dependency>
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
index 0723b5d84..d054d1611 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
@@ -18,30 +18,13 @@
  *
  */
 
-package org.apache.seatunnel.engine.checkpoint.storage.api;
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
 
-import java.util.Map;
+public class HdfsConstants {
 
-/**
- * All checkpoint storage plugins need to implement it
- */
-public interface CheckpointStorageFactory {
+    public static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
 
-    /**
-     * Returns the name of the storage plugin
-     */
-    String name();
+    public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
 
-    /**
-     * create storage plugin instance
-     *
-     * @param configuration storage system config params
-     *                      key: storage system config key
-     *                      value: storage system config value
-     *                      e.g.
-     *                      key: "FS_DEFAULT_NAME_KEY"
-     *                      value: "fs.defaultFS"
-     *                      return storage plugin instance
-     */
-    CheckpointStorage create(Map<String, String> configuration);
+    public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
 }
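
HdfsConstants lists the keys the HDFS plugin reads from its configuration map; together with storageNameSpace they are enough to point the storage at a Kerberized cluster. A hedged example map (only the keys come from this patch; every value is a placeholder):

    import java.util.HashMap;
    import java.util.Map;

    public class HdfsCheckpointConfigExample {
        public static Map<String, String> kerberizedConfig() {
            Map<String, String> config = new HashMap<>();
            config.put("fs.defaultFS", "hdfs://namenode:8020");                     // HdfsConstants.HDFS_DEF_FS_NAME
            config.put("storageNameSpace", "/seatunnel/checkpoint/");               // StorageConstants.STORAGE_NAME_SPACE
            config.put("kerberosPrincipal", "seatunnel/_HOST@EXAMPLE.COM");         // HdfsConstants.KERBEROS_PRINCIPAL
            config.put("kerberosKeytabFilePath", "/etc/security/seatunnel.keytab"); // HdfsConstants.KERBEROS_KEYTAB_FILE_PATH
            return config;
        }
    }
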
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
new file mode 100644
index 000000000..56961492f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import static org.apache.seatunnel.engine.checkpoint.storage.constants.StorageConstants.STORAGE_NAME_SPACE;
+
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class HdfsStorage extends AbstractCheckpointStorage {
+
+    public FileSystem fs;
+    private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
+
+    private static final String KERBEROS_KEY = "kerberos";
+
+    private static final String STORAGE_TMP_SUFFIX = "tmp";
+
+    public HdfsStorage(Map<String, String> configuration) throws CheckpointStorageException {
+        this.initStorage(configuration);
+    }
+
+    @Override
+    public void initStorage(Map<String, String> configuration) throws CheckpointStorageException {
+        Configuration hadoopConf = new Configuration();
+        if (configuration.containsKey(HdfsConstants.HDFS_DEF_FS_NAME)) {
+            hadoopConf.set(HdfsConstants.HDFS_DEF_FS_NAME, configuration.get(HdfsConstants.HDFS_DEF_FS_NAME));
+        }
+        if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) {
+            setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE));
+        }
+        // todo support other config configurations
+        if (configuration.containsKey(HdfsConstants.KERBEROS_PRINCIPAL) && configuration.containsKey(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH)) {
+            String kerberosPrincipal = configuration.get(HdfsConstants.KERBEROS_PRINCIPAL);
+            String kerberosKeytabFilePath = configuration.get(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH);
+            if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+                hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
+                authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
+            }
+        }
+        JobConf jobConf = new JobConf(hadoopConf);
+        try {
+            fs = FileSystem.get(jobConf);
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to get file system", e);
+        }
+
+    }
+
+    @Override
+    public String storeCheckPoint(PipelineState state) throws CheckpointStorageException {
+        byte[] datas;
+        try {
+            datas = serializeCheckPointData(state);
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to serialize checkpoint data,state is :" + state, e);
+        }
+        Path filePath = new Path(getStorageParentDirectory() + state.getJobId() + "/" + getCheckPointName(state));
+
+        Path tmpFilePath = new Path(getStorageParentDirectory() + state.getJobId() + "/" + getCheckPointName(state) + STORAGE_TMP_SUFFIX);
+        try (FSDataOutputStream out = fs.create(tmpFilePath)) {
+            out.write(datas);
+            out.hsync();
+            fs.rename(tmpFilePath, filePath);
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to write checkpoint data, state: " + state, e);
+        } finally {
+            try {
+                // clean up tmp file, if still lying around
+                if (fs.exists(tmpFilePath)) {
+                    fs.delete(tmpFilePath, false);
+                }
+            } catch (IOException ioe) {
+                log.error("Failed to delete tmp file", ioe);
+            }
+        }
+        return filePath.getName();
+    }
+
+    @Override
+    public List<PipelineState> getAllCheckpoints(String jobId) throws CheckpointStorageException {
+        String path = getStorageParentDirectory() + jobId;
+        List<String> fileNames = getFileNames(path);
+        if (fileNames.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+        }
+        List<PipelineState> states = new ArrayList<>();
+        fileNames.forEach(file -> {
+            try {
+                states.add(readPipelineState(file, jobId));
+            } catch (CheckpointStorageException e) {
+                log.error("Failed to read checkpoint data from file: " + file, e);
+            }
+        });
+        if (states.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+        }
+        return states;
+    }
+
+    @Override
+    public List<PipelineState> getLatestCheckpoint(String jobId) throws CheckpointStorageException {
+        String path = getStorageParentDirectory() + jobId;
+        List<String> fileNames = getFileNames(path);
+        if (fileNames.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+        }
+        Set<String> latestPipelineNames = getLatestPipelineNames(fileNames);
+        List<PipelineState> latestPipelineStates = new ArrayList<>();
+        latestPipelineNames.forEach(fileName -> {
+            try {
+                latestPipelineStates.add(readPipelineState(fileName, jobId));
+            } catch (CheckpointStorageException e) {
+                log.error("Failed to read pipeline state for file: {}", fileName, e);
+            }
+        });
+
+        if (latestPipelineStates.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id:{} " + jobId);
+        }
+        return latestPipelineStates;
+    }
+
+    @Override
+    public PipelineState getLatestCheckpointByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
+        String path = getStorageParentDirectory() + jobId;
+        List<String> fileNames = getFileNames(path);
+        if (fileNames.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+        }
+
+        String latestFileName = getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId);
+        if (latestFileName == null) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId + ", pipeline id is: " + pipelineId);
+        }
+        return readPipelineState(latestFileName, jobId);
+    }
+
+    @Override
+    public List<PipelineState> getCheckpointsByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
+        String path = getStorageParentDirectory() + jobId;
+        List<String> fileNames = getFileNames(path);
+        if (fileNames.isEmpty()) {
+            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+        }
+
+        List<PipelineState> pipelineStates = new ArrayList<>();
+        fileNames.forEach(file -> {
+            String filePipelineId = file.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+            if (pipelineId.equals(filePipelineId)) {
+                try {
+                    pipelineStates.add(readPipelineState(file, jobId));
+                } catch (Exception e) {
+                    log.error("Failed to read checkpoint data from file " + file, e);
+                }
+            }
+        });
+        return pipelineStates;
+    }
+
+    @Override
+    public void deleteCheckpoint(String jobId) {
+        String jobPath = getStorageParentDirectory() + jobId;
+        try {
+            fs.delete(new Path(jobPath), true);
+        } catch (IOException e) {
+            log.error("Failed to delete checkpoint for job {}", jobId, e);
+        }
+    }
+
+    /**
+     * Authenticate kerberos
+     *
+     * @param kerberosPrincipal      kerberos principal
+     * @param kerberosKeytabFilePath kerberos keytab file path
+     * @param hdfsConf               hdfs configuration
+     * @throws CheckpointStorageException authentication exception
+     */
+    private void authenticateKerberos(String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf) throws CheckpointStorageException {
+        UserGroupInformation.setConfiguration(hdfsConf);
+        try {
+            UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to login user from keytab : " + kerberosKeytabFilePath + " and kerberos principal : " + kerberosPrincipal, e);
+        }
+    }
+
+    private List<String> getFileNames(String path) throws CheckpointStorageException {
+        try {
+
+            RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs.listFiles(new Path(path), false);
+            List<String> fileNames = new ArrayList<>();
+            while (fileStatusRemoteIterator.hasNext()) {
+                LocatedFileStatus fileStatus = fileStatusRemoteIterator.next();
+                if (!fileStatus.getPath().getName().endsWith(FILE_FORMAT)) {
+                    fileNames.add(fileStatus.getPath().getName());
+                }
+                fileNames.add(fileStatus.getPath().getName());
+            }
+            return fileNames;
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to list files from names" + path, e);
+        }
+    }
+
+    /**
+     * Get checkpoint name
+     *
+     * @param fileName file name
+     * @return checkpoint data
+     */
+    private PipelineState readPipelineState(String fileName, String jobId) throws CheckpointStorageException {
+        fileName = getStorageParentDirectory() + jobId + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName;
+        try (FSDataInputStream in = fs.open(new Path(fileName))) {
+            byte[] datas = new byte[in.available()];
+            in.read(datas);
+            return deserializeCheckPointData(datas);
+        } catch (IOException e) {
+            throw new CheckpointStorageException(String.format("Failed to read checkpoint data, file name is %s,job id is %s", fileName, jobId), e);
+        }
+    }
+
+}
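
HdfsStorage commits a checkpoint by writing to a temporary file, hsync-ing, and renaming it into place; on the read side it sizes the buffer with in.available(), which for remote streams is only a hint and may under-report. A more defensive read would size the buffer from the file status; a minimal sketch, assuming the same FileSystem handle used by the storage class:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    import java.io.IOException;

    final class HdfsReadSketch {

        /** Reads a checkpoint file fully, sizing the buffer from the file status instead of in.available(). */
        static byte[] readFully(FileSystem fs, Path file) throws IOException {
            int len = (int) fs.getFileStatus(file).getLen();  // checkpoint files are small enough for an int-sized buffer
            byte[] data = new byte[len];
            try (FSDataInputStream in = fs.open(file)) {
                IOUtils.readFully(in, data, 0, len);          // loops until the buffer is filled
                return data;
            }
        }
    }
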
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
similarity index 75%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index 9c8f4ad07..ccb96c9b6 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -18,29 +18,25 @@
  *
  */
 
-package org.apache.seatunnel.engine.checkpoint.storage.localfile;
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
 
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Map;
 
-/**
- * Local file storage plug-in, use local file storage,
- * only suitable for single-machine testing or small data scale use, use with caution in production environment
- */
 @AutoService(CheckpointStorageFactory.class)
-public class LocalFileStorageFactory implements CheckpointStorageFactory {
-
+public class HdfsStorageFactory implements CheckpointStorageFactory {
     @Override
     public String name() {
-        return "localfile";
+        return "hdfs";
     }
 
     @Override
-    public CheckpointStorage create(Map<String, String> configuration) {
-        return new LocalFileStorage();
+    public CheckpointStorage create(Map<String, String> configuration) throws CheckpointStorageException {
+        return new HdfsStorage(configuration);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
index e6a13657e..eb618ffaa 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
@@ -30,15 +30,18 @@
 
     <artifactId>checkpoint-storage-local-file</artifactId>
     <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>checkpoint-storage-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
         <!--test-->
         <dependency>
             <groupId>org.junit.jupiter</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
index cb4bcf63c..b0a22c4a8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
@@ -20,35 +20,60 @@
 
 package org.apache.seatunnel.engine.checkpoint.storage.localfile;
 
+import static org.apache.seatunnel.engine.checkpoint.storage.constants.StorageConstants.STORAGE_NAME_SPACE;
+
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class LocalFileStorage extends AbstractCheckpointStorage {
 
     private static final String[] FILE_EXTENSIONS = new String[]{FILE_FORMAT};
 
-    public LocalFileStorage() {
-        // Nothing to do
+    private static final String DEFAULT_WINDOWS_OS_NAME_SPACE = "C:\\ProgramData\\seatunnel\\checkpoint\\";
+
+    private static final String DEFAULT_LINUX_OS_NAME_SPACE = "/tmp/seatunnel/checkpoint/";
+
+    public LocalFileStorage(Map<String, String> configuration) {
+        initStorage(configuration);
     }
 
     @Override
-    public void initStorage(Map<String, String> configuration) throws CheckpointStorageException {
-        // Nothing to do
+    public void initStorage(Map<String, String> configuration) {
+        if (MapUtils.isEmpty(configuration)) {
+            setDefaultStorageSpaceByOSName();
+            return;
+        }
+        if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) {
+            setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE));
+        }
+    }
+
+    /**
+     * set default storage root directory
+     */
+    private void setDefaultStorageSpaceByOSName() {
+        if (System.getProperty("os.name").toLowerCase().contains("windows")) {
+            setStorageNameSpace(DEFAULT_WINDOWS_OS_NAME_SPACE);
+        } else {
+            setStorageNameSpace(DEFAULT_LINUX_OS_NAME_SPACE);
+        }
     }
 
     @Override
@@ -60,7 +85,7 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
             throw new CheckpointStorageException("Failed to serialize checkpoint data", e);
         }
         //Consider file paths for different operating systems
-        String fileName = getStorageParentDirectory() + state.getJobId() + "/" + getCheckPointName(state);
+        String fileName = getStorageParentDirectory() + state.getJobId() + File.separator + getCheckPointName(state);
 
         File file = new File(fileName);
         try {
@@ -107,27 +132,20 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
         if (fileList.isEmpty()) {
             throw new CheckpointStorageException("No checkpoint found for job " + jobId);
         }
-        Map<String, File> latestPipelineMap = new HashMap<>();
+        List<String> fileNames = fileList.stream().map(File::getName).collect(Collectors.toList());
+        Set<String> latestPipelines = getLatestPipelineNames(fileNames);
+        List<PipelineState> latestPipelineFiles = new ArrayList<>(latestPipelines.size());
         fileList.forEach(file -> {
-            String filePipelineId = file.getName().split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
-            int fileVersion = Integer.parseInt(file.getName().split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
-            if (latestPipelineMap.containsKey(filePipelineId)) {
-                int oldVersion = Integer.parseInt(latestPipelineMap.get(filePipelineId).getName().split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
-                if (fileVersion > oldVersion) {
-                    latestPipelineMap.put(filePipelineId, file);
+            String fileName = file.getName();
+            if (latestPipelines.contains(fileName)) {
+                try {
+                    byte[] data = FileUtils.readFileToByteArray(file);
+                    latestPipelineFiles.add(deserializeCheckPointData(data));
+                } catch (IOException e) {
+                    log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
                 }
-            } else {
-                latestPipelineMap.put(filePipelineId, file);
-            }
-        });
-        List<PipelineState> latestPipelineFiles = new ArrayList<>(latestPipelineMap.size());
-        latestPipelineMap.values().forEach(file -> {
-            try {
-                byte[] data = FileUtils.readFileToByteArray(file);
-                latestPipelineFiles.add(deserializeCheckPointData(data));
-            } catch (IOException e) {
-                log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
             }
+
         });
         if (latestPipelineFiles.isEmpty()) {
             throw new CheckpointStorageException("Failed to read checkpoint data from file");
@@ -138,32 +156,32 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
     @Override
     public PipelineState getLatestCheckpointByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
 
-        Collection<File> fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, false);
+        String parentPath = getStorageParentDirectory() + jobId;
+        Collection<File> fileList = FileUtils.listFiles(new File(parentPath), FILE_EXTENSIONS, false);
         if (fileList.isEmpty()) {
             throw new CheckpointStorageException("No checkpoint found for job " + jobId);
         }
+        List<String> fileNames = fileList.stream().map(File::getName).collect(Collectors.toList());
+
+        String latestFileName = getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId);
 
-        AtomicReference<File> latestFile = new AtomicReference<>();
-        AtomicInteger latestVersion = new AtomicInteger();
+        AtomicReference<PipelineState> latestFile = new AtomicReference<>(null);
         fileList.forEach(file -> {
-            int fileVersion = Integer.parseInt(file.getName().split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
-            String filePipelineId = file.getName().split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
-            if (pipelineId.equals(filePipelineId) && fileVersion > latestVersion.get()) {
-                latestVersion.set(fileVersion);
-                latestFile.set(file);
+            String fileName = file.getName();
+            if (fileName.equals(latestFileName)) {
+                try {
+                    byte[] data = FileUtils.readFileToByteArray(file);
+                    latestFile.set(deserializeCheckPointData(data));
+                } catch (IOException e) {
+                    log.error("read checkpoint data from file " + file.getAbsolutePath(), e);
+                }
             }
         });
 
-        if (latestFile.get().exists()) {
-            try {
-                byte[] data = FileUtils.readFileToByteArray(latestFile.get());
-                return deserializeCheckPointData(data);
-            } catch (IOException e) {
-                throw new CheckpointStorageException("Failed to read checkpoint data from file " + latestFile.get().getAbsolutePath(), e);
-            }
-
+        if (latestFile.get() == null) {
+            throw new CheckpointStorageException("Failed to read checkpoint data from file, file name " + latestFileName);
         }
-        return null;
+        return latestFile.get();
     }
 
     @Override
@@ -174,7 +192,7 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
         }
         List<PipelineState> pipelineStates = new ArrayList<>();
         fileList.forEach(file -> {
-            String filePipelineId = file.getName().split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+            String filePipelineId = getPipelineIdByFileName(file.getName());
             if (pipelineId.equals(filePipelineId)) {
                 try {
                     byte[] data = FileUtils.readFileToByteArray(file);
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
index 9c8f4ad07..bc7ec95f2 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
@@ -41,6 +41,6 @@ public class LocalFileStorageFactory implements CheckpointStorageFactory {
 
     @Override
     public CheckpointStorage create(Map<String, String> configuration) {
-        return new LocalFileStorage();
+        return new LocalFileStorage(configuration);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java
index 40002cc17..9371662a2 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java
@@ -37,7 +37,7 @@ import java.util.List;
 @EnabledOnOs({LINUX, MAC})
 public class LocalFileStorageTest {
 
-    private static LocalFileStorage STORAGE = new LocalFileStorage();
+    private static LocalFileStorage STORAGE = new LocalFileStorage(null);
     private static final String JOB_ID = "chris";
 
     @Before
@@ -60,7 +60,7 @@ public class LocalFileStorageTest {
     public void testGetAllCheckpoints() throws CheckpointStorageException {
 
         List<PipelineState> pipelineStates = STORAGE.getAllCheckpoints(JOB_ID);
-        Assertions.assertEquals(4, pipelineStates.size());
+        Assertions.assertEquals(3, pipelineStates.size());
     }
 
     @Test
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml
index 0fa871a93..5f8e88159 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml
@@ -31,6 +31,11 @@
     <artifactId>checkpoint-storage-plugins</artifactId>
     <packaging>pom</packaging>
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>checkpoint-storage-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.google.auto.service</groupId>
             <artifactId>auto-service</artifactId>
@@ -39,6 +44,7 @@
     
     <modules>
         <module>checkpoint-storage-local-file</module>
+        <module>checkpoint-storage-hdfs</module>
     </modules>
 
 </project>
\ No newline at end of file