You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/08 12:37:42 UTC

[incubator-seatunnel] branch st-engine updated: [Feature][ST-Engine] Add handle checkpoint timeout (#2667)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 0ffc40fe8 [Feature][ST-Engine] Add handle checkpoint timeout (#2667)
0ffc40fe8 is described below

commit 0ffc40fe8456cc5e5646e708f47567a06b2539b0
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Sep 8 20:37:37 2022 +0800

    [Feature][ST-Engine] Add handle checkpoint timeout (#2667)
    
    * Add Checkpoint Timeout Handle
    
    * trigger ci
    
    * fix ci error
    
    * fix ci error
    
    * fix review problem
    
    * Update seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
    
    Co-authored-by: Zongwen Li <zo...@gmail.com>
    
    * fix resource release error
    
    * fix resource release error
    
    * fix resource release error
    
    Co-authored-by: Zongwen Li <zo...@gmail.com>
---
 .../engine/client/JobConfigParserTest.java         |   1 -
 .../engine/server/dag/physical/PhysicalPlan.java   | 132 +++++++++----
 .../engine/server/dag/physical/PhysicalVertex.java |  39 +++-
 .../engine/server/dag/physical/SubPlan.java        |  69 ++++---
 .../seatunnel/engine/server/master/JobMaster.java  |  58 +++++-
 .../engine/server/scheduler/JobScheduler.java      |  11 +-
 .../server/scheduler/PipelineBaseScheduler.java    | 206 +++++++++++++--------
 .../operation/GetTaskGroupAddressOperation.java    |   2 +-
 .../apache/seatunnel/engine/server/TestUtils.java  |  64 +++++++
 .../seatunnel/engine/server/dag/TaskTest.java      |  28 +--
 .../engine/server/master/JobMasterTest.java        |  93 ++++++++++
 11 files changed, 513 insertions(+), 190 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 39231a6df..a04932eeb 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -46,7 +46,6 @@ public class JobConfigParserTest {
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
-
         Assert.assertEquals("LocalFile", actions.get(0).getName());
         Assert.assertEquals(1, actions.get(0).getUpstream().size());
         Assert.assertEquals("FakeSource", actions.get(0).getUpstream().get(0).getName());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index e55b0da65..296ad2e1f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -22,12 +22,15 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,6 +53,11 @@ public class PhysicalPlan {
 
     private final JobImmutableInformation jobImmutableInformation;
 
+    /**
+     * If the job or pipeline is cancelled by the user, needRestore will be false
+     **/
+    private volatile boolean needRestore = true;
+
     /**
      * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
      * execution graph transitioned into a certain state. The index into this array is the ordinal
@@ -68,6 +76,15 @@ public class PhysicalPlan {
 
     private final String jobFullName;
 
+    private JobMaster jobMaster;
+
+    private final Map<Integer, CompletableFuture> pipelineSchedulerFutureMap;
+
+    /**
+     * Whether to end the job when a pipeline turns to an end state.
+     */
+    private boolean makeJobEndWhenPipelineEnded = true;
+
     public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
                         @NonNull ExecutorService executorService,
                         @NonNull JobImmutableInformation jobImmutableInformation,
@@ -85,53 +102,77 @@ public class PhysicalPlan {
         }
         this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId());
+
+        pipelineSchedulerFutureMap = new HashMap<>(pipelineList.size());
+    }
+
+    public void setJobMaster(JobMaster jobMaster) {
+        this.jobMaster = jobMaster;
     }
 
     public void initStateFuture() {
-        pipelineList.forEach(subPlan -> {
-            PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
-            future.whenComplete((v, t) -> {
-                // We need not handle t, Because we will not return t from Pipeline
-                if (PipelineState.CANCELED.equals(v)) {
+        pipelineList.forEach(subPlan -> addPipelineEndCallback(subPlan));
+    }
+
+    private void addPipelineEndCallback(SubPlan subPlan) {
+        PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
+        future.thenAcceptAsync(pipelineState -> {
+            try {
+                if (PipelineState.CANCELED.equals(pipelineState)) {
+                    if (needRestore) {
+                        restorePipeline(subPlan);
+                        return;
+                    }
                     canceledPipelineNum.incrementAndGet();
-                } else if (PipelineState.FAILED.equals(v)) {
-                    LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
-                    failedPipelineNum.incrementAndGet();
-                    cancelJob();
-                } else if (!PipelineState.FINISHED.equals(v)) {
-                    LOGGER.severe(
-                        "Pipeline Failed with Unknown PipelineState, Begin to cancel other pipelines in this job.");
+                    if (makeJobEndWhenPipelineEnded) {
+                        LOGGER.info(
+                            String.format("cancel job %s because makeJobEndWhenPipelineEnded is %s", jobFullName,
+                                makeJobEndWhenPipelineEnded));
+                        cancelJob();
+                    }
+                    LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
+                    jobMaster.releasePipelineResource(subPlan.getPipelineId());
+                } else if (PipelineState.FAILED.equals(pipelineState)) {
+                    if (needRestore) {
+                        restorePipeline(subPlan);
+                        return;
+                    }
                     failedPipelineNum.incrementAndGet();
-                    cancelJob();
+                    if (makeJobEndWhenPipelineEnded) {
+                        cancelJob();
+                    }
+                    jobMaster.releasePipelineResource(subPlan.getPipelineId());
+                    LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
                 }
+            } catch (Throwable e) {
+                // Only cancelJob or releasePipelineResource can throw an exception here, so we just log it
+                LOGGER.severe(ExceptionUtils.getMessage(e));
+            }
 
-                if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
-                    if (failedPipelineNum.get() > 0) {
-                        updateJobState(JobStatus.FAILING);
-                    } else if (canceledPipelineNum.get() > 0) {
-                        turnToEndState(JobStatus.CANCELED);
-                    } else {
-                        turnToEndState(JobStatus.FINISHED);
-                    }
-                    jobEndFuture.complete(jobStatus.get());
+            if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
+                if (failedPipelineNum.get() > 0) {
+                    updateJobState(JobStatus.FAILING);
+                } else if (canceledPipelineNum.get() > 0) {
+                    turnToEndState(JobStatus.CANCELED);
+                } else {
+                    turnToEndState(JobStatus.FINISHED);
                 }
-            });
+                jobEndFuture.complete(jobStatus.get());
+            }
         });
     }
 
     public void cancelJob() {
-        if (!updateJobState(JobStatus.CREATED, JobStatus.CANCELED)) {
-            // may be running, failing, failed, canceling , canceled, finished
-            if (updateJobState(JobStatus.RUNNING, JobStatus.CANCELLING)) {
-                cancelRunningJob();
-            } else {
-                LOGGER.info(
-                    String.format("%s in a non cancellable state: %s, skip cancel", jobFullName, jobStatus.get()));
-            }
+        if (jobStatus.get().isEndState()) {
+            LOGGER.warning(String.format("%s is in end state %s, can not be cancel", jobFullName, jobStatus.get()));
+            return;
         }
+
+        updateJobState(jobStatus.get(), JobStatus.CANCELLING);
+        cancelJobPipelines();
     }
 
-    private void cancelRunningJob() {
+    private void cancelJobPipelines() {
         List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
             if (!pipeline.getPipelineState().get().isEndState() &&
                 !PipelineState.CANCELING.equals(pipeline.getPipelineState().get())) {
@@ -176,6 +217,7 @@ public class PhysicalPlan {
             throw new IllegalStateException(message);
         }
 
+        LOGGER.info(String.format("%s turn to end state %s", jobFullName, endState));
         jobStatus.set(endState);
         stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
     }
@@ -207,6 +249,28 @@ public class PhysicalPlan {
         }
     }
 
+    private void restorePipeline(SubPlan subPlan) {
+        try {
+            LOGGER.info(String.format("Restore pipeline %s", subPlan.getPipelineFullName()));
+            // We must wait for the scheduler to complete before handling the pipeline state change.
+            jobMaster.getScheduleFuture().join();
+
+            if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
+                pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+            }
+            subPlan.reset();
+            addPipelineEndCallback(subPlan);
+            pipelineSchedulerFutureMap.put(subPlan.getPipelineId(), jobMaster.reSchedulerPipeline(subPlan));
+            if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
+                pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+            }
+        } catch (Throwable e) {
+            LOGGER.severe(String.format("Restore pipeline %s error with exception: %s", subPlan.getPipelineFullName(),
+                ExceptionUtils.getMessage(e)));
+            subPlan.cancelPipeline();
+        }
+    }
+
     public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
         return new PassiveCompletableFuture<>(jobEndFuture);
     }
@@ -222,4 +286,8 @@ public class PhysicalPlan {
     public String getJobFullName() {
         return jobFullName;
     }
+
+    public void neverNeedRestore() {
+        this.needRestore = false;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 399c4850d..b90e21b8a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -85,7 +85,7 @@ public class PhysicalVertex {
      * When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
      * in {@link SubPlan} whenComplete method will be called.
      */
-    private final CompletableFuture<TaskExecutionState> taskFuture;
+    private CompletableFuture<TaskExecutionState> taskFuture;
 
     /**
      * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
@@ -144,6 +144,7 @@ public class PhysicalVertex {
     }
 
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
+        this.taskFuture = new CompletableFuture<>();
         return new PassiveCompletableFuture<>(this.taskFuture);
     }
 
@@ -229,18 +230,17 @@ public class PhysicalVertex {
             this.pluginJarsUrls);
     }
 
-    private void turnToEndState(@NonNull ExecutionState endState) {
+    private boolean turnToEndState(@NonNull ExecutionState endState) {
         // consistency check
         if (executionState.get().isEndState()) {
-            String message = "Task is trying to leave terminal state " + executionState.get();
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
+            String message = String.format("Task %s is already in terminal state %s", taskFullName, executionState.get());
+            LOGGER.warning(message);
+            return false;
         }
-
         if (!endState.isEndState()) {
-            String message = "Need a end state, not " + endState;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
+            String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
+            LOGGER.warning(message);
+            return false;
         }
 
         LOGGER.info(String.format("%s turn to end state %s.",
@@ -248,6 +248,7 @@ public class PhysicalVertex {
             endState));
         executionState.set(endState);
         stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
+        return true;
     }
 
     public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
@@ -329,6 +330,22 @@ public class PhysicalVertex {
         }
     }
 
+    private void resetExecutionState() {
+        if (!executionState.get().isEndState()) {
+            String message =
+                String.format("%s reset state failed, only end state can be reset, current is %s", getTaskFullName(),
+                    executionState.get());
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+        executionState.set(ExecutionState.CREATED);
+        stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+    }
+
+    public void reset() {
+        resetExecutionState();
+    }
+
     public AtomicReference<ExecutionState> getExecutionState() {
         return executionState;
     }
@@ -338,7 +355,9 @@ public class PhysicalVertex {
     }
 
     public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-        turnToEndState(taskExecutionState.getExecutionState());
+        if (!turnToEndState(taskExecutionState.getExecutionState())) {
+            return;
+        }
         if (taskExecutionState.getThrowable() != null) {
             LOGGER.severe(String.format("%s end with state %s and Exception: %s",
                 this.taskFullName,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 7df5074b7..ff7264636 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public class SubPlan {
@@ -71,7 +70,7 @@ public class SubPlan {
      * Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
      * whenComplete method will be called.
      */
-    private final CompletableFuture<PipelineState> pipelineFuture;
+    private CompletableFuture<PipelineState> pipelineFuture;
 
     private final ExecutorService executorService;
 
@@ -111,20 +110,21 @@ public class SubPlan {
             addPhysicalVertexCallBack(m.initStateFuture());
         });
 
+        this.pipelineFuture = new CompletableFuture<>();
         return new PassiveCompletableFuture<>(pipelineFuture);
     }
 
-    private void addPhysicalVertexCallBack(CompletableFuture<TaskExecutionState> future) {
-        future.whenComplete((v, t) -> {
+    private void addPhysicalVertexCallBack(PassiveCompletableFuture<TaskExecutionState> future) {
+        future.thenAcceptAsync(executionState -> {
             // We need not handle t, Because we will not return t from PhysicalVertex
-            if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
+            if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) {
                 canceledTaskNum.incrementAndGet();
-            } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+            } else if (ExecutionState.FAILED.equals(executionState.getExecutionState())) {
                 LOGGER.severe(String.format("Task Failed in %s, Begin to cancel other tasks in this pipeline.",
                     this.getPipelineFullName()));
                 failedTaskNum.incrementAndGet();
                 cancelPipeline();
-            } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) {
+            } else if (!ExecutionState.FINISHED.equals(executionState.getExecutionState())) {
                 LOGGER.severe(String.format(
                     "Task Failed in %s, with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.",
                     this.getPipelineFullName()));
@@ -148,10 +148,6 @@ public class SubPlan {
         });
     }
 
-    public void whenComplete(BiConsumer<? super PipelineState, ? super Throwable> action) {
-        this.pipelineFuture.whenComplete(action);
-    }
-
     private void turnToEndState(@NonNull PipelineState endState) {
         // consistency check
         if (pipelineState.get().isEndState()) {
@@ -170,6 +166,18 @@ public class SubPlan {
         stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
     }
 
+    private void resetPipelineState() {
+        if (!pipelineState.get().isEndState()) {
+            String message = String.format("%s reset state failed, only end state can be reset, current is %s",
+                getPipelineFullName(), pipelineState.get());
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        pipelineState.set(PipelineState.CREATED);
+        stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+    }
+
     public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
         // consistency check
         if (current.isEndState()) {
@@ -211,23 +219,15 @@ public class SubPlan {
     }
 
     public void cancelPipeline() {
-        if (!updatePipelineState(PipelineState.CREATED, PipelineState.CANCELED) &&
-            !updatePipelineState(PipelineState.SCHEDULED, PipelineState.CANCELED)) {
-            // may be deploying, running, failed, canceling , canceled, finished
-            if (updatePipelineState(PipelineState.DEPLOYING, PipelineState.CANCELING) ||
-                updatePipelineState(PipelineState.RUNNING, PipelineState.CANCELING)) {
-                cancelRunningPipeline();
-            } else {
-                LOGGER.info(
-                    String.format("%s in a non cancellable state: %s, skip cancel", pipelineFullName,
-                        pipelineState.get()));
-            }
-        } else {
-            pipelineFuture.complete(PipelineState.CANCELED);
+        if (pipelineState.get().isEndState()) {
+            LOGGER.warning(String.format("%s is in end state %s, can not be cancel", pipelineFullName, pipelineState.get()));
+            return;
         }
+        updatePipelineState(pipelineState.get(), PipelineState.CANCELING);
+        cancelPipelineTasks();
     }
 
-    private void cancelRunningPipeline() {
+    private void cancelPipelineTasks() {
         List<CompletableFuture<Void>> coordinatorCancelList =
             coordinatorVertexList.stream().map(coordinator -> cancelTask(coordinator)).filter(x -> x != null)
                 .collect(Collectors.toList());
@@ -259,9 +259,22 @@ public class SubPlan {
         return null;
     }
 
-    public void failedWithNoEnoughResource() {
-        LOGGER.severe(String.format("%s failed with have no enough resource to run.", this.getPipelineFullName()));
-        cancelPipeline();
+    /**
+     * A pipeline must be reset before it can be restored.
+     */
+    public void reset() {
+        resetPipelineState();
+        finishedTaskNum.set(0);
+        canceledTaskNum.set(0);
+        failedTaskNum.set(0);
+
+        coordinatorVertexList.forEach(coordinate -> {
+            coordinate.reset();
+        });
+
+        physicalVertexList.forEach(task -> {
+            task.reset();
+        });
     }
 
     public int getPipelineId() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index ca91a15f1..c55bc07fc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -31,11 +31,14 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfigurat
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
 import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
 
+import com.google.common.collect.Lists;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
@@ -73,11 +76,15 @@ public class JobMaster implements Runnable {
 
     private JobImmutableInformation jobImmutableInformation;
 
+    private JobScheduler jobScheduler;
     private final Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles;
 
+    private CompletableFuture<Void> scheduleFuture = new CompletableFuture<>();
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
-                     @NonNull ExecutorService executorService, @NonNull ResourceManager resourceManager) {
+                     @NonNull ExecutorService executorService,
+                     @NonNull ResourceManager resourceManager) {
         this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
@@ -123,30 +130,53 @@ public class JobMaster implements Runnable {
     @Override
     public void run() {
         try {
+            physicalPlan.setJobMaster(this);
+
             PassiveCompletableFuture<JobStatus> jobStatusPassiveCompletableFuture =
                 physicalPlan.getJobEndCompletableFuture();
 
-            jobStatusPassiveCompletableFuture.whenComplete((v, t) -> {
+            jobStatusPassiveCompletableFuture.thenAcceptAsync(jobStatus -> {
                 // We need not handle t, Because we will not return t from physicalPlan
-                if (JobStatus.FAILING.equals(v)) {
+                if (JobStatus.FAILING.equals(jobStatus)) {
                     cleanJob();
                     physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
                 }
                 jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
             });
-            ownedSlotProfiles.putAll(new PipelineBaseScheduler(physicalPlan, this).startScheduling());
+            jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
+            scheduleFuture = CompletableFuture.runAsync(() -> jobScheduler.startScheduling(), executorService);
+            LOGGER.info(String.format("Job %s waiting for scheduler finished", physicalPlan.getJobFullName()));
+            scheduleFuture.join();
+            LOGGER.info(String.format("%s scheduler finished", physicalPlan.getJobFullName()));
         } catch (Throwable e) {
             LOGGER.severe(String.format("Job %s (%s) run error with: %s",
                 physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
                 physicalPlan.getJobImmutableInformation().getJobId(),
                 ExceptionUtils.getMessage(e)));
             // try to cancel job
-            physicalPlan.cancelJob();
+            cancelJob();
         } finally {
             jobMasterCompleteFuture.join();
         }
     }
 
+    public void handleCheckpointTimeout(long pipelineId) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineId() == pipelineId) {
+                pipeline.cancelPipeline();
+            }
+        });
+    }
+
+    public CompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
+        return jobScheduler.reSchedulerPipeline(subPlan);
+    }
+
+    public void releasePipelineResource(int pipelineId) {
+        resourceManager.releaseResources(jobImmutableInformation.getJobId(),
+            Lists.newArrayList(ownedSlotProfiles.get(pipelineId).values()));
+    }
+
     public void cleanJob() {
         // TODO Add some job clean operation
     }
@@ -154,7 +184,7 @@ public class JobMaster implements Runnable {
     public Address queryTaskGroupAddress(long taskGroupId) {
         for (Integer pipelineId : ownedSlotProfiles.keySet()) {
             Optional<PhysicalVertex> currentVertex = ownedSlotProfiles.get(pipelineId).keySet().stream()
-                .filter(physicalVertex -> physicalVertex.getTaskGroup().getTaskGroupLocation().getTaskGroupId() == taskGroupId)
+                .filter(task -> task.getTaskGroupLocation().getTaskGroupId() == taskGroupId)
                 .findFirst();
             if (currentVertex.isPresent()) {
                 return ownedSlotProfiles.get(pipelineId).get(currentVertex.get()).getWorker();
@@ -164,7 +194,8 @@ public class JobMaster implements Runnable {
     }
 
     public void cancelJob() {
-        this.physicalPlan.cancelJob();
+        physicalPlan.neverNeedRestore();
+        physicalPlan.cancelJob();
     }
 
     public ResourceManager getResourceManager() {
@@ -214,4 +245,17 @@ public class JobMaster implements Runnable {
             });
         });
     }
+
+    public Map<Integer, Map<PhysicalVertex, SlotProfile>> getOwnedSlotProfiles() {
+        return ownedSlotProfiles;
+    }
+
+    public void setOwnedSlotProfiles(@NonNull Integer pipelineId,
+        @NonNull Map<PhysicalVertex, SlotProfile> pipelineOwnedSlotProfiles) {
+        ownedSlotProfiles.put(pipelineId, pipelineOwnedSlotProfiles);
+    }
+
+    public CompletableFuture<Void> getScheduleFuture() {
+        return scheduleFuture;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index 25aece4a4..dbfcc235e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -17,11 +17,14 @@
 
 package org.apache.seatunnel.engine.server.scheduler;
 
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
-import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 
-import java.util.Map;
+import lombok.NonNull;
+
+import java.util.concurrent.CompletableFuture;
 
 public interface JobScheduler {
-    Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling();
+    CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan);
+
+    void startScheduling();
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 1727e06ba..a3d27cd75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -18,20 +18,18 @@
 package org.apache.seatunnel.engine.server.scheduler;
 
 import org.apache.seatunnel.engine.common.exception.JobException;
-import org.apache.seatunnel.engine.common.exception.JobNoEnoughResourceException;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineState;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.master.JobMaster;
-import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 
-import com.google.common.collect.Lists;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
@@ -41,8 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 public class PipelineBaseScheduler implements JobScheduler {
@@ -61,84 +57,103 @@ public class PipelineBaseScheduler implements JobScheduler {
     }
 
     @Override
-    public Map<Integer, Map<PhysicalVertex, SlotProfile>> startScheduling() {
-        Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles = new ConcurrentHashMap<>();
+    public void startScheduling() {
         if (physicalPlan.turnToRunning()) {
-            List<CompletableFuture<Object>> collect = physicalPlan.getPipelineList().stream().map(pipeline -> {
-                if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
-                    handlePipelineStateUpdateError(pipeline, PipelineState.SCHEDULED);
-                    return null;
-                }
-                Map<PhysicalVertex, SlotProfile> slotProfiles;
-                try {
-                    slotProfiles = applyResourceForPipeline(pipeline);
-                    ownedSlotProfiles.put(pipeline.getPipelineId(), slotProfiles);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                pipeline.whenComplete((state, error) -> {
-                    releasePipelineResource(Lists.newArrayList(slotProfiles.values()));
-                });
-                // deploy pipeline
-                return CompletableFuture.supplyAsync(() -> {
-                    // TODO before deploy should check slotProfiles is exist, because it maybe can't use when retry.
-                    deployPipeline(pipeline, slotProfiles);
-                    return null;
-                });
-            }).filter(Objects::nonNull).collect(Collectors.toList());
+            List<CompletableFuture<Void>> collect =
+                physicalPlan.getPipelineList()
+                    .stream()
+                    .map(pipeline -> schedulerPipeline(pipeline))
+                    .filter(Objects::nonNull).collect(Collectors.toList());
             try {
                 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
                     collect.toArray(new CompletableFuture[0]));
                 voidCompletableFuture.get();
             } catch (Exception e) {
-                // cancel pipeline and throw an exception
-                physicalPlan.cancelJob();
                 throw new RuntimeException(e);
             }
         } else if (!JobStatus.CANCELED.equals(physicalPlan.getJobStatus())) {
             throw new JobException(String.format("%s turn to a unexpected state: %s", physicalPlan.getJobFullName(),
                 physicalPlan.getJobStatus()));
         }
-        return ownedSlotProfiles;
     }
 
-    private void releasePipelineResource(List<SlotProfile> slotProfiles) {
-        if (null == slotProfiles || slotProfiles.isEmpty()) {
-            return;
+    // This method cannot throw an exception
+    private CompletableFuture<Void> schedulerPipeline(SubPlan pipeline) {
+        try {
+            if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
+                handlePipelineStateTurnError(pipeline, PipelineState.SCHEDULED);
+                return null;
+            }
+
+            Map<PhysicalVertex, SlotProfile> slotProfiles =
+                getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles().get(pipeline.getPipelineId()));
+
+            // To ensure pipeline resources can still be released after a new master node becomes active, we must store slotProfiles first and then deploy tasks.
+            jobMaster.setOwnedSlotProfiles(pipeline.getPipelineId(), slotProfiles);
+            // deploy pipeline
+            return CompletableFuture.runAsync(() -> {
+                deployPipeline(pipeline, slotProfiles);
+            });
+        } catch (Exception e) {
+            pipeline.cancelPipeline();
+            return null;
         }
-        resourceManager.releaseResources(jobId, slotProfiles).join();
     }
 
-    private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) throws Exception {
-        try {
-            Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
-            Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
-            // TODO If there is no enough resources for tasks, we need add some wait profile
-            subPlan.getCoordinatorVertexList().forEach(coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
-
-            subPlan.getPhysicalVertexList().forEach(task -> futures.put(task, applyResourceForTask(task)));
-
-            for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
-                try {
-                    slotProfiles.put(future.getKey(), future.getValue().get());
-                } catch (NoEnoughResourceException e) {
-                    // TODO custom exception with pipelineID, jobName etc.
-                    throw new JobNoEnoughResourceException("No enough resource to execute pipeline", e);
-                }
+    private Map<PhysicalVertex, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan pipeline,
+                                                                           Map<PhysicalVertex, SlotProfile> ownedSlotProfiles) {
+        if (ownedSlotProfiles == null || ownedSlotProfiles.isEmpty()) {
+            return applyResourceForPipeline(pipeline);
+        }
+
+        // TODO ensure the slots still exist and are owned by this pipeline
+        for (Map.Entry<PhysicalVertex, SlotProfile> entry : ownedSlotProfiles.entrySet()) {
+            if (entry.getValue() == null) {
+                ownedSlotProfiles.put(entry.getKey(), applyResourceForTask(entry.getKey()).join());
+            } else {
+                entry.getKey().updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
             }
-            return slotProfiles;
-        } catch (JobNoEnoughResourceException | ExecutionException | InterruptedException e) {
-            LOGGER.severe(e);
-            throw e;
         }
+        return ownedSlotProfiles;
+    }
+
+    private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
+        Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+        Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+        // TODO If there are not enough resources for tasks, we need to add some wait profile
+        subPlan.getCoordinatorVertexList()
+            .forEach(
+                coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
+
+        subPlan.getPhysicalVertexList()
+            .forEach(task -> futures.put(task, applyResourceForTask(task)));
+
+        for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+            slotProfiles.put(future.getKey(),
+                future.getValue() == null ? null : future.getValue().join());
+        }
+        return slotProfiles;
     }
 
     private CompletableFuture<SlotProfile> applyResourceForTask(PhysicalVertex task) {
-        if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
-            // TODO custom resource size
-            return resourceManager.applyResource(jobId, new ResourceProfile());
-        } else {
-            handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+        try {
+            if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+                // TODO custom resource size
+                return resourceManager.applyResource(jobId, new ResourceProfile());
+            } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
+                ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+                LOGGER.info(
+                    String.format("%s be canceled, skip %s this task.", task.getTaskFullName(),
+                        ExecutionState.SCHEDULED));
+                return null;
+            } else {
+                makeTaskFailed(task,
+                    new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
+                        task.getTaskFullName(), task.getExecutionState().get())));
+                return null;
+            }
+        } catch (Throwable e) {
+            makeTaskFailed(task, e);
             return null;
         }
     }
@@ -146,14 +161,23 @@ public class PipelineBaseScheduler implements JobScheduler {
     private CompletableFuture<Void> deployTask(PhysicalVertex task, SlotProfile slotProfile) {
         if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
             // deploy is a time-consuming operation, so we do it async
-            return CompletableFuture.supplyAsync(() -> {
+            return CompletableFuture.runAsync(() -> {
                 task.deploy(slotProfile);
-                return null;
             });
+        } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
+            ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+            LOGGER.info(
+                String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), ExecutionState.DEPLOYING));
+            return null;
         } else {
-            handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
+            jobMaster.updateTaskExecutionState(
+                new TaskExecutionState(
+                    task.getTaskGroupLocation(),
+                    ExecutionState.FAILED,
+                    new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
+                        task.getTaskFullName(), task.getExecutionState().get()))));
+            return null;
         }
-        return null;
     }
 
     private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
@@ -161,7 +185,8 @@ public class PipelineBaseScheduler implements JobScheduler {
 
             try {
                 List<CompletableFuture<?>> deployCoordinatorFuture =
-                    pipeline.getCoordinatorVertexList().stream().map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
+                    pipeline.getCoordinatorVertexList().stream()
+                        .map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
                         .filter(Objects::nonNull).collect(Collectors.toList());
 
                 List<CompletableFuture<?>> deployTaskFuture =
@@ -178,35 +203,52 @@ public class PipelineBaseScheduler implements JobScheduler {
                             pipeline.getPipelineState().get()));
                 }
             } catch (Exception e) {
-                // cancel pipeline and throw an exception
-                pipeline.cancelPipeline();
-                throw new RuntimeException(e);
+                makePipelineFailed(pipeline, e);
             }
+        } else if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
+            PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+            // may be canceled
+            LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
+                pipeline.getPipelineState().get(), PipelineState.DEPLOYING));
         } else {
-            handlePipelineStateUpdateError(pipeline, PipelineState.DEPLOYING);
+            makePipelineFailed(pipeline, new JobException(
+                String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
+                    pipeline.getPipelineState().get())));
         }
     }
 
-    private void handlePipelineStateUpdateError(SubPlan pipeline, PipelineState targetState) {
+    @Override
+    public CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan) {
+        return schedulerPipeline(subPlan);
+    }
+
+    private void handlePipelineStateTurnError(SubPlan pipeline, PipelineState targetState) {
         if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
             PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
             // may be canceled
-            LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
-                pipeline.getPipelineState().get(), targetState));
+            LOGGER.info(
+                String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
+                    pipeline.getPipelineState().get(), targetState));
         } else {
             throw new JobException(
-                String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
+                String.format("%s turn to a unexpected state: %s, stop scheduler job",
+                    pipeline.getPipelineFullName(),
                     pipeline.getPipelineState().get()));
         }
     }
 
