Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/16 01:15:02 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine] Add master node switch test and fix bug (#3082)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 608be51bc [hotfix][engine] Add master node switch test and fix bug (#3082)
608be51bc is described below

commit 608be51bc4f367b15b25447587c754304a5262de
Author: Eric <ga...@gmail.com>
AuthorDate: Sun Oct 16 09:14:55 2022 +0800

    [hotfix][engine] Add master node switch test and fix bug (#3082)
    
    * Add master node switch test and fix bug
    
    * Add withTryCatch to the whenComplete which may throw exception
---
 .../apache/seatunnel/common/utils/FileUtils.java   |  10 ++
 .../file/sink/writer/TextWriteStrategy.java        |   2 +
 .../engine/e2e/ClusterFaultToleranceIT.java        | 101 ++++++++++++++++++--
 .../cluster_batch_fake_to_localfile_template.conf  |   9 +-
 .../engine/server/CoordinatorService.java          | 105 +++++++++++++++------
 .../seatunnel/engine/server/SeaTunnelServer.java   |  97 +++++++++----------
 .../engine/server/TaskExecutionService.java        |  19 ++--
 .../CheckpointCoordinatorConfiguration.java        |   3 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  24 ++---
 .../engine/server/dag/physical/PhysicalVertex.java |  27 +++---
 .../engine/server/dag/physical/SubPlan.java        |  50 +++++-----
 .../seatunnel/engine/server/master/JobMaster.java  |  27 ++++--
 .../resourcemanager/AbstractResourceManager.java   |  23 +++--
 .../resourcemanager/ResourceRequestHandler.java    |  14 +--
 .../server/service/slot/DefaultSlotService.java    |  50 +++++-----
 .../engine/server/AbstractSeaTunnelServerTest.java |  19 +++-
 .../engine/server/CoordinatorServiceTest.java      |  65 ++++++++++++-
 17 files changed, 447 insertions(+), 198 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 491ae2caa..a4738b6cc 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -121,6 +121,16 @@ public class FileUtils {
         return getFileLineNumber(file.getPath());
     }
 
+    /**
+     * Create a dir; if the dir already exists, clear the files and subdirectories in it.
+     * @param dirPath dirPath
+     */
+    public static void createNewDir(@NonNull String dirPath) {
+        deleteFile(dirPath);
+        File file = new File(dirPath);
+        file.mkdirs();
+    }
+
     /**
      * clear dir and the sub dir
      *
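For readers unfamiliar with the new helper, a brief usage sketch (assuming deleteFile removes the path recursively when it exists): after the call the directory exists and is empty. Names and paths are illustrative, not part of the commit.

    import java.io.File;

    import org.apache.seatunnel.common.utils.FileUtils;

    public class CreateNewDirSketch {
        public static void main(String[] args) {
            // Guarantee an empty directory before a test writes into it:
            // any content left over from a previous run is removed first.
            FileUtils.createNewDir("/tmp/seatunnel-test/output");
            File dir = new File("/tmp/seatunnel-test/output");
            System.out.println(dir.isDirectory() && dir.listFiles().length == 0); // true
        }
    }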
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 4433775d5..e9579723d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -101,6 +101,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
             }
             needMoveFiles.put(key, getTargetLocation(key));
         });
+        beingWrittenOutputStream.clear();
+        isFirstWrite.clear();
     }
 
     private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index a1b0b1a42..6fc5cee93 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.engine.e2e;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -31,8 +34,10 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -46,14 +51,23 @@ import java.util.concurrent.TimeUnit;
 /**
  * Cluster fault tolerance test. Tests the job recovery capability and the data consistency guarantee in case of cluster node failure
  */
+@Slf4j
 public class ClusterFaultToleranceIT {
 
     public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
 
+    public static final String DYNAMIC_JOB_MODE = "dynamic_job_mode";
+
+    public static final String DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM = "dynamic_test_row_num_per_parallelism";
+
+    public static final String DYNAMIC_TEST_PARALLELISM = "dynamic_test_parallelism";
+
     @Test
     public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
         String testCaseName = "testBatchJobRunOkIn3Node";
         String testClusterName = "ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
+        long testRowNumber = 1000;
+        int testParallelism = 1;
         HazelcastInstanceImpl node1 =
             SeaTunnelServerStarter.createHazelcastInstance(
                 TestUtils.getClusterName(testClusterName));
@@ -72,7 +86,7 @@ public class ClusterFaultToleranceIT {
 
         // TODO Need FakeSource support parallel first
         Common.setDeployMode(DeployMode.CLIENT);
-        ImmutablePair<String, String> testResources = createTestResources(testCaseName);
+        ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName(testCaseName);
 
@@ -88,12 +102,16 @@ public class ClusterFaultToleranceIT {
             return clientJobProxy.waitForJobComplete();
         });
 
-        Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+        Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> Assertions.assertTrue(
                 objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
 
         Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
-        Assertions.assertEquals(100, fileLineNumberFromDir);
+        Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        node1.shutdown();
+        node2.shutdown();
+        node3.shutdown();
     }
 
     /**
@@ -101,17 +119,26 @@ public class ClusterFaultToleranceIT {
      * It deletes the test sink target path before returning the final job config file path
      *
      * @param testCaseName testCaseName
+     * @param jobMode      jobMode
+     * @param rowNumber row.num per FakeSource parallelism
+     * @param parallelism FakeSource parallelism
      * @return
      */
-    private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName) {
+    private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName, @NonNull JobMode jobMode,
+                                                              long rowNumber, int parallelism) {
+        checkArgument(rowNumber > 0, "rowNumber must be greater than 0");
+        checkArgument(parallelism > 0, "parallelism must be greater than 0");
         Map<String, String> valueMap = new HashMap<>();
         valueMap.put(DYNAMIC_TEST_CASE_NAME, testCaseName);
+        valueMap.put(DYNAMIC_JOB_MODE, jobMode.toString());
+        valueMap.put(DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM, String.valueOf(rowNumber));
+        valueMap.put(DYNAMIC_TEST_PARALLELISM, String.valueOf(parallelism));
 
         String targetDir = "/tmp/hive/warehouse/" + testCaseName;
-        targetDir = targetDir.replaceAll("/", File.separator);
+        targetDir = targetDir.replace("/", File.separator);
 
         // clear target dir before test
-        FileUtils.deleteFile(targetDir);
+        FileUtils.createNewDir(targetDir);
 
         String targetConfigFilePath =
             File.separator + "tmp" + File.separator + "test_conf" + File.separator + testCaseName +
@@ -121,4 +148,66 @@ public class ClusterFaultToleranceIT {
 
         return new ImmutablePair<>(targetDir, targetConfigFilePath);
     }
+
+    @Test
+    @Disabled("Disable because we can not support changeless row number in FakeSource Stream Job")
+    public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedException {
+        String testCaseName = "testStreamJobRunOkIn3Node";
+        String testClusterName = "ClusterFaultToleranceIT_testStreamJobRunOkIn3Node";
+        long testRowNumber = 1000;
+        int testParallelism = 1;
+        HazelcastInstanceImpl node1 =
+            SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+        HazelcastInstanceImpl node2 =
+            SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+        HazelcastInstanceImpl node3 =
+            SeaTunnelServerStarter.createHazelcastInstance(
+                TestUtils.getClusterName(testClusterName));
+
+        // wait until all nodes have joined the cluster
+        Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+
+        // TODO Need FakeSource support parallel first
+        Common.setDeployMode(DeployMode.CLIENT);
+        ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(testCaseName);
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(
+            TestUtils.getClusterName(testClusterName));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv =
+            engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+        ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+        CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+            return clientJobProxy.waitForJobComplete();
+        });
+
+        Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                log.info("File Lines ==================" + FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+                Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+                    testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+            });
+
+        clientJobProxy.cancelJob();
+
+        Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertTrue(
+                objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+        Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+        Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+        node1.shutdown();
+        node2.shutdown();
+        node3.shutdown();
+    }
 }
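The DYNAMIC_* keys collected into valueMap above correspond to the ${dynamic_*} placeholders in the template config changed below. A hedged sketch of the substitution createTestResources relies on; the real helper lives in TestUtils and may differ in name and signature.

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Map;

    public class TemplateSubstitutionSketch {
        // Render a job config from a template by filling ${key} placeholders.
        static void render(String templatePath, String targetPath, Map<String, String> valueMap) throws Exception {
            String conf = new String(Files.readAllBytes(Paths.get(templatePath)), StandardCharsets.UTF_8);
            for (Map.Entry<String, String> e : valueMap.entrySet()) {
                conf = conf.replace("${" + e.getKey() + "}", e.getValue());
            }
            Files.write(Paths.get(targetPath), conf.getBytes(StandardCharsets.UTF_8));
        }
    }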
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 35b4e44c4..2789cbc3e 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -20,7 +20,7 @@
 
 env {
   # You can set flink configuration here
-  job.mode = "BATCH"
+  job.mode = "${dynamic_job_mode}" # dynamic_job_mode will be replaced with the real job mode before the test runs
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
@@ -28,12 +28,12 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
-      row.num = 100
+      row.num = ${dynamic_test_row_num_per_parallelism}
       map.size = 10
       array.size = 10
       bytes.length = 10
       string.length = 10
-      parallelism = 1
+      parallelism = ${dynamic_test_parallelism}
       schema = {
         fields {
           c_map = "map<string, array<int>>"
@@ -81,9 +81,6 @@ sink {
     path="/tmp/hive/warehouse/${dynamic_test_case_name}" # dynamic_test_case_name will be replace to the final file name before test run
     field_delimiter="\t"
     row_delimiter="\n"
-    partition_by=["c_string"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
     file_format="text"
     filename_time_format="yyyy.MM.dd"
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 2b4ef575a..78ec4158d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.internal.services.MembershipServiceEvent;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -111,15 +113,21 @@ public class CoordinatorService {
     private volatile boolean isActive = false;
 
     private final ExecutorService executorService;
-    private final ScheduledExecutorService monitorService;
+
+    private final SeaTunnelServer seaTunnelServer;
+
+    private ScheduledExecutorService masterActiveListener;
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull ExecutorService executorService) {
+    public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer) {
         this.nodeEngine = nodeEngine;
         this.logger = nodeEngine.getLogger(getClass());
-        this.executorService = executorService;
-        this.monitorService = Executors.newSingleThreadScheduledExecutor();
-        monitorService.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
+        this.executorService =
+            Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+                .setNameFormat("seatunnel-coordinator-service-%d").build());
+        this.seaTunnelServer = seaTunnelServer;
+        masterActiveListener = Executors.newSingleThreadScheduledExecutor();
+        masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
     }
 
     public JobMaster getJobMaster(Long jobId) {
@@ -146,8 +154,11 @@ public class CoordinatorService {
         ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
 
         List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry -> {
-            return CompletableFuture.runAsync(() -> restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue()),
-                executorService);
+            return CompletableFuture.runAsync(() -> {
+                logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey()));
+                restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue());
+                logger.info(String.format("restore job (%s) from master active switch finished", entry.getKey()));
+            }, executorService);
         }).collect(Collectors.toList());
 
         try {
@@ -170,7 +181,7 @@ public class CoordinatorService {
             new JobMaster(runningJobInfo.getJobImmutableInformation(),
                 nodeEngine,
                 executorService,
-                resourceManager,
+                getResourceManager(),
                 runningJobStateIMap,
                 runningJobStateTimestampsIMap,
                 ownedSlotProfilesIMap);
@@ -181,12 +192,18 @@ public class CoordinatorService {
             throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId));
         }
 
+        String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
         if (jobStatus.isEndState()) {
+            logger.info(String.format(
+                "The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info",
+                jobFullName, jobStatus));
             removeJobIMap(jobMaster);
             return;
         }
 
         if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
+            logger.info(
+                String.format("The restore %s is state %s, cancel job and submit it again.", jobFullName, jobStatus));
             jobMaster.cancelJob();
             jobMaster.getJobMasterCompleteFuture().join();
             submitJob(jobId, runningJobInfo.getJobImmutableInformation()).join();
@@ -197,6 +214,7 @@ public class CoordinatorService {
         jobMaster.markRestore();
 
         if (JobStatus.CANCELLING.equals(jobStatus)) {
+            logger.info(String.format("The restore %s is in %s state, cancel the job", jobFullName, jobStatus));
             CompletableFuture.runAsync(() -> {
                 try {
                     jobMaster.cancelJob();
@@ -211,6 +229,8 @@ public class CoordinatorService {
         }
 
         if (JobStatus.RUNNING.equals(jobStatus)) {
+            logger.info(String.format("The restore %s is in %s state, restore pipeline and take over this job running",
+                jobFullName, jobStatus));
             CompletableFuture.runAsync(() -> {
                 try {
                     jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
@@ -226,14 +246,20 @@ public class CoordinatorService {
     }
 
     private void checkNewActiveMaster() {
-        if (!isActive && isMasterNode()) {
-            logger.info("This node become a new active master node, begin init coordinator service");
-            initCoordinatorService();
-            isActive = true;
-        } else if (isActive && !isMasterNode()) {
+        try {
+            if (!isActive && this.seaTunnelServer.isMasterNode()) {
+                logger.info("This node become a new active master node, begin init coordinator service");
+                initCoordinatorService();
+                isActive = true;
+            } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
+                isActive = false;
+                logger.info("This node become leave active master node, begin clear coordinator service");
+                clearCoordinatorService();
+            }
+        } catch (Exception e) {
             isActive = false;
-            logger.info("This node become leave active master node, begin clear coordinator service");
-            clearCoordinatorService();
+            logger.severe(ExceptionUtils.getMessage(e));
+            throw new SeaTunnelEngineException("check new active master error, stop loop");
         }
     }
 
@@ -255,15 +281,6 @@ public class CoordinatorService {
         }
     }
 
-    public boolean isMasterNode() {
-        Address masterAddress = nodeEngine.getMasterAddress();
-        if (masterAddress == null) {
-            return false;
-        }
-
-        return masterAddress.equals(nodeEngine.getThisAddress());
-    }
-
     /**
      * Lazy load for resource manager
      */
@@ -387,10 +404,10 @@ public class CoordinatorService {
     }
 
     public void shutdown() {
-        if (resourceManager != null) {
-            resourceManager.close();
+        if (masterActiveListener != null) {
+            masterActiveListener.shutdownNow();
         }
-        monitorService.shutdown();
+        clearCoordinatorService();
     }
 
     /**
@@ -418,11 +435,12 @@ public class CoordinatorService {
             ExecutionState executionState = physicalVertex.getExecutionState();
             if (null != deployAddress && deployAddress.equals(lostAddress) &&
                 (executionState.equals(ExecutionState.DEPLOYING) ||
-                    executionState.equals(ExecutionState.RUNNING))) {
+                    executionState.equals(ExecutionState.RUNNING) ||
+                    executionState.equals(ExecutionState.CANCELING))) {
                 TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
                 physicalVertex.updateTaskExecutionState(
                     new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
-                        new SeaTunnelEngineException(
+                        new JobException(
                             String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
                                 lostAddress))));
             }
@@ -433,4 +451,33 @@ public class CoordinatorService {
         this.getResourceManager().memberRemoved(event);
         this.failedTaskOnMemberRemoved(event);
     }
+
+    public void printExecutionInfo() {
+        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
+        int activeCount = threadPoolExecutor.getActiveCount();
+        int corePoolSize = threadPoolExecutor.getCorePoolSize();
+        int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
+        int poolSize = threadPoolExecutor.getPoolSize();
+        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
+        long taskCount = threadPoolExecutor.getTaskCount();
+        StringBuffer sbf = new StringBuffer();
+        sbf.append("activeCount=")
+            .append(activeCount)
+            .append("\n")
+            .append("corePoolSize=")
+            .append(corePoolSize)
+            .append("\n")
+            .append("maximumPoolSize=")
+            .append(maximumPoolSize)
+            .append("\n")
+            .append("poolSize=")
+            .append(poolSize)
+            .append("\n")
+            .append("completedTaskCount=")
+            .append(completedTaskCount)
+            .append("\n")
+            .append("taskCount=")
+            .append(taskCount);
+        logger.info(sbf.toString());
+    }
 }
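Worth noting about the try/catch added to checkNewActiveMaster: ScheduledExecutorService.scheduleAtFixedRate silently suppresses every later run once a task throws, so without the catch the master-switch listener could die without leaving any trace in the log. The rethrow still stops the loop, but deliberately, and with the error logged first. A minimal standalone demonstration of the underlying JDK behavior:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedRateFailureDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
            ses.scheduleAtFixedRate(() -> {
                System.out.println("tick");
                throw new RuntimeException("boom"); // suppresses all later runs
            }, 0, 100, TimeUnit.MILLISECONDS);
            Thread.sleep(500); // "tick" prints exactly once; the task never runs again
            ses.shutdownNow();
        }
    }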
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 90a87a770..269fd3ed5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
@@ -25,7 +26,6 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.services.ManagedService;
 import com.hazelcast.internal.services.MembershipAwareService;
@@ -41,10 +41,8 @@ import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 import lombok.NonNull;
 
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
@@ -60,17 +58,14 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     private CoordinatorService coordinatorService;
     private ScheduledExecutorService monitorService;
 
-    private final ExecutorService executorService;
-
     private final SeaTunnelConfig seaTunnelConfig;
 
+    private volatile boolean isRunning = true;
+
     public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
         this.logger = node.getLogger(getClass());
         this.liveOperationRegistry = new LiveOperationRegistry();
         this.seaTunnelConfig = seaTunnelConfig;
-        this.executorService =
-            Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                .setNameFormat("seatunnel-server-executor-%d").build());
         logger.info("SeaTunnel server start...");
     }
 
@@ -100,7 +95,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         );
         taskExecutionService.start();
         getSlotService();
-        coordinatorService = new CoordinatorService(nodeEngine, executorService);
+        coordinatorService = new CoordinatorService(nodeEngine, this);
         monitorService = Executors.newSingleThreadScheduledExecutor();
         monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
     }
@@ -112,15 +107,19 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     @Override
     public void shutdown(boolean terminate) {
+        isRunning = false;
+        if (taskExecutionService != null) {
+            taskExecutionService.shutdown();
+        }
+        if (monitorService != null) {
+            monitorService.shutdownNow();
+        }
         if (slotService != null) {
             slotService.close();
         }
         if (coordinatorService != null) {
             coordinatorService.shutdown();
         }
-        executorService.shutdown();
-        taskExecutionService.shutdown();
-        monitorService.shutdown();
     }
 
     @Override
@@ -131,7 +130,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     @Override
     public void memberRemoved(MembershipServiceEvent event) {
         try {
-            if (coordinatorService.isMasterNode()) {
+            if (isMasterNode()) {
                 this.getCoordinatorService().memberRemoved(event);
             }
         } catch (SeaTunnelEngineException e) {
@@ -159,19 +158,25 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     @SuppressWarnings("checkstyle:MagicNumber")
     public CoordinatorService getCoordinatorService() {
         int retryCount = 0;
-        while (coordinatorService.isMasterNode() && !coordinatorService.isCoordinatorActive() && retryCount < 20) {
-            try {
-                logger.warning("Waiting this node become the active master node");
-                Thread.sleep(1000);
-                retryCount++;
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+        if (isMasterNode()) {
+            // TODO the retry count and sleep time need configurable
+            while (!coordinatorService.isCoordinatorActive() && retryCount < 20 && isRunning) {
+                try {
+                    logger.warning("This is master node, waiting the coordinator service init finished");
+                    Thread.sleep(1000);
+                    retryCount++;
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
             }
+            if (coordinatorService.isCoordinatorActive()) {
+                return coordinatorService;
+            }
+
+            throw new SeaTunnelEngineException("Can not get coordinator service from an active master node.");
+        } else {
+            throw new SeaTunnelEngineException("Please don't get coordinator service from an inactive master node");
         }
-        if (!coordinatorService.isCoordinatorActive()) {
-            throw new SeaTunnelEngineException("Can not get coordinator service from an inactive master node.");
-        }
-        return coordinatorService;
     }
 
     public TaskExecutionService getTaskExecutionService() {
@@ -180,11 +185,13 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     /**
      * return whether the task has ended
+     *
      * @param taskGroupLocation taskGroupLocation
      * @return
      */
     public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
-        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        IMap<Object, Object> runningJobState =
+            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
         if (runningJobState == null) {
             return false;
         }
@@ -193,32 +200,20 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         return taskState == null ? false : ((ExecutionState) taskState).isEndState();
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public boolean isMasterNode() {
+        // must retry until the cluster has a master node
+        try {
+            return RetryUtils.retryWithException(() -> {
+                return nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress());
+            }, new RetryUtils.RetryMaterial(20, true,
+                exception -> exception instanceof NullPointerException && isRunning, 1000));
+        } catch (Exception e) {
+            throw new SeaTunnelEngineException("cluster have no master node", e);
+        }
+    }
+
     private void printExecutionInfo() {
-        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
-        int activeCount = threadPoolExecutor.getActiveCount();
-        int corePoolSize = threadPoolExecutor.getCorePoolSize();
-        int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
-        int poolSize = threadPoolExecutor.getPoolSize();
-        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
-        long taskCount = threadPoolExecutor.getTaskCount();
-        StringBuffer sbf = new StringBuffer();
-        sbf.append("activeCount=")
-            .append(activeCount)
-            .append("\n")
-            .append("corePoolSize=")
-            .append(corePoolSize)
-            .append("\n")
-            .append("maximumPoolSize=")
-            .append(maximumPoolSize)
-            .append("\n")
-            .append("poolSize=")
-            .append(poolSize)
-            .append("\n")
-            .append("completedTaskCount=")
-            .append(completedTaskCount)
-            .append("\n")
-            .append("taskCount=")
-            .append(taskCount);
-        logger.info(sbf.toString());
+        coordinatorService.printExecutionInfo();
     }
 }
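The retry in isMasterNode exists because nodeEngine.getMasterAddress() can return null while the cluster is still electing a master, which previously made the equals check NPE-prone during startup and master switch. A plain-Java sketch of an equivalent loop, assuming the same budget of 20 attempts at 1 second intervals; NodeEngine and Address are the Hazelcast types used above.

    // import com.hazelcast.cluster.Address;
    // import com.hazelcast.spi.impl.NodeEngine;
    boolean isMasterNodeSketch(NodeEngine nodeEngine) throws InterruptedException {
        for (int attempt = 0; attempt < 20; attempt++) {
            Address master = nodeEngine.getMasterAddress();
            if (master != null) {
                return master.equals(nodeEngine.getThisAddress());
            }
            Thread.sleep(1000); // no master elected yet; wait and retry
        }
        throw new IllegalStateException("cluster has no master node");
    }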
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 6f88f89dc..44d2002a9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -77,7 +77,7 @@ public class TaskExecutionService {
     private final String hzInstanceName;
     private final NodeEngineImpl nodeEngine;
     private final ILogger logger;
-    private volatile boolean isShutdown;
+    private volatile boolean isRunning = true;
     private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new LinkedBlockingDeque<>();
     private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
     private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
@@ -97,7 +97,7 @@ public class TaskExecutionService {
     }
 
     public void shutdown() {
-        isShutdown = true;
+        isRunning = false;
         executorService.shutdownNow();
     }
 
@@ -211,10 +211,12 @@ public class TaskExecutionService {
             logger.severe(ExceptionUtils.getMessage(t));
             resultFuture.completeExceptionally(t);
         }
-        resultFuture.whenComplete((r, s) -> {
+        resultFuture.whenComplete(withTryCatch(logger, (r, s) -> {
+            logger.info(
+                String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
             InvocationFuture<Object> invoke = null;
             long sleepTime = 1000;
-            do {
+            while (isRunning && (invoke == null || !invoke.isDone())) {
                 if (null != invoke) {
                     logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
                         taskGroup.getTaskGroupLocation(), sleepTime));
@@ -230,8 +232,8 @@ public class TaskExecutionService {
                     new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
                     nodeEngine.getMasterAddress()).invoke();
                 invoke.join();
-            } while (!invoke.isDone());
-        });
+            }
+        }));
         return new PassiveCompletableFuture<>(resultFuture);
     }
 
@@ -270,7 +272,7 @@ public class TaskExecutionService {
                 ProgressState result;
                 do {
                     result = t.call();
-                } while (!result.isDone() && !isShutdown &&
+                } while (!result.isDone() && isRunning &&
                     !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
             } catch (Throwable e) {
                 logger.warning("Exception in " + t, e);
@@ -313,7 +315,7 @@ public class TaskExecutionService {
         @SneakyThrows
         @Override
         public void run() {
-            while (keep.get()) {
+            while (keep.get() && isRunning) {
                 TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
                     exclusiveTaskTracker.get() :
                     taskqueue.takeFirst();
@@ -431,6 +433,7 @@ public class TaskExecutionService {
         }
 
         void taskDone() {
+            logger.info("taskDone: " + taskGroup.getTaskGroupLocation());
             if (completionLatch.decrementAndGet() == 0) {
                 TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
                 executionContexts.remove(taskGroupLocation);
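The commit message's second bullet lands here and in JobMaster below: an exception thrown inside a whenComplete callback does not propagate to the thread that completed the future; it only surfaces on the future whenComplete returns, and if nobody consumes that future the failure vanishes silently. withTryCatch (from Hazelcast Jet's ExceptionUtil, imported in the diff) wraps the callback in a try/catch that logs any throwable, so such failures at least land in the server log. A minimal demonstration of the trap:

    import java.util.concurrent.CompletableFuture;

    public class WhenCompleteSwallowDemo {
        public static void main(String[] args) throws Exception {
            CompletableFuture<String> f = new CompletableFuture<>();
            f.whenComplete((r, t) -> {
                // This throwable only completes the future returned by
                // whenComplete, which nobody observes here, so it is lost.
                throw new IllegalStateException("lost");
            });
            f.complete("done");
            Thread.sleep(100);
            System.out.println("finished; no stack trace was ever printed");
        }
    }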
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
index 91c1e3ce4..257bdbe6a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
@@ -101,7 +101,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
 
     @SuppressWarnings("MagicNumber")
     public static final class Builder {
-        private long checkpointInterval = 300000;
+        // TODO 5000 is for test; we can restore checkpointInterval to 300000 once it can be read from the job config
+        private long checkpointInterval = 5000;
         private long checkpointTimeout = 300000;
         private int maxConcurrentCheckpoints = 1;
         private int tolerableFailureCheckpoints = 0;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index aacd99297..556d15809 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -169,20 +169,20 @@ public class PhysicalPlan {
                     jobMaster.releasePipelineResource(subPlan);
                     LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
                 }
-            } catch (Throwable e) {
-                // Because only cancelJob or releasePipelineResource can throw exception, so we only output log here
-                LOGGER.severe(ExceptionUtils.getMessage(e));
-            }
 
-            if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
-                if (failedPipelineNum.get() > 0) {
-                    updateJobState(JobStatus.FAILING);
-                } else if (canceledPipelineNum.get() > 0) {
-                    turnToEndState(JobStatus.CANCELED);
-                } else {
-                    turnToEndState(JobStatus.FINISHED);
+                if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
+                    if (failedPipelineNum.get() > 0) {
+                        updateJobState(JobStatus.FAILING);
+                    } else if (canceledPipelineNum.get() > 0) {
+                        turnToEndState(JobStatus.CANCELED);
+                    } else {
+                        turnToEndState(JobStatus.FINISHED);
+                    }
+                    jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
                 }
-                jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
+            } catch (Throwable e) {
+                // Only cancelJob or releasePipelineResource can throw an exception, so we just log it here
+                LOGGER.severe("Never come here ", e);
             }
         });
     }
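The restructured callback keeps the usual last-one-out completion pattern, now inside the try so that an unexpected throwable in the state transition is logged rather than silently swallowed by the async callback: every pipeline completion increments a shared atomic counter, and only the caller that reaches the total flips the job to its end state and completes the job future. A minimal sketch of the pattern; names are illustrative, not the class's actual fields:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    class LastOneOut {
        final AtomicInteger finished = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();
        final int total;
        final CompletableFuture<String> jobEnd = new CompletableFuture<>();

        LastOneOut(int total) { this.total = total; }

        void onPipelineDone(boolean pipelineFailed) {
            if (pipelineFailed) {
                failed.incrementAndGet();
            }
            // incrementAndGet is atomic, so exactly one caller observes == total.
            if (finished.incrementAndGet() == total) {
                jobEnd.complete(failed.get() > 0 ? "FAILED" : "FINISHED");
            }
        }
    }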
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index afcaf4a3c..a6f352505 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -101,8 +101,6 @@ public class PhysicalVertex {
 
     private final NodeEngine nodeEngine;
 
-    private Address currentExecutionAddress;
-
     private TaskGroupImmutableInformation taskGroupImmutableInformation;
 
     private JobMaster jobMaster;
@@ -150,14 +148,15 @@ public class PhysicalVertex {
         this.nodeEngine = nodeEngine;
         this.taskFullName =
             String.format(
-                "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
+                "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]",
                 jobImmutableInformation.getJobConfig().getName(),
                 jobImmutableInformation.getJobId(),
                 pipelineId,
                 totalPipelineNum,
                 taskGroup.getTaskGroupName(),
                 subTaskGroupIndex + 1,
-                parallelism);
+                parallelism,
+                taskGroupLocation);
         this.taskFuture = new CompletableFuture<>();
 
         this.runningJobStateIMap = runningJobStateIMap;
@@ -167,6 +166,9 @@ public class PhysicalVertex {
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
         this.taskFuture = new CompletableFuture<>();
         ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+        if (executionState != null) {
+            LOGGER.info(String.format("The task %s is in state %s when init state future", taskFullName, executionState));
+        }
         // If the task state is CANCELING we need to call noticeTaskExecutionServiceCancel().
         if (ExecutionState.CANCELING.equals(executionState)) {
             noticeTaskExecutionServiceCancel();
@@ -202,7 +204,6 @@ public class PhysicalVertex {
     // This method must not throw an exception
     public void deploy(@NonNull SlotProfile slotProfile) {
         try {
-            currentExecutionAddress = slotProfile.getWorker();
             if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
                 deployOnLocal(slotProfile);
             } else {
@@ -257,12 +258,11 @@ public class PhysicalVertex {
                 return false;
             }
 
+            updateStateTimestamps(endState);
+            runningJobStateIMap.set(taskGroupLocation, endState);
             LOGGER.info(String.format("%s turn to end state %s.",
                 taskFullName,
                 endState));
-            updateStateTimestamps(endState);
-
-            runningJobStateIMap.set(taskGroupLocation, endState);
             return true;
         }
     }
@@ -296,14 +296,12 @@ public class PhysicalVertex {
 
             // now do the actual state transition
             if (current.equals(runningJobStateIMap.get(taskGroupLocation))) {
+                updateStateTimestamps(targetState);
+                runningJobStateIMap.set(taskGroupLocation, targetState);
                 LOGGER.info(String.format("%s turn from state %s to %s.",
                     taskFullName,
                     current,
                     targetState));
-
-                updateStateTimestamps(targetState);
-
-                runningJobStateIMap.set(taskGroupLocation, targetState);
                 return true;
             } else {
                 return false;
@@ -332,9 +330,10 @@ public class PhysicalVertex {
         while (!taskFuture.isDone()) {
             try {
                 i++;
+                LOGGER.info(String.format("send cancel %s operator to member %s", taskFullName, getCurrentExecutionAddress()));
                 nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
                         new CancelTaskOperation(taskGroupLocation),
-                        currentExecutionAddress)
+                        getCurrentExecutionAddress())
                     .invoke().get();
                 return;
             } catch (Exception e) {
@@ -403,7 +402,7 @@ public class PhysicalVertex {
     }
 
     public Address getCurrentExecutionAddress() {
-        return currentExecutionAddress;
+        return jobMaster.getOwnedSlotProfiles(taskGroupLocation).getWorker();
     }
 
     public TaskGroupLocation getTaskGroupLocation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index d9672c6d2..752c6741f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -143,28 +143,34 @@ public class SubPlan {
 
     private void addPhysicalVertexCallBack(PassiveCompletableFuture<TaskExecutionState> future) {
         future.thenAcceptAsync(executionState -> {
-            // We need not handle t, Because we will not return t from PhysicalVertex
-            if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) {
-                canceledTaskNum.incrementAndGet();
-            } else if (ExecutionState.FAILED.equals(executionState.getExecutionState())) {
-                LOGGER.severe(String.format("Task Failed in %s, Begin to cancel other tasks in this pipeline.",
-                    this.getPipelineFullName()));
-                failedTaskNum.incrementAndGet();
-                cancelPipeline();
-            }
+            try {
+                // We need not handle t, because we will not return t from PhysicalVertex
+                if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) {
+                    canceledTaskNum.incrementAndGet();
+                } else if (ExecutionState.FAILED.equals(executionState.getExecutionState())) {
+                    LOGGER.severe(String.format("Task %s Failed in %s, Begin to cancel other tasks in this pipeline.",
+                        executionState.getTaskGroupLocation(),
+                        this.getPipelineFullName()));
+                    failedTaskNum.incrementAndGet();
+                    cancelPipeline();
+                }
 
-            if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
-                if (failedTaskNum.get() > 0) {
-                    turnToEndState(PipelineState.FAILED);
-                    LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
-                } else if (canceledTaskNum.get() > 0) {
-                    turnToEndState(PipelineState.CANCELED);
-                    LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
-                } else {
-                    turnToEndState(PipelineState.FINISHED);
-                    LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
+                if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
+                    if (failedTaskNum.get() > 0) {
+                        turnToEndState(PipelineState.FAILED);
+                        LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
+                    } else if (canceledTaskNum.get() > 0) {
+                        turnToEndState(PipelineState.CANCELED);
+                        LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
+                    } else {
+                        turnToEndState(PipelineState.FINISHED);
+                        LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
+                    }
+                    pipelineFuture.complete((PipelineState) runningJobStateIMap.get(pipelineLocation));
                 }
-                pipelineFuture.complete((PipelineState) runningJobStateIMap.get(pipelineLocation));
+            } catch (Throwable e) {
+                LOGGER.severe(String.format("Never come here. handle %s %s error",
+                    executionState.getTaskGroupLocation(), executionState.getExecutionState()), e);
             }
         });
     }
@@ -330,7 +336,9 @@ public class SubPlan {
                 pipelineRestoreNum++;
                 LOGGER.info(String.format("Restore pipeline %s", pipelineFullName));
                 // We must ensure the scheduler completes before we handle pipeline state changes.
-                jobMaster.getScheduleFuture().join();
+                if (jobMaster.getScheduleFuture() != null) {
+                    jobMaster.getScheduleFuture().join();
+                }
 
                 if (reSchedulerPipelineFuture != null) {
                     reSchedulerPipelineFuture.join();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index cd593c112..b17bdc705 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.master;
 
+import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
@@ -88,7 +90,7 @@ public class JobMaster extends Thread {
 
     private final IMap<Object, Object> runningJobStateTimestampsIMap;
 
-    private CompletableFuture<Void> scheduleFuture = new CompletableFuture<>();
+    private CompletableFuture<Void> scheduleFuture;
 
     private volatile boolean restore = false;
 
@@ -113,9 +115,10 @@ public class JobMaster extends Thread {
     public void init(long initializationTimestamp) throws Exception {
         jobImmutableInformation = nodeEngine.getSerializationService().toObject(
             jobImmutableInformationData);
-        LOGGER.info("Job [" + jobImmutableInformation.getJobId() + "] submit");
-        LOGGER.info(
-            "Job [" + jobImmutableInformation.getJobId() + "] jar urls " + jobImmutableInformation.getPluginJarsUrls());
+        LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", jobImmutableInformation.getJobConfig().getName(),
+            jobImmutableInformation.getJobId()));
+        LOGGER.info(String.format("Job %s (%s) needed jar urls %s", jobImmutableInformation.getJobConfig().getName(),
+            jobImmutableInformation.getJobId(), jobImmutableInformation.getPluginJarsUrls()));
 
         LogicalDag logicalDag;
         if (!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
@@ -149,14 +152,14 @@ public class JobMaster extends Thread {
     public void initStateFuture() {
         jobMasterCompleteFuture = new CompletableFuture<>();
         PassiveCompletableFuture<JobStatus> jobStatusFuture = physicalPlan.initStateFuture();
-        jobStatusFuture.whenComplete((v, t) -> {
+        jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
             // We need not handle t, because we will not return t from physicalPlan
             if (JobStatus.FAILING.equals(v)) {
                 cleanJob();
                 physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
             }
             jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
-        });
+        }));
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -184,13 +187,17 @@ public class JobMaster extends Thread {
     public void handleCheckpointTimeout(long pipelineId) {
         this.physicalPlan.getPipelineList().forEach(pipeline -> {
             if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
-                LOGGER.warning(String.format("%s checkpoint timeout, cancel the pipeline", pipeline.getPipelineFullName()));
+                LOGGER.warning(
+                    String.format("%s checkpoint timeout, cancel the pipeline", pipeline.getPipelineFullName()));
                 pipeline.cancelPipeline();
             }
         });
     }
 
     public PassiveCompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
+        if (jobScheduler == null) {
+            jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
+        }
         return new PassiveCompletableFuture<>(jobScheduler.reSchedulerPipeline(subPlan));
     }
 
@@ -279,6 +286,12 @@ public class JobMaster extends Thread {
         ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
     }
 
+    public SlotProfile getOwnedSlotProfiles(@NonNull TaskGroupLocation taskGroupLocation) {
+        return ownedSlotProfilesIMap.get(
+                new PipelineLocation(taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId()))
+            .get(taskGroupLocation);
+    }
+
     public CompletableFuture<Void> getScheduleFuture() {
         return scheduleFuture;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index aac24d732..e13c02807 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -54,6 +54,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     private final ExecutionMode mode = ExecutionMode.LOCAL;
 
+    private volatile boolean isRunning = true;
+
     public AbstractResourceManager(NodeEngine nodeEngine) {
         this.registerWorker = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RESOURCE_MANAGER_REGISTER_WORKER);
         this.nodeEngine = nodeEngine;
@@ -66,14 +68,17 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     private void checkRegisterWorkerStillAlive() {
         if (!registerWorker.isEmpty()) {
-            List<Address> aliveWorker = nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress).collect(Collectors.toList());
-            List<Address> dead = registerWorker.keySet().stream().filter(r -> !aliveWorker.contains(r)).collect(Collectors.toList());
+            List<Address> aliveWorker = nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress)
+                .collect(Collectors.toList());
+            List<Address> dead =
+                registerWorker.keySet().stream().filter(r -> !aliveWorker.contains(r)).collect(Collectors.toList());
             dead.forEach(registerWorker::remove);
         }
     }
 
     @Override
-    public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException {
+    public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile)
+        throws NoEnoughResourceException {
         CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
         applyResources(jobId, Collections.singletonList(resourceProfile)).whenComplete((profile, error) -> {
             if (error != null) {
@@ -89,7 +94,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
         if (ExecutionMode.LOCAL.equals(mode)) {
             // Local mode, should wait worker(master node) register.
             try {
-                while (registerWorker.isEmpty()) {
+                while (registerWorker.isEmpty() && isRunning) {
                     LOGGER.info("waiting current worker register to resource manager...");
                     Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);
                 }
@@ -108,7 +113,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     @Override
     public CompletableFuture<List<SlotProfile>> applyResources(long jobId,
-                                                               List<ResourceProfile> resourceProfile) throws NoEnoughResourceException {
+                                                               List<ResourceProfile> resourceProfile)
+        throws NoEnoughResourceException {
         waitingWorkerRegister();
         return new ResourceRequestHandler(jobId, resourceProfile, registerWorker, this).request();
     }
@@ -123,11 +129,13 @@ public abstract class AbstractResourceManager implements ResourceManager {
      * @param resourceProfiles the worker should have resource profile list
      */
     protected void findNewWorker(List<ResourceProfile> resourceProfiles) {
-        throw new UnsupportedOperationException("Unsupported operation to find new worker in " + this.getClass().getName());
+        throw new UnsupportedOperationException(
+            "Unsupported operation to find new worker in " + this.getClass().getName());
     }
 
     @Override
     public void close() {
+        isRunning = false;
     }
 
     protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
@@ -162,7 +170,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     @Override
     public boolean slotActiveCheck(SlotProfile profile) {
-        return registerWorker.values().stream().flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
+        return registerWorker.values().stream()
+            .flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
             .anyMatch(s -> s.getSlotID() == profile.getSlotID());
     }
 
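The new volatile isRunning flag is there so close() can unblock waitingWorkerRegister(): without it, a node shutting down in local mode would keep polling until a worker happened to register. A minimal sketch of the pattern, with illustrative names and an illustrative interval:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    final class WaitLoopSketch {
        private static final long CHECK_INTERVAL_MS = 500; // illustrative value

        private final ConcurrentMap<String, Object> registeredWorkers = new ConcurrentHashMap<>();
        private volatile boolean running = true;

        void waitingWorkerRegister() throws InterruptedException {
            // without the running check, shutdown would leave this loop
            // spinning until a worker registered
            while (registeredWorkers.isEmpty() && running) {
                Thread.sleep(CHECK_INTERVAL_MS);
            }
        }

        void close() {
            running = false; // the volatile write is visible to the waiting thread
        }
    }
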
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 6b6b21ff4..b45306645 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager;
 
+import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+
 import org.apache.seatunnel.engine.common.runtime.DeployType;
 import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
@@ -83,7 +85,7 @@ public class ResourceRequestHandler {
             }
         }
         // all resource preCheck done, also had sent request to worker
-        getAllOfFuture(allRequestFuture).whenComplete((unused, error) -> {
+        getAllOfFuture(allRequestFuture).whenComplete(withTryCatch(LOGGER, (unused, error) -> {
             if (error != null) {
                 completeRequestWithException(error);
             }
@@ -95,7 +97,7 @@ public class ResourceRequestHandler {
                     completeRequestWithException(new NoEnoughResourceException("can't apply resource request: " + resourceProfile.get(findNullIndexInResultSlotProfiles())));
                 }
             }
-        });
+        }));
         return completableFuture;
     }
 
@@ -129,7 +131,7 @@ public class ResourceRequestHandler {
     private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(int i, ResourceProfile r, WorkerProfile workerProfile) {
         InvocationFuture<SlotAndWorkerProfile> future = resourceManager.sendToMember(new RequestSlotOperation(jobId, r), workerProfile.getAddress());
         return future.whenComplete(
-            (slotAndWorkerProfile, error) -> {
+            withTryCatch(LOGGER, (slotAndWorkerProfile, error) -> {
                 if (error != null) {
                     throw new RuntimeException(error);
                 } else {
@@ -137,7 +139,7 @@ public class ResourceRequestHandler {
                     addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile());
                 }
             }
-        );
+        ));
     }
 
     private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
@@ -169,7 +171,7 @@ public class ResourceRequestHandler {
             }
         }
         resourceManager.findNewWorker(needApplyResource);
-        resourceManager.applyResources(jobId, needApplyResource).whenComplete((s, e) -> {
+        resourceManager.applyResources(jobId, needApplyResource).whenComplete(withTryCatch(LOGGER, (s, e) -> {
             if (e != null) {
                 completeRequestWithException(e);
                 return;
@@ -177,7 +179,7 @@ public class ResourceRequestHandler {
             for (int i = 0; i < s.size(); i++) {
                 addSlotToCacheMap(needApplyIndex.get(i), s.get(i));
             }
-        });
+        }));
     }
 
     private void releaseAllResourceInternal() {
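The withTryCatch wrapping addresses a CompletableFuture pitfall: an exception thrown inside a whenComplete action does not surface on its own, it only completes the *returned* stage exceptionally, so if nothing chains on that stage the failure disappears. Wrapping the action guarantees the throwable is at least logged. A self-contained sketch of an equivalent wrapper (using java.util.logging for brevity; the engine uses Hazelcast's ExceptionUtil.withTryCatch and ILogger):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.BiConsumer;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    final class TryCatchSketch {
        // wrap a completion callback so any throwable it raises is logged
        // instead of silently poisoning the returned stage
        static <T> BiConsumer<T, Throwable> withTryCatch(Logger logger, BiConsumer<T, Throwable> action) {
            return (value, error) -> {
                try {
                    action.accept(value, error);
                } catch (Throwable t) {
                    logger.log(Level.SEVERE, "exception thrown in completion callback", t);
                }
            };
        }

        public static void main(String[] args) {
            Logger logger = Logger.getLogger(TryCatchSketch.class.getName());
            CompletableFuture<String> f = new CompletableFuture<>();
            f.whenComplete(withTryCatch(logger, (v, e) -> {
                throw new IllegalStateException("would otherwise be swallowed");
            }));
            f.complete("done"); // the wrapper logs the IllegalStateException
        }
    }
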
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index b2791ddf0..fc9a06d30 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.server.service.slot;
 
-import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
@@ -68,7 +67,8 @@ public class DefaultSlotService implements SlotService {
     private final TaskExecutionService taskExecutionService;
     private ConcurrentMap<Integer, SlotContext> contexts;
 
-    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot, int slotNumber) {
+    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot,
+                              int slotNumber) {
         this.nodeEngine = nodeEngine;
         this.dynamicSlot = dynamicSlot;
         this.taskExecutionService = taskExecutionService;
@@ -92,14 +92,13 @@ public class DefaultSlotService implements SlotService {
         unassignedResource.set(getNodeResource());
         scheduledExecutorService.scheduleAtFixedRate(() -> {
             try {
-                RetryUtils.retryWithException(() -> {
-                    LOGGER.fine("start send heartbeat to resource manager, this address: " + nodeEngine.getClusterService().getThisAddress());
-                    sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
-                    return null;
-                }, new RetryUtils.RetryMaterial(HEARTBEAT_RETRY_TIME, true, e -> true, DEFAULT_HEARTBEAT_TIMEOUT));
+                LOGGER.fine("start send heartbeat to resource manager, this address: " +
+                    nodeEngine.getClusterService().getThisAddress());
+                sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
             } catch (Exception e) {
-                LOGGER.severe(e);
-                LOGGER.severe("failed send heartbeat to resource manager, will retry later. this address: " + nodeEngine.getClusterService().getThisAddress());
+                LOGGER.warning(e);
+                LOGGER.warning("failed send heartbeat to resource manager, will retry later. this address: " +
+                    nodeEngine.getClusterService().getThisAddress());
             }
         }, 0, DEFAULT_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
     }
@@ -127,7 +126,8 @@ public class DefaultSlotService implements SlotService {
             unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
             unassignedSlots.remove(profile.getSlotID());
             assignedSlots.put(profile.getSlotID(), profile);
-            contexts.computeIfAbsent(profile.getSlotID(), p -> new SlotContext(profile.getSlotID(), taskExecutionService));
+            contexts.computeIfAbsent(profile.getSlotID(),
+                p -> new SlotContext(profile.getSlotID(), taskExecutionService));
         }
         return new SlotAndWorkerProfile(toWorkerProfile(), profile);
     }
@@ -148,7 +148,7 @@ public class DefaultSlotService implements SlotService {
 
         if (assignedSlots.get(profile.getSlotID()).getOwnerJobID() != jobId) {
             throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d",
-                    assignedSlots.get(profile.getSlotID()), jobId));
+                assignedSlots.get(profile.getSlotID()), jobId));
         }
 
         assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
@@ -163,7 +163,9 @@ public class DefaultSlotService implements SlotService {
 
     @Override
     public void close() {
-        scheduledExecutorService.shutdown();
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
     }
 
     private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
@@ -176,14 +178,17 @@ public class DefaultSlotService implements SlotService {
             }
         } else {
             Optional<SlotProfile> result = unassignedSlots.values().stream()
-                    .filter(slot -> slot.getResourceProfile().enoughThan(profile))
-                    .min((slot1, slot2) -> {
-                        if (slot1.getResourceProfile().getHeapMemory().getBytes() != slot2.getResourceProfile().getHeapMemory().getBytes()) {
-                            return slot1.getResourceProfile().getHeapMemory().getBytes() - slot2.getResourceProfile().getHeapMemory().getBytes() >= 0 ? 1 : -1;
-                        } else {
-                            return slot1.getResourceProfile().getCpu().getCore() - slot2.getResourceProfile().getCpu().getCore();
-                        }
-                    });
+                .filter(slot -> slot.getResourceProfile().enoughThan(profile))
+                .min((slot1, slot2) -> {
+                    if (slot1.getResourceProfile().getHeapMemory().getBytes() !=
+                        slot2.getResourceProfile().getHeapMemory().getBytes()) {
+                        return slot1.getResourceProfile().getHeapMemory().getBytes() -
+                            slot2.getResourceProfile().getHeapMemory().getBytes() >= 0 ? 1 : -1;
+                    } else {
+                        return slot1.getResourceProfile().getCpu().getCore() -
+                            slot2.getResourceProfile().getCpu().getCore();
+                    }
+                });
             return result.orElse(null);
         }
         return null;
@@ -193,7 +198,7 @@ public class DefaultSlotService implements SlotService {
         long maxMemory = Runtime.getRuntime().maxMemory();
         for (int i = 0; i < slotNumber; i++) {
             unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
-                    new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
+                new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
         }
     }
 
@@ -211,7 +216,8 @@ public class DefaultSlotService implements SlotService {
     }
 
     public <E> InvocationFuture<E> sendToMaster(Operation operation) {
-        InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
+        InvocationBuilder invocationBuilder = nodeEngine.getOperationService()
+            .createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
         return invocationBuilder.invoke();
     }
 
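The reformatted comparator in selectBestMatchSlot implements a best-fit policy: among the unassigned slots that are large enough, take the one with the least heap and then the fewest cores, leaving bigger slots free for bigger requests. A sketch of the same ordering with Comparator combinators, which also avoids the overflow pitfall of subtraction-based comparators in general; Slot here is a simplified stand-in for SlotProfile:

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    final class BestFitSketch {
        record Slot(long heapBytes, int cpuCores) {
            boolean enoughFor(Slot wanted) {
                return heapBytes >= wanted.heapBytes() && cpuCores >= wanted.cpuCores();
            }
        }

        static Optional<Slot> selectBestMatch(List<Slot> unassigned, Slot wanted) {
            return unassigned.stream()
                .filter(slot -> slot.enoughFor(wanted))
                // smallest sufficient slot wins: least heap first, then fewest cores
                .min(Comparator.comparingLong(Slot::heapBytes)
                    .thenComparingInt(Slot::cpuCores));
        }
    }
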
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 403bbd128..af945ee23 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -17,13 +17,17 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.spi.impl.NodeEngine;
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestInstance;
 
+@Slf4j
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class AbstractSeaTunnelServerTest {
 
@@ -36,7 +40,7 @@ public abstract class AbstractSeaTunnelServerTest {
     protected static ILogger LOGGER;
 
     @BeforeAll
-    public  void before() {
+    public void before() {
         instance = SeaTunnelServerStarter.createHazelcastInstance(
             TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + System.currentTimeMillis()));
         nodeEngine = instance.node.nodeEngine;
@@ -46,7 +50,16 @@ public abstract class AbstractSeaTunnelServerTest {
 
     @AfterAll
     public void after() {
-        server.shutdown(true);
-        instance.shutdown();
+        try {
+            if (server != null) {
+                server.shutdown(true);
+            }
+
+            if (instance != null) {
+                instance.shutdown();
+            }
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getMessage(e));
+        }
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 1397206fe..b84225793 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -47,6 +47,7 @@ public class CoordinatorServiceTest {
         SeaTunnelServer server1 = instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
         SeaTunnelServer server2 = instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
 
+        Assertions.assertTrue(server1.isMasterNode());
         CoordinatorService coordinatorService1 = server1.getCoordinatorService();
         Assertions.assertTrue(coordinatorService1.isCoordinatorActive());
 
@@ -62,14 +63,13 @@ public class CoordinatorServiceTest {
         await().atMost(20000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> {
                 try {
+                    Assertions.assertTrue(server2.isMasterNode());
                     CoordinatorService coordinatorService = server2.getCoordinatorService();
-                    Assertions.assertTrue(coordinatorService.isMasterNode());
+                    Assertions.assertTrue(coordinatorService.isCoordinatorActive());
                 } catch (SeaTunnelEngineException e) {
                     Assertions.assertTrue(false);
                 }
             });
-        CoordinatorService coordinatorService2 = server2.getCoordinatorService();
-        Assertions.assertTrue(coordinatorService2.isCoordinatorActive());
         instance2.shutdown();
     }
 
@@ -107,10 +107,65 @@ public class CoordinatorServiceTest {
 
         // because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished.
         Assertions.assertTrue(JobStatus.FINISHED.equals(coordinatorService.getJobStatus(jobId)));
+        coordinatorServiceTest.shutdown();
     }
 
     @Test
-    public void testJobRestoreWhenMasterNodeSwitch() {
-        // TODO wait CheckpointManager support restore from master node switch.
+    public void testJobRestoreWhenMasterNodeSwitch() throws InterruptedException {
+        HazelcastInstanceImpl instance1 = SeaTunnelServerStarter.createHazelcastInstance(
+            TestUtils.getClusterName("CoordinatorServiceTest_testJobRestoreWhenMasterNodeSwitch"));
+        HazelcastInstanceImpl instance2 = SeaTunnelServerStarter.createHazelcastInstance(
+            TestUtils.getClusterName("CoordinatorServiceTest_testJobRestoreWhenMasterNodeSwitch"));
+
+        SeaTunnelServer server1 = instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+        SeaTunnelServer server2 = instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+
+        CoordinatorService coordinatorService = server1.getCoordinatorService();
+        Assertions.assertTrue(coordinatorService.isCoordinatorActive());
+
+        Long jobId = instance1.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+        LogicalDag testLogicalDag =
+            TestUtils.createTestLogicalPlan("stream_fakesource_to_file.conf", "testJobRestoreWhenMasterNodeSwitch",
+                jobId);
+
+        JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId,
+            instance1.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+            Collections.emptyList());
+
+        Data data = instance1.getSerializationService().toData(jobImmutableInformation);
+
+        coordinatorService.submitJob(jobId, data).join();
+
+        // wait for the job status to turn to RUNNING
+        await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, coordinatorService.getJobStatus(jobId)));
+
+        // shut down the master node to trigger the master switch
+        instance1.shutdown();
+        await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                try {
+                    Assertions.assertTrue(server2.isMasterNode());
+                    Assertions.assertTrue(server2.getCoordinatorService().isCoordinatorActive());
+                } catch (SeaTunnelEngineException e) {
+                    Assertions.fail();
+                }
+            });
+
+        // wait for the job to be restored
+        Thread.sleep(5000);
+
+        // the job should recover to the RUNNING state
+        await().atMost(200000, TimeUnit.MILLISECONDS)
+            .untilAsserted(
+                () -> Assertions.assertEquals(JobStatus.RUNNING, server2.getCoordinatorService().getJobStatus(jobId)));
+
+        server2.getCoordinatorService().cancelJob(jobId);
+
+        // runningJobMasterMap is empty and there is no JobHistoryServer, so the status returns FINISHED
+        await().atMost(200000, TimeUnit.MILLISECONDS)
+            .untilAsserted(
+                () -> Assertions.assertEquals(JobStatus.FINISHED, server2.getCoordinatorService().getJobStatus(jobId)));
+        instance2.shutdown();
     }
 }
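The new test leans on Awaitility's poll-until-asserted pattern rather than a single fixed sleep, which tolerates the unpredictable delay between master failover and job restore. A self-contained sketch of that pattern, with an AtomicReference standing in for coordinatorService.getJobStatus(jobId):

    import static org.awaitility.Awaitility.await;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    import org.junit.jupiter.api.Assertions;

    final class AwaitSketch {
        public static void main(String[] args) {
            AtomicReference<String> jobStatus = new AtomicReference<>("CREATED");

            // simulate the job being restored some time after failover
            new Thread(() -> {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException ignored) {
                }
                jobStatus.set("RUNNING");
            }).start();

            // re-run the assertion until it passes or the deadline expires
            await().atMost(20000, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> Assertions.assertEquals("RUNNING", jobStatus.get()));
        }
    }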