Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/24 02:02:28 UTC

[incubator-seatunnel] branch 2.3.0-beta-release updated: [Bug] add 3-node worker down test and fix some bugs (#3115)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch 2.3.0-beta-release
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/2.3.0-beta-release by this push:
     new 2f98fc540 [Bug] add 3-node worker down test and fix some bugs (#3115)
2f98fc540 is described below

commit 2f98fc540927e81d1f3a8cec66ce9b0a53db1d0e
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Oct 24 09:55:30 2022 +0800

    [Bug] add 3-node worker down test and fix some bugs (#3115)
    
    * Add master node switch test and fix bugs
    
    Add master node switch test and fix bugs
    
    * fix ci error
    
    * add todo
    
    * fix ci error
    
    * fix ci error
    
    * add testBatchJobRestoreIn3NodeWorkerDown and testStreamJobRestoreIn3NodeWorkerDown
    
    * add testBatchJobRestoreIn3NodeMasterDown and testStreamJobRestoreIn3NodeMasterDown
    
    * Add withTryCatch to the whenComplete callback, which may throw an exception
    
    * Add withTryCatch to the whenComplete callback, which may throw an exception
    
    * fix ci error
    
    * improve test case
    
    * notify checkpoint state
    
    * add worker node and master node down tests
    
    * revert some file
    
    * fix ci error
    
    * improve test
    
    * add checkpoint.interval to test job config
    
    * fix some error
    
    * add try catch to test case
    
    * fix ci error
    
    * fix checkpoint error
    
    * [engine][tests] add cluster test case
    
    * [engine] Close checkpoint at the end of the pipeline
    
    * [engine] Fix hazelcast master node switching exception
    
    * [hotfix][connector][file] fix file sink error
    
    * fix checkpoint error
    
    # Conflicts:
    #       seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
    
    * fix PipelineState to PipelineStatus bug
    
    * [engine][checkpoint] add barrier flow operation (#3158)
    
    * fix operator retry error
    
    * fix operator retry error
    
    * fix checkpoint error
    
    * fix checkpoint error
    
    * fix checkpoint error
    
    * fix TaskExecutionService "task does not exist" error when cancelling a task
    
    Co-authored-by: Zongwen Li <zo...@apache.org>
---
 .../seatunnel/file/sink/BaseFileSinkWriter.java    |   5 +-
 .../sink/commit/FileSinkAggregatedCommitter.java   |   1 +
 .../seatunnel/file/sink/util/FileSystemUtils.java  |  12 +-
 .../file/sink/writer/AbstractWriteStrategy.java    |  13 +-
 .../seatunnel/file/sink/writer/WriteStrategy.java  |   2 +
 .../engine/e2e/ClusterFaultToleranceIT.java        | 558 ++++++++++++++++++---
 .../cluster_batch_fake_to_localfile_template.conf  |   4 +-
 .../seatunnel/engine/client/SeaTunnelClient.java   |   6 +
 .../engine/client/SeaTunnelHazelcastClient.java    |   6 +
 .../common/config/server/ServerConfigOptions.java  |   2 +-
 .../src/main/resources/hazelcast.yaml              |  11 +-
 .../engine/server/CoordinatorService.java          |   8 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |   3 +
 .../engine/server/SeaTunnelServerStarter.java      |   4 +-
 .../engine/server/TaskExecutionService.java        |  27 +-
 .../server/checkpoint/CheckpointCoordinator.java   |  52 +-
 .../server/checkpoint/CheckpointFailureReason.java |   2 +-
 .../server/checkpoint/CheckpointManager.java       |  45 +-
 .../server/checkpoint/IMapCheckpointIDCounter.java |   8 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  17 +
 .../engine/server/dag/physical/PhysicalVertex.java |   3 +-
 .../engine/server/dag/physical/SubPlan.java        |   2 +
 .../seatunnel/engine/server/master/JobMaster.java  |  14 +
 .../engine/server/CoordinatorServiceTest.java      |   7 +-
 .../server/checkpoint/CheckpointManagerTest.java   |   5 +-
 25 files changed, 662 insertions(+), 155 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 8513af2f2..1a57947a4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -46,12 +46,15 @@ public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitIn
         if (!fileSinkStates.isEmpty()) {
             List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
             transactionIds.forEach(writeStrategy::abortPrepare);
-            writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
+            writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
+        } else {
+            writeStrategy.beginTransaction(1L);
         }
     }
 
     public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId) {
         this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
+        writeStrategy.beginTransaction(1L);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index de3354642..a0f1516e9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -83,6 +83,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
      */
     @Override
     public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
+        log.info("rollback aggregate commit");
         if (aggregatedCommitInfos == null || aggregatedCommitInfos.size() == 0) {
             return;
         }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index 652f7e6ac..bb69cf6b3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -76,12 +76,19 @@ public class FileSystemUtils {
      * @param rmWhenExist if this is true, we will delete the target file when it already exists
      * @throws IOException throw IOException
      */
-    public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
+    public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist)
+        throws IOException {
         FileSystem fileSystem = getFileSystem(newName);
         log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
 
         Path oldPath = new Path(oldName);
         Path newPath = new Path(newName);
+
+        if (!fileExist(oldPath.toString())) {
+            log.warn("rename file :[" + oldPath + "] to [" + newPath + "] already finished in the last commit, skip");
+            return;
+        }
+
         if (rmWhenExist) {
             if (fileExist(newName) && fileExist(oldName)) {
                 fileSystem.delete(newPath, true);
@@ -119,6 +126,9 @@ public class FileSystemUtils {
     public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
         FileSystem fileSystem = getFileSystem(filePath);
         List<Path> pathList = new ArrayList<>();
+        if (!fileExist(filePath)) {
+            return pathList;
+        }
         Path fileName = new Path(filePath);
         FileStatus[] status = fileSystem.listStatus(fileName);
         if (status != null && status.length > 0) {
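
Note on the rename change above: a file-sink commit can be replayed after a failover, so renameFile is made idempotent — if the source path is already gone, the move is assumed to have completed in an earlier commit attempt and is skipped. A minimal, hedged sketch of that idea against the plain Hadoop FileSystem API (the helper below is illustrative, not the connector's code):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    final class IdempotentMove {
        /** Move src to dst; a missing src is treated as "already moved by a previous attempt". */
        static void moveOnce(FileSystem fs, Path src, Path dst, boolean overwrite) throws IOException {
            if (!fs.exists(src)) {
                return; // nothing to do, an earlier commit attempt already renamed it
            }
            if (overwrite && fs.exists(dst)) {
                fs.delete(dst, true);
            }
            if (!fs.rename(src, dst)) {
                throw new IOException("rename " + src + " -> " + dst + " failed");
            }
        }
    }
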
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 2548ea92a..2d0f3155a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -70,7 +70,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected Map<String, String> beingWrittenFile;
     private Map<String, List<String>> partitionDirAndValuesMap;
     protected SeaTunnelRowType seaTunnelRowType;
-    protected Long checkpointId = 1L;
+
+    // Checkpoint id from engine is start with 1
+    protected Long checkpointId = 0L;
 
     public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
         this.textFileSinkConfig = textFileSinkConfig;
@@ -88,7 +90,6 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
         this.jobId = jobId;
         this.subTaskIndex = subTaskIndex;
         FileSystemUtils.CONF = getConfiguration(hadoopConf);
-        this.beginTransaction(this.checkpointId);
     }
 
     /**
@@ -233,6 +234,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
      * @param checkpointId checkpoint id
      */
     public void beginTransaction(Long checkpointId) {
+        this.checkpointId = checkpointId;
         this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + checkpointId;
         this.transactionDirectory = getTransactionDir(this.transactionId);
         this.needMoveFiles = new HashMap<>();
@@ -265,8 +267,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     @Override
     public List<FileSinkState> snapshotState(long checkpointId) {
         ArrayList<FileSinkState> fileState = Lists.newArrayList(new FileSinkState(this.transactionId, this.checkpointId));
-        this.checkpointId = checkpointId;
-        this.beginTransaction(checkpointId);
+        this.beginTransaction(checkpointId + 1);
         return fileState;
     }
 
@@ -303,4 +304,8 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
                 Matcher.quoteReplacement(textFileSinkConfig.getPath()));
         return tmpPath.replaceAll(Constant.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
     }
+
+    public long getCheckpointId() {
+        return this.checkpointId;
+    }
 }
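
To make the checkpoint-id bookkeeping above easier to follow: the strategy now starts with checkpointId = 0 (engine checkpoint ids start at 1), the first transaction is opened by the writer rather than by the strategy constructor, and each snapshot records the current transaction and then rolls over to checkpointId + 1. A hedged, illustrative model of that lifecycle (not the connector class itself):

    import java.util.ArrayList;
    import java.util.List;

    final class TransactionPerCheckpoint {
        private long checkpointId = 0L; // engine checkpoint ids start at 1
        private String transactionId;

        void beginTransaction(long checkpointId) {
            this.checkpointId = checkpointId;
            this.transactionId = "T_" + checkpointId; // the real id also embeds jobId and subTaskIndex
        }

        /** Snapshot the transaction of the checkpoint being taken, then open the next one. */
        List<String> snapshotState(long checkpointId) {
            List<String> state = new ArrayList<>();
            state.add(transactionId);
            beginTransaction(checkpointId + 1);
            return state;
        }

        long getCheckpointId() {
            return checkpointId;
        }
    }

This matches the BaseFileSinkWriter change earlier in the diff: a restored writer aborts the recovered transactions and calls beginTransaction(savedCheckpointId + 1), while a writer created without state starts from beginTransaction(1L).
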
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index aecc80b0d..55ac378db 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -72,4 +72,6 @@ public interface WriteStrategy extends Transaction, Serializable {
      * when a transaction is triggered, release resources
      */
     void finishAndCloseFile();
+
+    long getCheckpointId();
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 6fc5cee93..c7f8d5eaf 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -37,7 +37,6 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -62,56 +61,79 @@ public class ClusterFaultToleranceIT {
 
     public static final String DYNAMIC_TEST_PARALLELISM = "dynamic_test_parallelism";
 
+    @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
     public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
         String testCaseName = "testBatchJobRunOkIn3Node";
         String testClusterName = "ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
         long testRowNumber = 1000;
-        int testParallelism = 1;
-        HazelcastInstanceImpl node1 =
-            SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+        int testParallelism = 6;
 
-        HazelcastInstanceImpl node2 =
-            SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName(testClusterName));
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
 
-        HazelcastInstanceImpl node3 =
-            SeaTunnelServerStarter.createHazelcastInstance(
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
                 TestUtils.getClusterName(testClusterName));
 
-        // waiting all node added to cluster
-        Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-            .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
 
-        // TODO Need FakeSource support parallel first
-        Common.setDeployMode(DeployMode.CLIENT);
-        ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
-        JobConfig jobConfig = new JobConfig();
-        jobConfig.setName(testCaseName);
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
 
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        clientConfig.setClusterName(
-            TestUtils.getClusterName(testClusterName));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv =
-            engineClient.createExecutionContext(testResources.getRight(), jobConfig);
-        ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
 
-        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
-            return clientJobProxy.waitForJobComplete();
-        });
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
 
-        Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
-            .untilAsserted(() -> Assertions.assertTrue(
-                objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
-        Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
-        Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
+            });
 
-        node1.shutdown();
-        node2.shutdown();
-        node3.shutdown();
+            Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(
+                        objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get()));
+                });
+
+            Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
     }
 
     /**
@@ -120,8 +142,8 @@ public class ClusterFaultToleranceIT {
      *
      * @param testCaseName testCaseName
      * @param jobMode      jobMode
-     * @param rowNumber row.num per FakeSource parallelism
-     * @param parallelism FakeSource parallelism
+     * @param rowNumber    row.num per FakeSource parallelism
+     * @param parallelism  FakeSource parallelism
      * @return
      */
     private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName, @NonNull JobMode jobMode,
@@ -149,65 +171,445 @@ public class ClusterFaultToleranceIT {
         return new ImmutablePair<>(targetDir, targetConfigFilePath);
     }
 
+    @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled("Disable because we can not support changeless row number in FakeSource Stream Job")
     public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedException {
         String testCaseName = "testStreamJobRunOkIn3Node";
         String testClusterName = "ClusterFaultToleranceIT_testStreamJobRunOkIn3Node";
         long testRowNumber = 1000;
-        int testParallelism = 1;
-        HazelcastInstanceImpl node1 =
-            SeaTunnelServerStarter.createHazelcastInstance(
+        int testParallelism = 6;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
+            });
+
+            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                });
+
+            clientJobProxy.cancelJob();
+
+            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(
+                    objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+            Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @Test
+    public void testBatchJobRestoreIn3NodeWorkerDown() throws ExecutionException, InterruptedException {
+        String testCaseName = "testBatchJobRestoreIn3NodeWorkerDown";
+        String testClusterName = "ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeWorkerDown";
+        long testRowNumber = 1000;
+        int testParallelism = 2;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
+            });
+
+            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // Wait some tasks commit finished
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+                });
+
+            // shutdown on worker node
+            node2.shutdown();
+
+            Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(
+                    objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
+            Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @Test
+    public void testStreamJobRestoreIn3NodeWorkerDown() throws ExecutionException, InterruptedException {
+        String testCaseName = "testStreamJobRestoreIn3NodeWorkerDown";
+        String testClusterName = "ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeWorkerDown";
+        long testRowNumber = 1000;
+        int testParallelism = 6;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
                 TestUtils.getClusterName(testClusterName));
 
-        HazelcastInstanceImpl node2 =
-            SeaTunnelServerStarter.createHazelcastInstance(
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
                 TestUtils.getClusterName(testClusterName));
 
-        HazelcastInstanceImpl node3 =
-            SeaTunnelServerStarter.createHazelcastInstance(
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
                 TestUtils.getClusterName(testClusterName));
 
-        // waiting all node added to cluster
-        Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
-            .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
 
-        // TODO Need FakeSource support parallel first
-        Common.setDeployMode(DeployMode.CLIENT);
-        ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
-        JobConfig jobConfig = new JobConfig();
-        jobConfig.setName(testCaseName);
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
 
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        clientConfig.setClusterName(
-            TestUtils.getClusterName(testClusterName));
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv =
-            engineClient.createExecutionContext(testResources.getRight(), jobConfig);
-        ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
-        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
-            return clientJobProxy.waitForJobComplete();
-        });
+            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
+            });
 
-        Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
-            .untilAsserted(() -> {
-                log.info("File Lines ==================" + FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
-                Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
-                    testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // Wait some tasks commit finished, and we can get rows from the sink target dir
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+                });
+
+            Thread.sleep(5000);
+            // shutdown on worker node
+            node2.shutdown();
+
+            Awaitility.await().atMost(180000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // Wait job write all rows in file
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                });
+
+            // sleep 10s and expect the job don't write more rows.
+            Thread.sleep(10000);
+            clientJobProxy.cancelJob();
+
+            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(
+                    objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+            // check the final rows
+            Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @Test
+    public void testBatchJobRestoreIn3NodeMasterDown() throws ExecutionException, InterruptedException {
+        String testCaseName = "testBatchJobRestoreIn3NodeMasterDown";
+        String testClusterName = "ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeMasterDown";
+        long testRowNumber = 1000;
+        int testParallelism = 6;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
             });
 
-        clientJobProxy.cancelJob();
+            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // Wait some tasks commit finished
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+                });
+
+            // shutdown master node
+            node1.shutdown();
+
+            Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(
+                    objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
+            Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @Test
+    public void testStreamJobRestoreIn3NodeMasterDown() throws ExecutionException, InterruptedException {
+        String testCaseName = "testStreamJobRestoreIn3NodeMasterDown";
+        String testClusterName = "ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeMasterDown";
+        long testRowNumber = 1000;
+        int testParallelism = 6;
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        HazelcastInstanceImpl node3 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node2 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+            node3 = SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
 
-        Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
-            .untilAsserted(() -> Assertions.assertTrue(
-                objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+            // waiting all node added to cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
 
-        Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
-        Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testCaseName);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(
+                TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
+            });
 
-        node1.shutdown();
-        node2.shutdown();
-        node3.shutdown();
+            Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // Wait some tasks commit finished, and we can get rows from the sink target dir
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+                });
+
+            // shutdown master node
+            node1.shutdown();
+
+            Awaitility.await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // Wait job write all rows in file
+                    Thread.sleep(2000);
+                    System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                    Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                        testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                });
+
+            // sleep 10s and expect the job don't write more rows.
+            Thread.sleep(10000);
+            clientJobProxy.cancelJob();
+
+            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(
+                    objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+            // check the final rows
+            Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+            Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+
+            if (node3 != null) {
+                node3.shutdown();
+            }
+        }
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 2789cbc3e..b066fd9c9 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -21,7 +21,7 @@
 env {
   # You can set flink configuration here
   job.mode = "${dynamic_job_mode}" # dynamic_job_mode will be replace to the final file name before test run
-  execution.checkpoint.interval = 5000
+  checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
@@ -29,6 +29,8 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       row.num = ${dynamic_test_row_num_per_parallelism}
+      split.num = 5
+      split.read-interval = 3000
       map.size = 10
       array.size = 10
       bytes.length = 10
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index e3331a4e7..0f5d8fa4e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -58,4 +58,10 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
             SeaTunnelPrintMessageCodec::decodeResponse
         );
     }
+
+    public void shutdown() {
+        if (hazelcastClient != null) {
+            hazelcastClient.shutdown();
+        }
+    }
 }
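
Usage note for the new shutdown(): it lets callers release the underlying Hazelcast client deterministically. A minimal, hedged sketch mirroring the try/finally pattern used in ClusterFaultToleranceIT above (imports elided; the cluster name is a placeholder):

    SeaTunnelClient engineClient = null;
    try {
        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
        clientConfig.setClusterName("my-test-cluster"); // placeholder
        engineClient = new SeaTunnelClient(clientConfig);
        // ... create an execution context, execute and wait for the job ...
    } finally {
        if (engineClient != null) {
            engineClient.shutdown(); // closes the underlying Hazelcast client
        }
    }
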
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index e893988b0..c57413585 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -125,4 +125,10 @@ public class SeaTunnelHazelcastClient {
         UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
         return requestAndGetCompletableFuture(masterUuid, request);
     }
+
+    public void shutdown() {
+        if (hazelcastClient != null) {
+            hazelcastClient.shutdown();
+        }
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index cde0901ac..22be81e01 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -35,7 +35,7 @@ public class ServerConfigOptions {
 
     public static final Option<Integer> CHECKPOINT_INTERVAL = Options.key("interval").intType().defaultValue(300000).withDescription("The interval (in milliseconds) between two consecutive checkpoints.");
 
-    public static final Option<Integer> CHECKPOINT_TIMEOUT = Options.key("timeout").intType().defaultValue(300000).withDescription("The timeout (in milliseconds) for a checkpoint.");
+    public static final Option<Integer> CHECKPOINT_TIMEOUT = Options.key("timeout").intType().defaultValue(30000).withDescription("The timeout (in milliseconds) for a checkpoint.");
 
     public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT = Options.key("max-concurrent").intType().defaultValue(1).withDescription("The maximum number of concurrent checkpoints.");
 
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
index 21f4d544d..c23ddeeff 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -27,11 +27,6 @@ hazelcast:
       auto-increment: true
       port-count: 100
       port: 5801
-  map:
-    map-name-template:
-      map-store:
-        enabled: true
-        initial-mode: EAGER
-        class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore
-        properties:
-          path: /tmp/file-store-map
\ No newline at end of file
+  properties:
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 100
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 09e91eb92..51793a8f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -64,7 +64,7 @@ public class CoordinatorService {
     private volatile ResourceManager resourceManager;
 
     /**
-     * IMap key is jobId and value is a Tuple2.
+     * IMap key is jobId and value is {@link RunningJobInfo}.
      * Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job
      * <p>
      * This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node active
@@ -195,7 +195,7 @@ public class CoordinatorService {
         try {
             jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
         } catch (Exception e) {
-            throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId));
+            throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId), e);
         }
 
         String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -265,7 +265,7 @@ public class CoordinatorService {
         } catch (Exception e) {
             isActive = false;
             logger.severe(ExceptionUtils.getMessage(e));
-            throw new SeaTunnelEngineException("check new active master error, stop loop");
+            throw new SeaTunnelEngineException("check new active master error, stop loop", e);
         }
     }
 
@@ -279,7 +279,7 @@ public class CoordinatorService {
             executorService.awaitTermination(20, TimeUnit.SECONDS);
             runningJobMasterMap = new ConcurrentHashMap<>();
         } catch (InterruptedException e) {
-            throw new SeaTunnelEngineException("wait clean executor service error");
+            throw new SeaTunnelEngineException("wait clean executor service error", e);
         }
 
         if (resourceManager != null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 397f60434..231869318 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -203,6 +203,9 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
                 return nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress());
             }, new RetryUtils.RetryMaterial(20, true,
                 exception -> exception instanceof NullPointerException && isRunning, 1000));
+        } catch (InterruptedException e) {
+            LOGGER.info("master node check interrupted");
+            return false;
         } catch (Exception e) {
             throw new SeaTunnelEngineException("cluster have no master node", e);
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 447a7fe7c..5a9b49507 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -36,7 +36,7 @@ public class SeaTunnelServerStarter {
         return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
             seaTunnelConfig.getHazelcastConfig(),
             HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
-            new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+            new SeaTunnelNodeContext(seaTunnelConfig))).getOriginal();
     }
 
     public static HazelcastInstanceImpl createHazelcastInstance() {
@@ -44,6 +44,6 @@ public class SeaTunnelServerStarter {
         return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
             seaTunnelConfig.getHazelcastConfig(),
             HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
-            new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+            new SeaTunnelNodeContext(seaTunnelConfig))).getOriginal();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index fea05ddd4..3b44634b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -61,6 +61,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -214,24 +215,30 @@ public class TaskExecutionService {
         resultFuture.whenComplete(withTryCatch(logger, (r, s) -> {
             logger.info(
                 String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
-            InvocationFuture<Object> invoke = null;
             long sleepTime = 1000;
-            while (isRunning && (invoke == null || !invoke.isDone())) {
-                if (null != invoke) {
+            boolean notifyStateSuccess = false;
+            while (isRunning && !notifyStateSuccess) {
+                InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
+                    SeaTunnelServer.SERVICE_NAME,
+                    new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
+                    nodeEngine.getMasterAddress()).invoke();
+                try {
+                    invoke.get();
+                    notifyStateSuccess = true;
+                } catch (InterruptedException e) {
+                    logger.severe(e);
+                    Thread.interrupted();
+                } catch (ExecutionException e) {
+                    logger.warning(ExceptionUtils.getMessage(e));
                     logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
                         taskGroup.getTaskGroupLocation(), sleepTime));
                     try {
-                        Thread.sleep(sleepTime += 1000);
-                    } catch (InterruptedException e) {
+                        Thread.sleep(sleepTime);
+                    } catch (InterruptedException ex) {
                         logger.severe(e);
                         Thread.interrupted();
                     }
                 }
-                invoke = nodeEngine.getOperationService().createInvocationBuilder(
-                    SeaTunnelServer.SERVICE_NAME,
-                    new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
-                    nodeEngine.getMasterAddress()).invoke();
-                invoke.join();
             }
         }));
         return new PassiveCompletableFuture<>(resultFuture);
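
Two things in the hunk above are worth calling out: the whenComplete callback is wrapped in withTryCatch so an exception thrown while reporting the final task status is logged instead of being swallowed by the CompletableFuture, and the status notification is re-sent to the current master address until it is acknowledged, so a master switch mid-notification does not lose the result. A hedged, generic sketch of that retry shape with the Hazelcast invocation abstracted away (names are illustrative):

    import java.util.concurrent.Callable;
    import java.util.function.BooleanSupplier;

    final class NotifyWithRetry {
        /** Keep re-running notify until it succeeds, the service stops, or the thread is interrupted. */
        static void notifyUntilSuccess(Callable<Void> notify, long sleepMillis, BooleanSupplier keepRunning) {
            boolean done = false;
            while (keepRunning.getAsBoolean() && !done) {
                try {
                    notify.call(); // re-created per attempt, like the InvocationFuture above
                    done = true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop retrying on interrupt
                    return;
                } catch (Exception e) {
                    try {
                        Thread.sleep(sleepMillis); // back off, then retry against the (possibly new) master
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
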
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 5ed069f76..dd82f64c2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -22,6 +22,7 @@ import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLET
 import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
 import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
@@ -44,6 +45,7 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +110,7 @@ public class CheckpointCoordinator {
     private final CheckpointConfig coordinatorConfig;
 
     private int tolerableFailureCheckpoints;
-    private final transient ScheduledExecutorService scheduler;
+    private transient ScheduledExecutorService scheduler;
 
     private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
 
@@ -119,6 +121,9 @@ public class CheckpointCoordinator {
     /** Flag marking the coordinator as shut down (not accepting any messages any more). */
     private volatile boolean shutdown;
 
+    @Setter
+    private volatile boolean isAllTaskReady = false;
+
     @SneakyThrows
     public CheckpointCoordinator(CheckpointManager manager,
                                  CheckpointStorage checkpointStorage,
@@ -208,6 +213,7 @@ public class CheckpointCoordinator {
                 return;
             }
         }
+        isAllTaskReady = true;
         InvocationFuture<?>[] futures = notifyTaskStart();
         CompletableFuture.allOf(futures).join();
         scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
@@ -229,7 +235,7 @@ public class CheckpointCoordinator {
         synchronized (lock) {
             final long currentTimestamp = Instant.now().toEpochMilli();
             if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() &&
-                pendingCounter.get() < coordinatorConfig.getMaxConcurrentCheckpoints()) {
+                pendingCounter.get() < coordinatorConfig.getMaxConcurrentCheckpoints() && isAllTaskReady) {
                 CompletableFuture<PendingCheckpoint> pendingCheckpoint = createPendingCheckpoint(currentTimestamp, CheckpointType.CHECKPOINT_TYPE);
                 startTriggerPendingCheckpoint(pendingCheckpoint);
                 pendingCounter.incrementAndGet();
@@ -261,22 +267,30 @@ public class CheckpointCoordinator {
 
     private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
         pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
-            LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
+            LOG.info("wait checkpoint completed: " + pendingCheckpoint.getCheckpointId());
             PassiveCompletableFuture<CompletedCheckpoint> completableFuture = pendingCheckpoint.getCompletableFuture();
             completableFuture.thenAcceptAsync(this::completePendingCheckpoint);
 
             if (COMPLETED_POINT_TYPE != pendingCheckpoint.getCheckpointType()) {
                 // Trigger the barrier and wait for all tasks to ACK
                 LOG.debug("trigger checkpoint barrier {}/{}/{}, {}", pendingCheckpoint.getJobId(), pendingCheckpoint.getPipelineId(), pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointType());
-                CompletableFuture.supplyAsync(() ->
+                CompletableFuture<InvocationFuture<?>[]> completableFutureArray = CompletableFuture.supplyAsync(() ->
                         new CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
                             pendingCheckpoint.getCheckpointTimestamp(),
                             pendingCheckpoint.getCheckpointType()))
-                    .thenApplyAsync(this::triggerCheckpoint)
-                    .thenApplyAsync(invocationFutures -> CompletableFuture.allOf(invocationFutures).join());
+                    .thenApplyAsync(this::triggerCheckpoint);
+
+                try {
+                    CompletableFuture.allOf(completableFutureArray).get();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                } catch (Exception e) {
+                    LOG.error(ExceptionUtils.getMessage(e));
+                    return;
+                }
             }
 
-            LOG.debug("Start a scheduled task to prevent checkpoint timeouts");
+            LOG.info("Start a scheduled task to prevent checkpoint timeouts");
             scheduler.schedule(() -> {
                     // If any task is not acked within the checkpoint timeout
                     if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
@@ -361,19 +375,29 @@ public class CheckpointCoordinator {
     }
 
     protected void cleanPendingCheckpoint(CheckpointFailureReason failureReason) {
-        pendingCheckpoints.values().forEach(pendingCheckpoint ->
-            pendingCheckpoint.abortCheckpoint(failureReason, null)
-        );
-        // TODO: clear related future & scheduler task
-        pendingCheckpoints.clear();
-        scheduler.shutdown();
+        synchronized (lock) {
+            pendingCheckpoints.values().forEach(pendingCheckpoint ->
+                pendingCheckpoint.abortCheckpoint(failureReason, null)
+            );
+            // TODO: clear related future & scheduler task
+            pendingCheckpoints.clear();
+            scheduler.shutdownNow();
+            scheduler = Executors.newScheduledThreadPool(
+                1, runnable -> {
+                    Thread thread = new Thread(runnable);
+                    thread.setDaemon(true);
+                    thread.setName(String.format("checkpoint-coordinator-%s/%s", pipelineId, jobId));
+                    return thread;
+                });
+            isAllTaskReady = false;
+        }
     }
 
     protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
         final long checkpointId = ackOperation.getBarrier().getId();
         final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);
         TaskLocation location = ackOperation.getTaskLocation();
-        LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(), location.getPipelineId(), location.getJobId(), ackOperation.getBarrier().toString());
+        LOG.info("task[{}]({}/{}) ack. {}", location.getTaskID(), location.getPipelineId(), location.getJobId(), ackOperation.getBarrier().toString());
         if (ackOperation.getBarrier().getCheckpointType() == COMPLETED_POINT_TYPE) {
             synchronized (lock) {
                 if (pendingCheckpoints.get(Barrier.PREPARE_CLOSE_BARRIER_ID) == null) {
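
For clarity on the cleanPendingCheckpoint change above: shutdown() is replaced
with shutdownNow() and a fresh scheduler is created, so the coordinator can still
schedule timeout checks after the pipeline is restored. A simplified,
self-contained sketch of that reset (field and thread names are illustrative only):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    // Sketch: abort in-flight timeout tasks, then recreate a single-threaded daemon
    // scheduler so checkpoints for the restarted pipeline can still be scheduled.
    public class SchedulerResetSketch {

        private final Object lock = new Object();
        private ScheduledExecutorService scheduler = newScheduler();

        void reset() {
            synchronized (lock) {
                scheduler.shutdownNow();      // cancel pending timeout checks immediately
                scheduler = newScheduler();   // fresh scheduler for the next pipeline run
            }
        }

        private static ScheduledExecutorService newScheduler() {
            return Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("checkpoint-coordinator-sketch");
                return thread;
            });
        }
    }
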
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
index 8d4276f5e..703e6e1d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 public enum CheckpointFailureReason {
 
-    TASK_FAILURE("Task has failed."),
+    PIPELINE_END("Pipeline turn to end state."),
     CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown.");
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 972643551..e7ff7ae20 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -34,24 +34,22 @@ import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOp
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
 import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
-import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.Task;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
+import com.hazelcast.cluster.Address;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -77,6 +75,7 @@ public class CheckpointManager {
     private final CheckpointStorage checkpointStorage;
 
     private final JobMaster jobMaster;
+
     public CheckpointManager(long jobId,
                              NodeEngine nodeEngine,
                              JobMaster jobMaster,
@@ -85,9 +84,12 @@ public class CheckpointManager {
         this.jobId = jobId;
         this.nodeEngine = nodeEngine;
         this.jobMaster = jobMaster;
-        this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
-            .create(new HashMap<>());
-        IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
+        this.checkpointStorage =
+            FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class,
+                    checkpointConfig.getStorage().getStorage())
+                .create(new ConcurrentHashMap<>());
+        IMap<Integer, Long> checkpointIdMap =
+            nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
         this.coordinatorMap = checkpointPlanMap.values().parallelStream()
             .map(plan -> {
                 IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
@@ -96,7 +98,9 @@ public class CheckpointManager {
                     // TODO: support savepoint
                     PipelineState pipelineState = null;
                     if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
-                        pipelineState = checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()), String.valueOf(idCounter.get() - 1));
+                        pipelineState =
+                            checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()),
+                                String.valueOf(idCounter.get() - 1));
                     }
                     return new CheckpointCoordinator(this,
                         checkpointStorage,
@@ -133,6 +137,7 @@ public class CheckpointManager {
     }
 
     public void reportedPipelineRunning(int pipelineId) {
+        getCheckpointCoordinator(pipelineId).setAllTaskReady(true);
         getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
     }
 
@@ -158,25 +163,18 @@ public class CheckpointManager {
      */
     public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
         // task address may change during restore.
-        log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus());
+        log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(),
+            reportStatusOperation.getStatus());
         getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
     }
 
     /**
      * Called by the JobMaster.
-     * <br> Listen to the {@link ExecutionState} of the {@link TaskGroup}, which is used to cancel the running {@link PendingCheckpoint} when the task group is abnormal.
+     * <br> Listen to the {@link PipelineStatus} of the {@link SubPlan}, which is used to cancel the running {@link PendingCheckpoint} when the SubPlan is abnormal.
      */
-    public void listenTaskGroup(TaskGroupLocation groupLocation, ExecutionState executionState) {
-        if (jobId != groupLocation.getJobId()) {
-            return;
-        }
-        switch (executionState) {
-            case FAILED:
-            case CANCELED:
-                getCheckpointCoordinator(groupLocation.getPipelineId()).cleanPendingCheckpoint(CheckpointFailureReason.TASK_FAILURE);
-                return;
-            default:
-        }
+    public CompletableFuture<Void> listenPipelineRetry(int pipelineId, PipelineStatus pipelineStatus) {
+        getCheckpointCoordinator(pipelineId).cleanPendingCheckpoint(CheckpointFailureReason.PIPELINE_END);
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
@@ -220,6 +218,9 @@ public class CheckpointManager {
     }
 
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
-        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
+        Address address =
+            jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId());
+        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
+            address);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index 8f43ac260..a6861b210 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.map.IMap;
 
 import java.util.concurrent.CompletableFuture;
@@ -36,7 +39,10 @@ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
 
     @Override
     public void start() throws Exception {
-        checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
+        RetryUtils.retryWithException(() -> {
+            return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
+        }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+            exception -> exception instanceof HazelcastInstanceNotActiveException, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
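
The start() change above retries the IMap write while the cluster is still
electing a master, treating only HazelcastInstanceNotActiveException as
retryable. A simplified, self-contained analogue of that guard (a stand-in for
the retry helper, not SeaTunnel's actual RetryUtils API):

    import java.util.concurrent.Callable;
    import java.util.function.Predicate;

    // Retry an action while it throws an exception the predicate marks as transient;
    // any other exception is rethrown immediately. Assumes maxAttempts >= 1.
    public final class GuardedRetrySketch {

        static <T> T retry(Callable<T> action,
                           int maxAttempts,
                           long sleepMillis,
                           Predicate<Exception> retryable) throws Exception {
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return action.call();
                } catch (Exception e) {
                    if (!retryable.test(e)) {
                        throw e;                       // not transient: give up now
                    }
                    last = e;
                    if (attempt < maxAttempts) {
                        Thread.sleep(sleepMillis);     // transient: wait and try again
                    }
                }
            }
            throw last;
        }
    }

In spirit, the guarded call is retry(() -> map.putIfAbsent(pipelineId,
INITIAL_CHECKPOINT_ID), attempts, sleepMillis, e -> e instanceof
HazelcastInstanceNotActiveException), although the committed code goes through
RetryUtils.retryWithException.
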
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index e086d02c9..6a48f5255 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -143,6 +143,9 @@ public class PhysicalPlan {
         PassiveCompletableFuture<PipelineStatus> future = subPlan.initStateFuture();
         future.thenAcceptAsync(pipelineState -> {
             try {
+                // Notify the checkpoint manager when the pipeline ends, whether or not the pipeline will be restarted
+                jobMaster.getCheckpointManager()
+                    .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
                 if (PipelineStatus.CANCELED.equals(pipelineState)) {
                     if (canRestorePipeline(subPlan)) {
                         subPlan.restorePipeline();
@@ -170,6 +173,8 @@ public class PhysicalPlan {
                     LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
                 }
 
+                notifyCheckpointManagerPipelineEnd(subPlan);
+
                 if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
                     if (failedPipelineNum.get() > 0) {
                         updateJobState(JobStatus.FAILING);
@@ -187,6 +192,15 @@ public class PhysicalPlan {
         });
     }
 
+    /**
+     * Only called when the pipeline will never be restarted.
+     * @param subPlan subPlan
+     */
+    private void notifyCheckpointManagerPipelineEnd(@NonNull SubPlan subPlan) {
+        jobMaster.getCheckpointManager()
+            .listenPipeline(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
+    }
+
     private boolean canRestorePipeline(SubPlan subPlan) {
         return needRestore && subPlan.getPipelineRestoreNum() < PIPELINE_MAX_RESTORE_NUM;
     }
@@ -245,6 +259,9 @@ public class PhysicalPlan {
                 throw new IllegalStateException(message);
             }
 
+            // notify checkpoint manager
+            jobMaster.getCheckpointManager().shutdown(endState);
+
             LOGGER.info(String.format("%s end with state %s", getJobFullName(), endState));
             // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
             updateStateTimestamps(endState);
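
To make the PhysicalPlan flow above easier to follow: when a pipeline reaches an
end state its checkpoint coordinator is reset first, and only when the pipeline
will not be restored again is the checkpoint manager told that the pipeline has
ended for good. A rough, self-contained sketch of that ordering (class and method
names are placeholders, not the real PhysicalPlan API):

    // resetCoordinator() stands for listenPipelineRetry(...), releaseCheckpoints()
    // stands for the final listenPipeline(...) notification.
    class PipelineEndSketch {

        private final boolean canRestore;

        PipelineEndSketch(boolean canRestore) {
            this.canRestore = canRestore;
        }

        void onPipelineEnd(boolean canceled) {
            resetCoordinator();          // always: abort pending checkpoints first
            if (canceled && canRestore) {
                restorePipeline();       // pipeline will run again; keep its checkpoint ids
                return;
            }
            releaseCheckpoints();        // pipeline will never restart; drop its state
        }

        private void resetCoordinator() { }
        private void restorePipeline() { }
        private void releaseCheckpoints() { }
    }
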
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index a6f352505..38069a16d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -242,7 +242,6 @@ public class PhysicalVertex {
 
     private boolean turnToEndState(@NonNull ExecutionState endState) {
         synchronized (this) {
-            jobMaster.getCheckpointManager().listenTaskGroup(taskGroupLocation, endState);
             // consistency check
             ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
             if (currentState.isEndState()) {
@@ -327,7 +326,7 @@ public class PhysicalVertex {
     private void noticeTaskExecutionServiceCancel() {
         int i = 0;
         // In order not to generate uncontrolled tasks, we will try again until the taskFuture is completed
-        while (!taskFuture.isDone()) {
+        while (!taskFuture.isDone() && nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null) {
             try {
                 i++;
                 LOGGER.info(String.format("send cancel %s operator to member %s", taskFullName, getCurrentExecutionAddress()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index f299feb65..b68f9199e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -381,6 +381,8 @@ public class SubPlan {
             restorePipeline();
         } else if (PipelineStatus.CANCELING.equals(getPipelineState())) {
             cancelPipelineTasks();
+        } else if (PipelineStatus.RUNNING.equals(getPipelineState())) {
+            jobMaster.getCheckpointManager().reportedPipelineRunning(this.getPipelineLocation().getPipelineId());
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 83a2af560..76cca50ad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -20,11 +20,13 @@ package org.apache.seatunnel.engine.server.master;
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -98,6 +100,8 @@ public class JobMaster extends Thread {
 
     private final EngineConfig engineConfig;
 
+    private boolean isRunning = true;
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
@@ -304,9 +308,18 @@ public class JobMaster extends Thread {
         return ownedSlotProfilesIMap.get(pipelineLocation);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     public void setOwnedSlotProfiles(@NonNull PipelineLocation pipelineLocation,
                                      @NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
         ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
+        try {
+            RetryUtils.retryWithException(() -> {
+                return pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation));
+            }, new RetryUtils.RetryMaterial(20, true,
+                exception -> exception instanceof NullPointerException && isRunning, 1000));
+        } catch (Exception e) {
+            throw new SeaTunnelEngineException("Can not sync pipeline owned slot profiles with IMap", e);
+        }
     }
 
     public SlotProfile getOwnedSlotProfiles(@NonNull TaskGroupLocation taskGroupLocation) {
@@ -325,6 +338,7 @@ public class JobMaster extends Thread {
 
     public void interrupt() {
         try {
+            isRunning = false;
             jobMasterCompleteFuture.cancel(true);
         } finally {
             super.interrupt();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index b84225793..2f11954c3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.internal.serialization.Data;
@@ -155,10 +156,12 @@ public class CoordinatorServiceTest {
         // wait job restore
         Thread.sleep(5000);
 
-        // job will recovery running state
+        // pipeline will recover to the RUNNING state
         await().atMost(200000, TimeUnit.MILLISECONDS)
             .untilAsserted(
-                () -> Assertions.assertEquals(JobStatus.RUNNING, server2.getCoordinatorService().getJobStatus(jobId)));
+                () -> Assertions.assertEquals(PipelineStatus.RUNNING,
+                    server2.getCoordinatorService().getJobMaster(jobId).getPhysicalPlan().getPipelineList().get(0)
+                        .getPipelineState()));
 
         server2.getCoordinatorService().cancelJob(jobId);
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index f3d95ed16..6b19101d6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -69,10 +69,9 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
             planMap,
             new CheckpointConfig());
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
-        CompletableFuture<Void> future = checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
-        future.join();
+        checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
         Assertions.assertNull(checkpointIdMap.get(1));
-        future = checkpointManager.shutdown(JobStatus.FINISHED);
+        CompletableFuture<Void> future = checkpointManager.shutdown(JobStatus.FINISHED);
         future.join();
         Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty());
     }