-    private void handleTaskStateUpdateError(PhysicalVertex task, ExecutionState targetState) {
-        if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
-            ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
-            LOGGER.info(String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), targetState));
-        } else {
-            throw new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
-                task.getTaskFullName(), task.getExecutionState().get()));
-        }
+    private void makePipelineFailed(@NonNull SubPlan pipeline, Throwable e) {
+        pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+            makeTaskFailed(coordinator, e);
+        });
+
+        pipeline.getPhysicalVertexList().forEach(task -> {
+            makeTaskFailed(task, e);
+        });
+    }
+
+    private void makeTaskFailed(@NonNull PhysicalVertex task, Throwable e) {
+        jobMaster.updateTaskExecutionState(
+            new TaskExecutionState(task.getTaskGroupLocation(), ExecutionState.FAILED, e));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 1c4a6605e..2f6e39ee4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -50,7 +50,7 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
         response = RetryUtils.retryWithException(() -> server.getJobMaster(taskLocation.getJobId())
                 .queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
             new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-                exception -> exception instanceof IllegalArgumentException, Constant.OPERATION_RETRY_SLEEP));
+                exception -> exception instanceof Exception, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
new file mode 100644
index 000000000..518f409c3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+
+import com.google.common.collect.Sets;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class TestUtils {
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static LogicalDag getTestLogicalDag() throws MalformedURLException {
+        IdGenerator idGenerator = new IdGenerator();
+        FakeSource fakeSource = new FakeSource();
+        fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+
+        Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource,
+            Sets.newHashSet(new URL("file:///fake.jar")));
+        fake.setParallelism(3);
+        LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
+
+        ConsoleSink consoleSink = new ConsoleSink();
+        consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+        Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
+            Sets.newHashSet(new URL("file:///console.jar")));
+        console.setParallelism(3);
+        LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 3);
+
+        LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex);
+
+        LogicalDag logicalDag = new LogicalDag();
+        logicalDag.addLogicalVertex(fakeVertex);
+        logicalDag.addLogicalVertex(consoleVertex);
+        logicalDag.addEdge(edge);
+        return logicalDag;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 6baf2d98b..125776a29 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
@@ -49,37 +50,14 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
 
     @Test
     public void testTask() throws MalformedURLException {
-
-        IdGenerator idGenerator = new IdGenerator();
-
         SeaTunnelContext.getContext().setJobMode(JobMode.BATCH);
-        FakeSource fakeSource = new FakeSource();
-        fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
-
-        Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource,
-            Sets.newHashSet(new URL("file:///fake.jar")));
-        fake.setParallelism(3);
-        LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
-
-        ConsoleSink consoleSink = new ConsoleSink();
-        consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
-        Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
-            Sets.newHashSet(new URL("file:///console.jar")));
-        console.setParallelism(3);
-        LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 3);
-
-        LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex);
-
-        LogicalDag logicalDag = new LogicalDag();
-        logicalDag.addLogicalVertex(fakeVertex);
-        logicalDag.addLogicalVertex(consoleVertex);
-        logicalDag.addEdge(edge);
+        LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
 
         JobConfig config = new JobConfig();
         config.setName("test");
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
-            nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+            nodeEngine.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
 
         PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
             server.submitJob(jobImmutableInformation.getJobId(),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
new file mode 100644
index 000000000..227c347e7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import static org.awaitility.Awaitility.await;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import com.hazelcast.internal.serialization.Data;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JobMaster Tester.
+ */
+public class JobMasterTest extends AbstractSeaTunnelServerTest {
+    private Long jobId;
+
+    @Before
+    public void before() {
+        super.before();
+        jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+    }
+
+    @Test
+    public void testHandleCheckpointTimeout() throws Exception {
+        SeaTunnelContext.getContext().setJobMode(JobMode.STREAMING);
+        LogicalDag testLogicalDag = TestUtils.getTestLogicalDag();
+        JobConfig config = new JobConfig();
+        config.setName("test_checkpoint_timeout");
+
+        JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId,
+            nodeEngine.getSerializationService().toData(testLogicalDag), config, Collections.emptyList());
+
+        Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture = server.submitJob(jobId, data);
+        voidPassiveCompletableFuture.join();
+
+        JobMaster jobMaster = server.getJobMaster(jobId);
+
+        // wait for the job status to turn to running
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assert.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
+
+        // call checkpoint timeout
+        jobMaster.handleCheckpointTimeout(1);
+
+        // Because handleCheckpointTimeout is an async method, we need to sleep 5s to wait for the job status to become running again
+        Thread.sleep(5000);
+
+        // test job still run
+        await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assert.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
+
+        PassiveCompletableFuture<JobStatus> jobMasterCompleteFuture = jobMaster.getJobMasterCompleteFuture();
+        // cancel job
+        jobMaster.cancelJob();
+
+        // test job turn to complete
+        await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assert.assertTrue(
+                jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get())));
+    }
+}