You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/15 10:47:42 UTC

[incubator-seatunnel] branch st-engine updated: [Feature][ST-Engine] Put Job Status in IMap (#2699)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 651c2460d [Feature][ST-Engine] Put Job Status in IMap (#2699)
651c2460d is described below

commit 651c2460d1b9a5a56ffc1d689081008b0f4f7330
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Sep 15 18:47:35 2022 +0800

    [Feature][ST-Engine] Put Job Status in IMap (#2699)
---
 .../local/LocalTxtTransactionStateFileWriter.java  |   2 +-
 .../test/resources/batch_fakesource_to_file.conf   |   2 +-
 .../batch_fakesource_to_file_complex.conf          |   2 +-
 .../streaming_fakesource_to_file_complex.conf      |   2 +-
 .../test/resources/batch_fakesource_to_file.conf   |   2 +-
 .../batch_fakesource_to_file_complex.conf          |   2 +-
 .../src/test/resources/client_test.conf            |   2 +-
 .../seatunnel/engine/core/job/JobStatus.java       |   3 +
 .../seatunnel/engine/core/job/RunningJobInfo.java  |  61 +++++
 .../core/serializable/JobDataSerializerHook.java   |   5 +
 .../seatunnel/engine/server/SeaTunnelServer.java   | 185 ++++++++++++--
 .../engine/server/TaskExecutionService.java        |  34 ++-
 .../engine/server/dag/physical/PhysicalPlan.java   | 219 +++++++++--------
 .../server/dag/physical/PhysicalPlanGenerator.java |  39 ++-
 .../engine/server/dag/physical/PhysicalVertex.java | 219 +++++++++--------
 .../server/dag/physical/PipelineLocation.java      |  30 +++
 .../engine/server/dag/physical/PlanUtils.java      |  30 ++-
 .../engine/server/dag/physical/SubPlan.java        | 266 ++++++++++++++-------
 .../seatunnel/engine/server/master/JobMaster.java  |  67 ++++--
 .../server/operation/GetJobStatusOperation.java    |  13 +-
 .../server/scheduler/PipelineBaseScheduler.java    | 106 ++++----
 .../engine/server/AbstractSeaTunnelServerTest.java |   5 +-
 .../server/checkpoint/CheckpointPlanTest.java      |  10 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |   9 +-
 .../engine/server/master/JobMasterTest.java        |  88 +++++++
 25 files changed, 995 insertions(+), 408 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java
index d04939a70..a5be3aa2a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTxtTransactionStateFileWriter.java
@@ -112,7 +112,7 @@ public class LocalTxtTransactionStateFileWriter extends AbstractTransactionState
                 FileUtils.createFile(filePath);
                 fileOutputStream = new FileOutputStream(new File(filePath));
                 beingWrittenOutputStream.put(filePath, fileOutputStream);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOGGER.error("can not get output file stream");
                 throw new RuntimeException(e);
             }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 1e49074ba..3cfb808b7 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -41,7 +41,7 @@ transform {
 
 sink {
   LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["age"]
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index d946739b6..2776e3fd1 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -47,7 +47,7 @@ transform {
 
 sink {
   LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["age"]
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 0a52e7a0d..1fec906eb 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -47,7 +47,7 @@ transform {
 
 sink {
   LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["age"]
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
index 7dc125e77..1979a6fc4 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
@@ -40,7 +40,7 @@ transform {
 
 sink {
   LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["age"]
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index 258c32352..575ac8b56 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -46,7 +46,7 @@ transform {
 
 sink {
   LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
+    path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["age"]
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index e2634dfa8..c5d51fb17 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -46,7 +46,7 @@ transform {
 
 sink {
   LocalFile {
-    path = "file:///tmp/hive/warehouse/test2"
+    path = "/tmp/hive/warehouse/test2"
     field_delimiter = "\t"
     row_delimiter = "\n"
     partition_by = ["age"]
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
index d0c9fd963..f9dbfb4c6 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -29,6 +29,9 @@ public enum JobStatus {
     /** Job is newly created, no task has started to run. */
     CREATED(EndState.NOT_END),
 
+    /** Job has started scheduling, but some tasks have not finished deploying. */
+    SCHEDULED(EndState.NOT_END),
+
     /** Some tasks are scheduled or running, some may be pending, some may be finished. */
     RUNNING(EndState.NOT_END),
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/RunningJobInfo.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/RunningJobInfo.java
new file mode 100644
index 000000000..00e7a5dd5
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/RunningJobInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.IOException;
+
+/** Value stored in the runningJobInfo IMap: a job's submit time plus its serialized JobImmutableInformation. */
+@AllArgsConstructor
+@Data
+public class RunningJobInfo implements IdentifiedDataSerializable {
+    private long initializationTimestamp; // epoch millis; primitive so writeData never unboxes a null
+    private com.hazelcast.internal.serialization.Data jobImmutableInformation;
+
+    public RunningJobInfo() {}
+
+    @Override
+    public int getFactoryId() {
+        return JobDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return JobDataSerializerHook.RUNNING_JOB_INFO;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeLong(initializationTimestamp);
+        IOUtil.writeData(out, jobImmutableInformation);
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        initializationTimestamp = in.readLong();
+        jobImmutableInformation = IOUtil.readData(in);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index 1ff1c3288..6fb2da54a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -57,6 +58,8 @@ public final class JobDataSerializerHook implements DataSerializerHook {
      */
     public static final int JOB_IMMUTABLE_INFORMATION = 3;
 
+    public static final int RUNNING_JOB_INFO = 4;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID
@@ -85,6 +88,8 @@ public final class JobDataSerializerHook implements DataSerializerHook {
                     return new LogicalEdge();
                 case JOB_IMMUTABLE_INFORMATION:
                     return new JobImmutableInformation();
+                case RUNNING_JOB_INFO:
+                    return new RunningJobInfo();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index e1632ae8f..21ce26577 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -23,13 +23,16 @@ import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
@@ -43,19 +46,24 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.jet.impl.LiveOperationRegistry;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 import lombok.NonNull;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
     private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
@@ -73,12 +81,50 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     private final SeaTunnelConfig seaTunnelConfig;
 
+    /**
+     * IMap key is the jobId and the value is its {@link RunningJobInfo}.
+     * This IMap is used to recover runningJobInfoIMap in JobMaster when a new master node becomes active.
+     */
+    private IMap<Long, RunningJobInfo> runningJobInfoIMap;
+
+    /**
+     * IMap key is one of jobId, {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} or
+     * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}.
+     * <p>
+     * The value of IMap is one of {@link JobStatus}, {@link org.apache.seatunnel.engine.core.job.PipelineState} or
+     * {@link org.apache.seatunnel.engine.server.execution.ExecutionState}.
+     * <p>
+     * This IMap is used to recover runningJobStateIMap in JobMaster when a new master node becomes active.
+     */
+    IMap<Object, Object> runningJobStateIMap;
+
+    /**
+     * IMap key is one of jobId, {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} or
+     * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}.
+     * <p>
+     * The value of IMap is one of {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps,
+     * {@link org.apache.seatunnel.engine.server.dag.physical.SubPlan} stateTimestamps or
+     * {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex} stateTimestamps.
+     * <p>
+     * This IMap is used to recover runningJobStateTimestampsIMap in JobMaster when a new master node becomes active.
+     */
+    IMap<Object, Long[]> runningJobStateTimestampsIMap;
+
     /**
      * key: job id;
      * <br> value: job master;
      */
     private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
 
+    /**
+     * IMap key is {@link PipelineLocation}
+     * <p>
+     * The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used.
+     * <p>
+     * This IMap is used to recover ownedSlotProfilesIMap in JobMaster when a new master node becomes active.
+     */
+    private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
     public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
         this.logger = node.getLogger(getClass());
         this.liveOperationRegistry = new LiveOperationRegistry();
@@ -109,6 +155,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         return runningJobMasterMap.get(jobId);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void init(NodeEngine engine, Properties hzProperties) {
         this.nodeEngine = (NodeEngineImpl) engine;
@@ -118,6 +165,14 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         );
         taskExecutionService.start();
         getSlotService();
+
+        runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
+        runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+        runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
+        ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+        service.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
     }
 
     @Override
@@ -190,12 +245,19 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
      */
     public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
         CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
-        JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService, getResourceManager());
+        JobMaster jobMaster = new JobMaster(jobImmutableInformation,
+            this.nodeEngine,
+            executorService,
+            getResourceManager(),
+            runningJobStateIMap,
+            runningJobStateTimestampsIMap,
+            ownedSlotProfilesIMap);
         executorService.submit(() -> {
             try {
-                jobMaster.init();
-                jobMaster.getPhysicalPlan().initStateFuture();
+                runningJobInfoIMap.put(jobId, new RunningJobInfo(System.currentTimeMillis(), jobImmutableInformation));
                 runningJobMasterMap.put(jobId, jobMaster);
+                jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
+                jobMaster.getPhysicalPlan().initStateFuture();
             } catch (Throwable e) {
                 LOGGER.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
                 voidCompletableFuture.completeExceptionally(e);
@@ -207,12 +269,49 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
             try {
                 jobMaster.run();
             } finally {
+                // TODO: store job state info in HistoryStorage before the IMap entries are removed.
+                removeJobIMap(jobMaster);
                 runningJobMasterMap.remove(jobId);
             }
         });
         return new PassiveCompletableFuture(voidCompletableFuture);
     }
 
+    private void removeJobIMap(JobMaster jobMaster) { // removes every IMap entry this job wrote
+        Long jobId = jobMaster.getJobImmutableInformation().getJobId();
+        runningJobStateTimestampsIMap.remove(jobId);
+        // Remove the state and timestamp entries of each pipeline and of its coordinator and task vertices.
+        jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
+            runningJobStateIMap.remove(pipeline.getPipelineLocation());
+            runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
+            pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+                runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
+                runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
+            });
+
+            pipeline.getPhysicalVertexList().forEach(task -> {
+                runningJobStateIMap.remove(task.getTaskGroupLocation());
+                runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
+            });
+        });
+
+        // The job-level entries below are deleted last. Planned recovery on a newly active master node:
+        // 1. runningJobStateIMap.get(jobId) == null but runningJobInfoIMap.get(jobId) != null:
+        //    call runningJobInfoIMap.remove(jobId).
+        //
+        // 2. runningJobStateIMap.get(jobId) is an end-state JobStatus: create a new JobMaster, generate the
+        //    PhysicalPlan again, and then remove every PipelineLocation and TaskGroupLocation key from
+        //    runningJobStateIMap.
+        //
+        // 3. runningJobStateIMap.get(jobId) is JobStatus.SCHEDULED: cancel the job, then call
+        //    submitJob(long jobId, Data jobImmutableInformation) to resubmit it.
+        //
+        // 4. runningJobStateIMap.get(jobId) is CANCELING or RUNNING: recover the JobMaster from
+        //    runningJobStateIMap and then wait for it to complete.
+        runningJobStateIMap.remove(jobId);
+        runningJobInfoIMap.remove(jobId);
+    }
+
     public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
@@ -245,34 +344,47 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
             // TODO Get Job Status from JobHistoryStorage
             return JobStatus.FINISHED;
         }
-        return runningJobMaster.getJobStatus();
+        // This method is called from an operation, and runningJobMaster.getJobStatus() reads data from an IMap,
+        // which would fail with "Waiting for response on this thread is illegal". To avoid that, we run
+        // runningJobMaster.getJobStatus() on another thread.
+        CompletableFuture<JobStatus> future = CompletableFuture.supplyAsync(() -> {
+            return runningJobMaster.getJobStatus();
+        }, executorService);
+
+        try {
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {
         Address lostAddress = event.getMember().getAddress();
         runningJobMasterMap.forEach((aLong, jobMaster) -> {
             jobMaster.getPhysicalPlan().getPipelineList().forEach(subPlan -> {
-                ArrayList<PhysicalVertex> allVertex = new ArrayList<>();
-                allVertex.addAll(subPlan.getPhysicalVertexList());
-                allVertex.addAll(subPlan.getCoordinatorVertexList());
-                allVertex.forEach(physicalVertex -> {
-                    Address deployAddress = physicalVertex.getCurrentExecutionAddress();
-                    ExecutionState executionState = physicalVertex.getExecutionState().get();
-                    if (null != deployAddress && deployAddress.equals(lostAddress) &&
-                        (executionState.equals(ExecutionState.DEPLOYING) ||
-                            executionState.equals(ExecutionState.RUNNING))) {
-                        TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
-                        physicalVertex.updateTaskExecutionState(
-                            new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
-                                new SeaTunnelEngineException(
-                                    String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
-                                        lostAddress))));
-                    }
-                });
+                makeTasksFailed(subPlan.getCoordinatorVertexList(), lostAddress); // coordinators first, then tasks
+                makeTasksFailed(subPlan.getPhysicalVertexList(), lostAddress);
             });
         });
     }
 
+    private void makeTasksFailed(@NonNull List<PhysicalVertex> physicalVertexList, @NonNull Address lostAddress) {
+        // Fail each vertex DEPLOYING or RUNNING on lostAddress; constant-first equals tolerates a null state.
+        physicalVertexList.forEach(physicalVertex -> {
+            Address deployAddress = physicalVertex.getCurrentExecutionAddress();
+            ExecutionState executionState = physicalVertex.getExecutionState();
+            boolean onLostMember = null != deployAddress && deployAddress.equals(lostAddress);
+            if (onLostMember && (ExecutionState.DEPLOYING.equals(executionState) ||
+                ExecutionState.RUNNING.equals(executionState))) {
+                TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
+                String message = String.format("The taskGroup(%s) deployed node(%s) offline",
+                    taskGroupLocation, lostAddress);
+                physicalVertex.updateTaskExecutionState(new TaskExecutionState(taskGroupLocation,
+                    ExecutionState.FAILED, new SeaTunnelEngineException(message)));
+            }
+        });
+    }
+
     /**
      * When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the TaskGroup's state.
      */
@@ -284,4 +396,33 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         }
         runningJobMaster.updateTaskExecutionState(taskExecutionState);
     }
+
+    /** Logs a snapshot of the executorService thread-pool counters; scheduled periodically from init(). */
+    private void printExecutionInfo() {
+        // Only ThreadPoolExecutor exposes these counters. A blind cast would throw ClassCastException, and an
+        // exception escaping a scheduleAtFixedRate task silently cancels all of its later runs.
+        if (!(executorService instanceof ThreadPoolExecutor)) {
+            return;
+        }
+        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
+        StringBuilder sb = new StringBuilder();
+        sb.append("activeCount=")
+            .append(threadPoolExecutor.getActiveCount())
+            .append("\n")
+            .append("corePoolSize=")
+            .append(threadPoolExecutor.getCorePoolSize())
+            .append("\n")
+            .append("maximumPoolSize=")
+            .append(threadPoolExecutor.getMaximumPoolSize())
+            .append("\n")
+            .append("poolSize=")
+            .append(threadPoolExecutor.getPoolSize())
+            .append("\n")
+            .append("completedTaskCount=")
+            .append(threadPoolExecutor.getCompletedTaskCount())
+            .append("\n")
+            .append("taskCount=")
+            .append(threadPoolExecutor.getTaskCount());
+        logger.info(sb.toString());
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index eb9d1c07f..d2d8b67d6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -83,7 +83,8 @@ public class TaskExecutionService {
     private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
     // key: TaskID
     private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> executionContexts = new ConcurrentHashMap<>();
-    private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures =
+        new ConcurrentHashMap<>();
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -126,17 +127,18 @@ public class TaskExecutionService {
         uncheckRun(startedLatch::await);
     }
 
-    public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(@NonNull Data taskImmutableInformation) {
+    public PassiveCompletableFuture<TaskExecutionState> deployTask(@NonNull Data taskImmutableInformation) {
         TaskGroupImmutableInformation taskImmutableInfo =
             nodeEngine.getSerializationService().toObject(taskImmutableInformation);
         return deployTask(taskImmutableInfo);
     }
 
     public <T extends Task> T getTask(TaskLocation taskLocation) {
-        return this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup().getTask(taskLocation.getTaskID());
+        return this.getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup()
+            .getTask(taskLocation.getTaskID());
     }
 
-    public synchronized PassiveCompletableFuture<TaskExecutionState> deployTask(
+    public PassiveCompletableFuture<TaskExecutionState> deployTask(
         @NonNull TaskGroupImmutableInformation taskImmutableInfo) {
         CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
         TaskGroup taskGroup = null;
@@ -146,21 +148,30 @@ public class TaskExecutionService {
             if (!CollectionUtils.isEmpty(jars)) {
                 classLoader = new SeatunnelChildFirstClassLoader(Lists.newArrayList(jars));
                 taskGroup =
-                    CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(), classLoader,
+                    CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
+                        classLoader,
                         taskImmutableInfo.getGroup());
             } else {
                 taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
             }
-            if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
-                throw new RuntimeException(String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
+            logger.info(String.format("deploying task %s", taskGroup.getTaskGroupLocation()));
+
+            synchronized (this) {
+                if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
+                    throw new RuntimeException(
+                        String.format("TaskGroupLocation: %s already exists", taskGroup.getTaskGroupLocation()));
+                }
+                return deployLocalTask(taskGroup, resultFuture, classLoader);
             }
-            return deployLocalTask(taskGroup, resultFuture, classLoader);
         } catch (Throwable t) {
             logger.severe(String.format("TaskGroupID : %s  deploy error with Exception: %s",
-                taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
+                taskGroup != null && taskGroup.getTaskGroupLocation() != null ?
+                    taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
                 ExceptionUtils.getMessage(t)));
             resultFuture.complete(
-                new TaskExecutionState(taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation() : null, ExecutionState.FAILED, t));
+                new TaskExecutionState(
+                    taskGroup != null && taskGroup.getTaskGroupLocation() != null ? taskGroup.getTaskGroupLocation() :
+                        null, ExecutionState.FAILED, t));
         }
         return new PassiveCompletableFuture<>(resultFuture);
     }
@@ -205,7 +216,8 @@ public class TaskExecutionService {
             long sleepTime = 1000;
             do {
                 if (null != invoke) {
-                    logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis", taskGroup.getTaskGroupLocation(), sleepTime));
+                    logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
+                        taskGroup.getTaskGroupLocation(), sleepTime));
                     try {
                         Thread.sleep(sleepTime += 1000);
                     } catch (InterruptedException e) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 296ad2e1f..b19e7d688 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -26,20 +26,24 @@ import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
 import lombok.NonNull;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class PhysicalPlan {
 
     private static final ILogger LOGGER = Logger.getLogger(PhysicalPlan.class);
+    /**
+     * Exclusive upper bound on SubPlan#getPipelineRestoreNum() for a CANCELED/FAILED pipeline to still be restored.
+     */
+    public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO: make this configurable instead of hard-coded
 
     private final List<SubPlan> pipelineList;
 
@@ -49,14 +53,9 @@ public class PhysicalPlan {
 
     private AtomicInteger failedPipelineNum = new AtomicInteger(0);
 
-    private AtomicReference<JobStatus> jobStatus = new AtomicReference<>();
-
     private final JobImmutableInformation jobImmutableInformation;
 
-    /**
-     * If the job or pipeline cancel by user, needRestore will be false
-     **/
-    private volatile boolean needRestore = true;
+    private final IMap<Object, Object> runningJobStateIMap; // shared state IMap; this job's JobStatus is under key jobId
 
     /**
      * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
@@ -64,7 +63,7 @@ public class PhysicalPlan {
      * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
      * stateTimestamps[RUNNING.ordinal()]}.
      */
-    private final long[] stateTimestamps;
+    private final IMap<Object, Long[]> runningJobStateTimestampsIMap; // jobId -> Long[] indexed by JobStatus ordinal
 
     /**
      * when job status turn to end, complete this future. And then the waitForCompleteByPhysicalPlan
@@ -76,10 +75,17 @@ public class PhysicalPlan {
 
     private final String jobFullName;
 
-    private JobMaster jobMaster;
+    private final long jobId;
 
     private final Map<Integer, CompletableFuture> pipelineSchedulerFutureMap;
 
+    private JobMaster jobMaster;
+
+    /**
+     * If the job or pipeline is canceled by the user, needRestore will be false.
+     **/
+    private volatile boolean needRestore = true;
+
     /**
      * Whether we make the job end when pipeline turn to end state.
      */
@@ -88,13 +94,28 @@ public class PhysicalPlan {
     public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
                         @NonNull ExecutorService executorService,
                         @NonNull JobImmutableInformation jobImmutableInformation,
-                        long initializationTimestamp) {
+                        long initializationTimestamp,
+                        @NonNull IMap runningJobStateIMap,
+                        @NonNull IMap runningJobStateTimestampsIMap) {
         this.executorService = executorService;
         this.jobImmutableInformation = jobImmutableInformation;
-        stateTimestamps = new long[JobStatus.values().length];
-        this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
-        this.jobStatus.set(JobStatus.CREATED);
-        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
+        this.jobId = jobImmutableInformation.getJobId();
+        Long[] stateTimestamps = (Long[]) runningJobStateTimestampsIMap.get(jobId); // reuse stored timestamps
+        if (stateTimestamps == null) {
+            stateTimestamps = new Long[JobStatus.values().length];
+            stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+            runningJobStateTimestampsIMap.put(jobId, stateTimestamps);
+        }
+
+        if (runningJobStateIMap.get(jobId) == null) {
+            // Update runningJobStateTimestampsIMap before runningJobStateIMap: if a new master node becomes
+            // active, ExecutionState, PipelineState and JobStatus can be recovered from TaskExecutionService,
+            // but stateTimestamps cannot.
+            stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
+            runningJobStateTimestampsIMap.put(jobId, stateTimestamps);
+            runningJobStateIMap.put(jobId, JobStatus.CREATED);
+        }
+
         this.jobEndFuture = new CompletableFuture<>();
         this.pipelineList = pipelineList;
         if (pipelineList.isEmpty()) {
@@ -103,24 +124,27 @@ public class PhysicalPlan {
         this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId());
 
-        pipelineSchedulerFutureMap = new HashMap<>(pipelineList.size());
+        pipelineSchedulerFutureMap = new ConcurrentHashMap<>(pipelineList.size());
+        this.runningJobStateIMap = runningJobStateIMap;
+        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
 
     public void setJobMaster(JobMaster jobMaster) {
         this.jobMaster = jobMaster;
+        pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
     }
 
     public void initStateFuture() {
         pipelineList.forEach(subPlan -> addPipelineEndCallback(subPlan));
     }
 
-    private void addPipelineEndCallback(SubPlan subPlan) {
+    public void addPipelineEndCallback(SubPlan subPlan) {
         PassiveCompletableFuture<PipelineState> future = subPlan.initStateFuture();
         future.thenAcceptAsync(pipelineState -> {
             try {
                 if (PipelineState.CANCELED.equals(pipelineState)) {
-                    if (needRestore) {
-                        restorePipeline(subPlan);
+                    if (canRestorePipeline(subPlan)) {
+                        subPlan.restorePipeline();
                         return;
                     }
                     canceledPipelineNum.incrementAndGet();
@@ -131,17 +155,17 @@ public class PhysicalPlan {
                         cancelJob();
                     }
                     LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
-                    jobMaster.releasePipelineResource(subPlan.getPipelineId());
+                    jobMaster.releasePipelineResource(subPlan);
                 } else if (PipelineState.FAILED.equals(pipelineState)) {
-                    if (needRestore) {
-                        restorePipeline(subPlan);
+                    if (canRestorePipeline(subPlan)) {
+                        subPlan.restorePipeline();
                         return;
                     }
                     failedPipelineNum.incrementAndGet();
                     if (makeJobEndWhenPipelineEnded) {
                         cancelJob();
                     }
-                    jobMaster.releasePipelineResource(subPlan.getPipelineId());
+                    jobMaster.releasePipelineResource(subPlan);
                     LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
                 }
             } catch (Throwable e) {
@@ -157,38 +181,50 @@ public class PhysicalPlan {
                 } else {
                     turnToEndState(JobStatus.FINISHED);
                 }
-                jobEndFuture.complete(jobStatus.get());
+                jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
             }
         });
     }
 
+    private boolean canRestorePipeline(SubPlan subPlan) {
+        return needRestore && subPlan.getPipelineRestoreNum() < PIPELINE_MAX_RESTORE_NUM;
+    }
+
     public void cancelJob() {
-        if (jobStatus.get().isEndState()) {
-            LOGGER.warning(String.format("%s is in end state %s, can not be cancel", jobFullName, jobStatus.get()));
+        if (getJobStatus().isEndState()) {
+            LOGGER.warning(String.format("%s is in end state %s, can not be cancel", jobFullName, getJobStatus()));
+            return;
+        }
+
+        // If the previously active master node went down and another one became active, we cannot know
+        // whether the earlier cancellation completed, so cancel the pipelines again.
+        if (JobStatus.CANCELLING.equals(getJobStatus())) {
+            cancelJobPipelines();
             return;
         }
 
-        updateJobState(jobStatus.get(), JobStatus.CANCELLING);
+        updateJobState((JobStatus) runningJobStateIMap.get(jobId), JobStatus.CANCELLING);
         cancelJobPipelines();
     }
 
     private void cancelJobPipelines() {
         List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
-            if (!pipeline.getPipelineState().get().isEndState() &&
-                !PipelineState.CANCELING.equals(pipeline.getPipelineState().get())) {
-                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
-                    pipeline.cancelPipeline();
-                    return null;
-                }, executorService);
-                return future;
+            if (PipelineState.CANCELING.equals(pipeline.getPipelineState()) ||
+                pipeline.getPipelineState().isEndState()) {
+                LOGGER.info(String.format("%s already in state %s", pipeline.getPipelineFullName(),
+                    pipeline.getPipelineState()));
+                return null;
             }
-            return null;
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                pipeline.cancelPipeline();
+            }, executorService);
+            return future;
         }).filter(x -> x != null).collect(Collectors.toList());
 
         try {
             CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
-                collect.toArray(new CompletableFuture[collect.size()]));
-            voidCompletableFuture.get();
+                collect.toArray(new CompletableFuture[0]));
+            voidCompletableFuture.join();
         } catch (Exception e) {
             LOGGER.severe(
                 String.format("%s cancel error with exception: %s", jobFullName, ExceptionUtils.getMessage(e)));
@@ -199,75 +235,68 @@ public class PhysicalPlan {
         return pipelineList;
     }
 
-    public boolean turnToRunning() {
-        return updateJobState(JobStatus.CREATED, JobStatus.RUNNING);
-    }
-
     private void turnToEndState(@NonNull JobStatus endState) {
-        // consistency check
-        if (jobStatus.get().isEndState()) {
-            String message = "Job is trying to leave terminal state " + jobStatus.get();
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+        synchronized (this) {
+            // consistency check
+            JobStatus current = (JobStatus) runningJobStateIMap.get(jobId);
+            if (current.isEndState()) {
+                String message = "Job is trying to leave terminal state " + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (!endState.isEndState()) {
-            String message = "Need a end state, not " + endState;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (!endState.isEndState()) {
+                String message = "Need a end state, not " + endState;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        LOGGER.info(String.format("%s turn to end state %s", jobFullName, endState));
-        jobStatus.set(endState);
-        stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
-    }
+            LOGGER.info(String.format("%s end with state %s", getJobFullName(), endState));
+            // Record the timestamp before publishing the new state in runningJobStateIMap (needed for failover).
+            updateStateTimestamps(endState);
 
-    public boolean updateJobState(@NonNull JobStatus targetState) {
-        return updateJobState(jobStatus.get(), targetState);
+            runningJobStateIMap.put(jobId, endState);
+        }
     }
 
-    public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) {
-        // consistency check
-        if (current.isEndState()) {
-            String message = "Job is trying to leave terminal state " + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+    private void updateStateTimestamps(@NonNull JobStatus targetState) {
+        // IMap#get returns a copy of the stored array, so the updated array must be written back with set().
+        Long[] stateTimestamps = runningJobStateTimestampsIMap.get(jobId);
+        stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+        runningJobStateTimestampsIMap.set(jobId, stateTimestamps);
+    }
 
-        // now do the actual state transition
-        if (jobStatus.compareAndSet(current, targetState)) {
-            LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
-                jobImmutableInformation.getJobConfig().getName(),
-                jobImmutableInformation.getJobId(),
-                current,
-                targetState));
-
-            stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
-            return true;
-        } else {
-            return false;
+    public boolean updateJobState(@NonNull JobStatus targetState) {
+        synchronized (this) {
+            return updateJobState((JobStatus) runningJobStateIMap.get(jobId), targetState);
         }
     }
 
-    private void restorePipeline(SubPlan subPlan) {
-        try {
-            LOGGER.info(String.format("Restore pipeline %s", subPlan.getPipelineFullName()));
-            // We must ensure the scheduler complete and then can handle pipeline state change.
-            jobMaster.getScheduleFuture().join();
-
-            if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
-                pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+    public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) {
+        synchronized (this) {
+            // consistency check
+            if (current.isEndState()) {
+                String message = "Job is trying to leave terminal state " + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
             }
-            subPlan.reset();
-            addPipelineEndCallback(subPlan);
-            pipelineSchedulerFutureMap.put(subPlan.getPipelineId(), jobMaster.reSchedulerPipeline(subPlan));
-            if (pipelineSchedulerFutureMap.get(subPlan.getPipelineId()) != null) {
-                pipelineSchedulerFutureMap.get(subPlan.getPipelineId()).join();
+
+            // now do the actual state transition
+            if (current.equals(runningJobStateIMap.get(jobId))) {
+                LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
+                    jobImmutableInformation.getJobConfig().getName(),
+                    jobId,
+                    current,
+                    targetState));
+
+                // Record the timestamp before publishing the new state in runningJobStateIMap (needed for failover).
+                updateStateTimestamps(targetState);
+
+                runningJobStateIMap.set(jobId, targetState);
+                return true;
+            } else {
+                return false;
             }
-        } catch (Throwable e) {
-            LOGGER.severe(String.format("Restore pipeline %s error with exception: %s", subPlan.getPipelineFullName(),
-                ExceptionUtils.getMessage(e)));
-            subPlan.cancelPipeline();
         }
     }
 
@@ -280,7 +309,7 @@ public class PhysicalPlan {
     }
 
     public JobStatus getJobStatus() {
-        return jobStatus.get();
+        return (JobStatus) runningJobStateIMap.get(jobId);
     }
 
     public String getJobFullName() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 25c4c8c7e..90bf6336f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -54,6 +54,7 @@ import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
 import com.google.common.collect.Lists;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.jet.datamodel.Tuple2;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
@@ -110,12 +111,18 @@ public class PhysicalPlanGenerator {
      */
     private final Set<TaskLocation> startingTasks;
 
+    private final IMap<Object, Object> runningJobStateIMap;
+
+    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
+
     public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan,
                                  @NonNull NodeEngine nodeEngine,
                                  @NonNull JobImmutableInformation jobImmutableInformation,
                                  long initializationTimestamp,
                                  @NonNull ExecutorService executorService,
-                                 @NonNull FlakeIdGenerator flakeIdGenerator) {
+                                 @NonNull FlakeIdGenerator flakeIdGenerator,
+                                 @NonNull IMap runningJobStateIMap,
+                                 @NonNull IMap runningJobStateTimestampsIMap) {
         this.pipelines = executionPlan.getPipelines();
         this.nodeEngine = nodeEngine;
         this.jobImmutableInformation = jobImmutableInformation;
@@ -125,6 +132,8 @@ public class PhysicalPlanGenerator {
         // the checkpoint of a pipeline
         this.pipelineTasks = new HashSet<>();
         this.startingTasks = new HashSet<>();
+        this.runningJobStateIMap = runningJobStateIMap;
+        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
 
     public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
@@ -170,13 +179,17 @@ public class PhysicalPlanGenerator {
                 physicalVertexList,
                 coordinatorVertexList,
                 jobImmutableInformation,
-                executorService);
+                executorService,
+                runningJobStateIMap,
+                runningJobStateTimestampsIMap);
         });
 
         PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
             executorService,
             jobImmutableInformation,
-            initializationTimestamp);
+            initializationTimestamp,
+            runningJobStateIMap,
+            runningJobStateTimestampsIMap);
         return Tuple2.tuple2(physicalPlan, checkpointPlans);
     }
 
@@ -227,7 +240,9 @@ public class PhysicalPlanGenerator {
                         s.getJarUrls(),
                         jobImmutableInformation,
                         initializationTimestamp,
-                        nodeEngine);
+                        nodeEngine,
+                        runningJobStateIMap,
+                        runningJobStateTimestampsIMap);
                 } else {
                     return null;
                 }
@@ -267,7 +282,9 @@ public class PhysicalPlanGenerator {
                         seaTunnelTask.getJarsUrl(),
                         jobImmutableInformation,
                         initializationTimestamp,
-                        nodeEngine));
+                        nodeEngine,
+                        runningJobStateIMap,
+                        runningJobStateTimestampsIMap));
                 }
                 return t.stream();
             }).collect(Collectors.toList());
@@ -303,7 +320,9 @@ public class PhysicalPlanGenerator {
                 t.getJarsUrl(),
                 jobImmutableInformation,
                 initializationTimestamp,
-                nodeEngine);
+                nodeEngine,
+                runningJobStateIMap,
+                runningJobStateTimestampsIMap);
         }).collect(Collectors.toList());
     }
 
@@ -363,7 +382,9 @@ public class PhysicalPlanGenerator {
                             jars,
                             jobImmutableInformation,
                             initializationTimestamp,
-                            nodeEngine));
+                            nodeEngine,
+                            runningJobStateIMap,
+                            runningJobStateTimestampsIMap));
                     } else {
                         t.add(new PhysicalVertex(
                             i,
@@ -378,7 +399,9 @@ public class PhysicalPlanGenerator {
                             jars,
                             jobImmutableInformation,
                             initializationTimestamp,
-                            nodeEngine));
+                            nodeEngine,
+                            runningJobStateIMap,
+                            runningJobStateTimestampsIMap));
                     }
                 }
                 return t.stream();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 6838df1f6..b4386f14a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -37,6 +36,7 @@ import com.hazelcast.cluster.Address;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
@@ -44,7 +44,6 @@ import java.net.URL;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 /**
@@ -79,7 +78,7 @@ public class PhysicalVertex {
 
     private final Set<URL> pluginJarsUrls;
 
-    private AtomicReference<ExecutionState> executionState = new AtomicReference<>();
+    private final IMap<Object, Object> runningJobStateIMap;
 
     /**
      * When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
@@ -93,7 +92,7 @@ public class PhysicalVertex {
      * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
      * stateTimestamps[RUNNING.ordinal()]}.
      */
-    private final long[] stateTimestamps;
+    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
 
     private final JobImmutableInformation jobImmutableInformation;
 
@@ -103,6 +102,8 @@ public class PhysicalVertex {
 
     private Address currentExecutionAddress;
 
+    private TaskGroupImmutableInformation taskGroupImmutableInformation;
+
     public PhysicalVertex(int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
                           int parallelism,
@@ -113,7 +114,10 @@ public class PhysicalVertex {
                           Set<URL> pluginJarsUrls,
                           @NonNull JobImmutableInformation jobImmutableInformation,
                           long initializationTimestamp,
-                          @NonNull NodeEngine nodeEngine) {
+                          @NonNull NodeEngine nodeEngine,
+                          @NonNull IMap runningJobStateIMap,
+                          @NonNull IMap runningJobStateTimestampsIMap) {
+        this.taskGroupLocation = taskGroup.getTaskGroupLocation();
         this.subTaskGroupIndex = subTaskGroupIndex;
         this.executorService = executorService;
         this.parallelism = parallelism;
@@ -124,10 +128,22 @@ public class PhysicalVertex {
         this.pluginJarsUrls = pluginJarsUrls;
         this.jobImmutableInformation = jobImmutableInformation;
         this.initializationTimestamp = initializationTimestamp;
-        stateTimestamps = new long[ExecutionState.values().length];
-        this.stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
-        this.executionState.set(ExecutionState.CREATED);
-        this.stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+
+        Long[] stateTimestamps = (Long[]) runningJobStateTimestampsIMap.get(taskGroupLocation);
+        if (stateTimestamps == null) {
+            stateTimestamps = new Long[ExecutionState.values().length];
+            stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
+            runningJobStateTimestampsIMap.put(taskGroupLocation, stateTimestamps);
+        }
+
+        if (runningJobStateIMap.get(taskGroupLocation) == null) {
+            // Update runningJobStateTimestampsIMap before runningJobStateIMap (ordering matters on master failover).
+            stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+            runningJobStateTimestampsIMap.put(taskGroupLocation, stateTimestamps);
+
+            runningJobStateIMap.put(taskGroupLocation, ExecutionState.CREATED);
+        }
+
         this.nodeEngine = nodeEngine;
         this.taskFullName =
             String.format(
@@ -140,10 +156,17 @@ public class PhysicalVertex {
                 subTaskGroupIndex + 1,
                 parallelism);
         this.taskFuture = new CompletableFuture<>();
-        this.taskGroupLocation = taskGroup.getTaskGroupLocation();
+
+        this.runningJobStateIMap = runningJobStateIMap;
+        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
 
     public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
+        ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+        // If the stored state is already CANCELING, (re)send the cancel request to TaskExecutionService.
+        if (ExecutionState.CANCELING.equals(executionState)) {
+            noticeTaskExecutionServiceCancel();
+        }
         this.taskFuture = new CompletableFuture<>();
         return new PassiveCompletableFuture<>(this.taskFuture);
     }
@@ -187,31 +210,12 @@ public class PhysicalVertex {
 
     private void deployInternal(Consumer<TaskGroupImmutableInformation> taskGroupConsumer) {
         TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation();
-        if (ExecutionState.DEPLOYING.equals(executionState.get())) {
-            taskGroupConsumer.accept(taskGroupImmutableInformation);
-            // may be canceling
-            if (!updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
-                // If we found the task state turned to CANCELING after deployed to TaskExecutionService. We need
-                // notice the TaskExecutionService to cancel this task.
-                noticeTaskExecutionServiceCancel();
-                if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
-                    turnToEndState(ExecutionState.CANCELED);
-                    taskFuture.complete(
-                        new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
-                } else {
-                    turnToEndState(ExecutionState.FAILED);
-                    taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
-                        new JobException(String.format("%s turn to a unexpected state: %s, make it Failed",
-                            this.getTaskFullName(), executionState.get()))));
-                }
+        synchronized (this) {
+            ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+            if (ExecutionState.DEPLOYING.equals(currentState)) {
+                taskGroupConsumer.accept(taskGroupImmutableInformation);
+                updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
             }
-        } else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
-            turnToEndState(ExecutionState.CANCELED);
-            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
-        } else {
-            turnToEndState(ExecutionState.FAILED);
-            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, executionState.get(),
-                new JobException(String.format("%s turn to a unexpected state", getTaskFullName()))));
         }
     }
 
@@ -231,63 +235,71 @@ public class PhysicalVertex {
     }
 
     private boolean turnToEndState(@NonNull ExecutionState endState) {
-        // consistency check
-        if (executionState.get().isEndState()) {
-            String message = String.format("Task %s is already in terminal state %s", taskFullName, executionState.get());
-            LOGGER.warning(message);
-            return false;
-        }
-        if (!endState.isEndState()) {
-            String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
-            LOGGER.warning(message);
-            return false;
-        }
+        synchronized (this) {
+            // consistency check
+            ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+            if (currentState.isEndState()) {
+                String message = String.format("Task %s is already in terminal state %s", taskFullName, currentState);
+                LOGGER.warning(message);
+                return false;
+            }
+            if (!endState.isEndState()) {
+                String message = String.format("Turn task %s state to end state need gave a end state, not %s", taskFullName, endState);
+                LOGGER.warning(message);
+                return false;
+            }
+
+            LOGGER.info(String.format("%s turn to end state %s.",
+                taskFullName,
+                endState));
+            updateStateTimestamps(endState);
 
-        LOGGER.info(String.format("%s turn to end state %s.",
-            taskFullName,
-            endState));
-        executionState.set(endState);
-        stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
-        return true;
+            runningJobStateIMap.set(taskGroupLocation, endState);
+            return true;
+        }
     }
 
     public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
-        // consistency check
-        if (current.isEndState()) {
-            String message = "Task is trying to leave terminal state " + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+        synchronized (this) {
+            // consistency check
+            if (current.isEndState()) {
+                String message = "Task is trying to leave terminal state " + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (ExecutionState.SCHEDULED.equals(targetState) && !ExecutionState.CREATED.equals(current)) {
-            String message = "Only [CREATED] task can turn to [SCHEDULED]" + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (ExecutionState.SCHEDULED.equals(targetState) && !ExecutionState.CREATED.equals(current)) {
+                String message = "Only [CREATED] task can turn to [SCHEDULED]" + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (ExecutionState.DEPLOYING.equals(targetState) && !ExecutionState.SCHEDULED.equals(current)) {
-            String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (ExecutionState.DEPLOYING.equals(targetState) && !ExecutionState.SCHEDULED.equals(current)) {
+                String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (ExecutionState.RUNNING.equals(targetState) && !ExecutionState.DEPLOYING.equals(current)) {
-            String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (ExecutionState.RUNNING.equals(targetState) && !ExecutionState.DEPLOYING.equals(current)) {
+                String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        // now do the actual state transition
-        if (executionState.compareAndSet(current, targetState)) {
-            LOGGER.info(String.format("%s turn from state %s to %s.",
-                taskFullName,
-                current,
-                targetState));
+            // now do the actual state transition
+            if (current.equals(runningJobStateIMap.get(taskGroupLocation))) {
+                LOGGER.info(String.format("%s turn from state %s to %s.",
+                    taskFullName,
+                    current,
+                    targetState));
 
-            stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
-            return true;
-        } else {
-            return false;
+                updateStateTimestamps(targetState);
+
+                runningJobStateIMap.set(taskGroupLocation, targetState);
+                return true;
+            } else {
+                return false;
+            }
         }
     }
 
@@ -297,10 +309,9 @@ public class PhysicalVertex {
 
     public void cancel() {
         if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
-            updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)) {
+            updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED) ||
+            updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELED)) {
             taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, ExecutionState.CANCELED, null));
-        } else if (updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELING)) {
-            // do nothing, because even if task is deployed to TaskExecutionService, we can do the cancel in deploy method
         } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
             noticeTaskExecutionServiceCancel();
         }
@@ -314,7 +325,7 @@ public class PhysicalVertex {
             try {
                 i++;
                 nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                        new CancelTaskOperation(taskGroup.getTaskGroupLocation()),
+                        new CancelTaskOperation(taskGroupLocation),
                         currentExecutionAddress)
                     .invoke().get();
                 return;
@@ -330,26 +341,38 @@ public class PhysicalVertex {
         }
     }
 
+    private void updateStateTimestamps(@NonNull ExecutionState targetState) {
+        // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+        Long[] stateTimestamps = runningJobStateTimestampsIMap.get(taskGroupLocation);
+        stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+        runningJobStateTimestampsIMap.set(taskGroupLocation, stateTimestamps);
+
+    }
+
+    public ExecutionState getExecutionState() {
+        return (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+    }
+
     private void resetExecutionState() {
-        if (!executionState.get().isEndState()) {
-            String message =
-                String.format("%s reset state failed, only end state can be reset, current is %s", getTaskFullName(),
-                    executionState.get());
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
+        synchronized (this) {
+            ExecutionState executionState = getExecutionState();
+            if (!executionState.isEndState()) {
+                String message =
+                    String.format("%s reset state failed, only end state can be reset, current is %s",
+                        getTaskFullName(),
+                        executionState);
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
+            updateStateTimestamps(ExecutionState.CREATED);
+            runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
         }
-        executionState.set(ExecutionState.CREATED);
-        stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
     }
 
     public void reset() {
         resetExecutionState();
     }
 
-    public AtomicReference<ExecutionState> getExecutionState() {
-        return executionState;
-    }
-
     public String getTaskFullName() {
         return taskFullName;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
new file mode 100644
index 000000000..45609e5ce
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@AllArgsConstructor
+@Data
+public class PipelineLocation implements Serializable {
+    private long jobId;
+    private int pipelineId;
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index bd66dc91e..6dc657976 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
 
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.jet.datamodel.Tuple2;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
@@ -33,17 +34,26 @@ import java.util.concurrent.ExecutorService;
 public class PlanUtils {
 
     public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> fromLogicalDAG(@NonNull LogicalDag logicalDag,
-                                                                                 @NonNull NodeEngine nodeEngine,
-                                                                                 @NonNull JobImmutableInformation jobImmutableInformation,
-                                                                                 long initializationTimestamp,
-                                                                                 @NonNull ExecutorService executorService,
-                                                                                 @NonNull FlakeIdGenerator flakeIdGenerator) {
+                                                                      @NonNull NodeEngine nodeEngine,
+                                                                      @NonNull
+                                                                      JobImmutableInformation jobImmutableInformation,
+                                                                      long initializationTimestamp,
+                                                                      @NonNull ExecutorService executorService,
+                                                                      @NonNull FlakeIdGenerator flakeIdGenerator,
+                                                                      @NonNull IMap runningJobStateIMap,
+                                                                      @NonNull IMap runningJobStateTimestampsIMap) {
         return new PhysicalPlanGenerator(
-                new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, initializationTimestamp).generate(),
-                nodeEngine,
+            new ExecutionPlanGenerator(
+                logicalDag,
                 jobImmutableInformation,
-                initializationTimestamp,
-                executorService,
-                flakeIdGenerator).generate();
+                initializationTimestamp)
+                .generate(),
+            nodeEngine,
+            jobImmutableInformation,
+            initializationTimestamp,
+            executorService,
+            flakeIdGenerator,
+            runningJobStateIMap,
+            runningJobStateTimestampsIMap).generate();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index ff7264636..b754f50ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -23,16 +23,17 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineState;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
 import lombok.NonNull;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class SubPlan {
@@ -54,17 +55,17 @@ public class SubPlan {
 
     private AtomicInteger failedTaskNum = new AtomicInteger(0);
 
-    private AtomicReference<PipelineState> pipelineState = new AtomicReference<>();
-
     private final String pipelineFullName;
 
+    private final IMap<Object, Object> runningJobStateIMap;
+
     /**
      * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
      * pipeline transitioned into a certain state. The index into this array is the ordinal
      * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
      * stateTimestamps[RUNNING.ordinal()]}.
      */
-    private final long[] stateTimestamps;
+    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
 
     /**
      * Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
@@ -72,24 +73,48 @@ public class SubPlan {
      */
     private CompletableFuture<PipelineState> pipelineFuture;
 
+    private final PipelineLocation pipelineLocation;
+
     private final ExecutorService executorService;
 
+    private JobMaster jobMaster;
+
+    private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
+
+    private Integer pipelineRestoreNum;
+
     public SubPlan(int pipelineId,
                    int totalPipelineNum,
                    long initializationTimestamp,
                    @NonNull List<PhysicalVertex> physicalVertexList,
                    @NonNull List<PhysicalVertex> coordinatorVertexList,
                    @NonNull JobImmutableInformation jobImmutableInformation,
-                   @NonNull ExecutorService executorService) {
+                   @NonNull ExecutorService executorService,
+                   @NonNull IMap runningJobStateIMap,
+                   @NonNull IMap runningJobStateTimestampsIMap) {
         this.pipelineId = pipelineId;
+        this.pipelineLocation = new PipelineLocation(jobImmutableInformation.getJobId(), pipelineId);
         this.pipelineFuture = new CompletableFuture<>();
         this.totalPipelineNum = totalPipelineNum;
         this.physicalVertexList = physicalVertexList;
         this.coordinatorVertexList = coordinatorVertexList;
-        stateTimestamps = new long[PipelineState.values().length];
-        this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
-        this.pipelineState.set(PipelineState.CREATED);
-        this.stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+        pipelineRestoreNum = 0;
+
+        Long[] stateTimestamps = new Long[PipelineState.values().length];
+        if (runningJobStateTimestampsIMap.get(pipelineLocation) == null) {
+            stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
+            runningJobStateTimestampsIMap.put(pipelineLocation, stateTimestamps);
+
+        }
+
+        if (runningJobStateIMap.get(pipelineLocation) == null) {
+            // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+            stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+            runningJobStateTimestampsIMap.put(pipelineLocation, stateTimestamps);
+
+            runningJobStateIMap.put(pipelineLocation, PipelineState.CREATED);
+        }
+
         this.jobImmutableInformation = jobImmutableInformation;
         this.pipelineFullName = String.format(
             "Job %s (%s), Pipeline: [(%d/%d)]",
@@ -98,16 +123,18 @@ public class SubPlan {
             pipelineId,
             totalPipelineNum);
 
+        this.runningJobStateIMap = runningJobStateIMap;
+        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
         this.executorService = executorService;
     }
 
     public PassiveCompletableFuture<PipelineState> initStateFuture() {
-        physicalVertexList.forEach(m -> {
-            addPhysicalVertexCallBack(m.initStateFuture());
+        physicalVertexList.stream().forEach(physicalVertex -> {
+            addPhysicalVertexCallBack(physicalVertex.initStateFuture());
         });
 
-        coordinatorVertexList.forEach(m -> {
-            addPhysicalVertexCallBack(m.initStateFuture());
+        coordinatorVertexList.stream().forEach(coordinator -> {
+            addPhysicalVertexCallBack(coordinator.initStateFuture());
         });
 
         this.pipelineFuture = new CompletableFuture<>();
@@ -124,12 +151,6 @@ public class SubPlan {
                     this.getPipelineFullName()));
                 failedTaskNum.incrementAndGet();
                 cancelPipeline();
-            } else if (!ExecutionState.FINISHED.equals(executionState.getExecutionState())) {
-                LOGGER.severe(String.format(
-                    "Task Failed in %s, with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.",
-                    this.getPipelineFullName()));
-                failedTaskNum.incrementAndGet();
-                cancelPipeline();
             }
 
             if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
@@ -143,88 +164,93 @@ public class SubPlan {
                     turnToEndState(PipelineState.FINISHED);
                     LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
                 }
-                this.pipelineFuture.complete(pipelineState.get());
+                pipelineFuture.complete((PipelineState) runningJobStateIMap.get(pipelineLocation));
             }
         });
     }
 
     private void turnToEndState(@NonNull PipelineState endState) {
-        // consistency check
-        if (pipelineState.get().isEndState()) {
-            String message = "Pipeline is trying to leave terminal state " + pipelineState.get();
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+        synchronized (this) {
+            // consistency check
+            PipelineState current = (PipelineState) runningJobStateIMap.get(pipelineLocation);
+            if (current.isEndState()) {
+                String message = "Pipeline is trying to leave terminal state " + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (!endState.isEndState()) {
-            String message = "Need a end state, not " + endState;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (!endState.isEndState()) {
+                String message = "Need a end state, not " + endState;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        pipelineState.set(endState);
-        stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
-    }
+            // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+            updateStateTimestamps(endState);
 
-    private void resetPipelineState() {
-        if (!pipelineState.get().isEndState()) {
-            String message = String.format("%s reset state failed, only end state can be reset, current is %s",
-                getPipelineFullName(), pipelineState.get());
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
+            runningJobStateIMap.set(pipelineLocation, endState);
         }
-
-        pipelineState.set(PipelineState.CREATED);
-        stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
     }
 
     public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
-        // consistency check
-        if (current.isEndState()) {
-            String message = "Pipeline is trying to leave terminal state " + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+        synchronized (this) {
+            // consistency check
+            if (current.isEndState()) {
+                String message = "Pipeline is trying to leave terminal state " + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (PipelineState.SCHEDULED.equals(targetState) && !PipelineState.CREATED.equals(current)) {
-            String message = "Only [CREATED] pipeline can turn to [SCHEDULED]" + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (PipelineState.SCHEDULED.equals(targetState) && !PipelineState.CREATED.equals(current)) {
+                String message = "Only [CREATED] pipeline can turn to [SCHEDULED]" + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (PipelineState.DEPLOYING.equals(targetState) && !PipelineState.SCHEDULED.equals(current)) {
-            String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING]" + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (PipelineState.DEPLOYING.equals(targetState) && !PipelineState.SCHEDULED.equals(current)) {
+                String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING]" + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        if (PipelineState.RUNNING.equals(targetState) && !PipelineState.DEPLOYING.equals(current)) {
-            String message = "Only [DEPLOYING] pipeline can turn to [RUNNING]" + current;
-            LOGGER.severe(message);
-            throw new IllegalStateException(message);
-        }
+            if (PipelineState.RUNNING.equals(targetState) && !PipelineState.DEPLOYING.equals(current)) {
+                String message = "Only [DEPLOYING] pipeline can turn to [RUNNING]" + current;
+                LOGGER.severe(message);
+                throw new IllegalStateException(message);
+            }
 
-        // now do the actual state transition
-        if (pipelineState.compareAndSet(current, targetState)) {
-            LOGGER.info(String.format("%s turn from state %s to %s.",
-                pipelineFullName,
-                current,
-                targetState));
+            // now do the actual state transition
+            if (current.equals(runningJobStateIMap.get(pipelineLocation))) {
+                LOGGER.info(String.format("%s turn from state %s to %s.",
+                    pipelineFullName,
+                    current,
+                    targetState));
 
-            stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
-            return true;
-        } else {
-            return false;
+                // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+                updateStateTimestamps(targetState);
+
+                runningJobStateIMap.set(pipelineLocation, targetState);
+                return true;
+            } else {
+                return false;
+            }
         }
     }
 
     public void cancelPipeline() {
-        if (pipelineState.get().isEndState()) {
-            LOGGER.warning(String.format("%s is in end state %s, can not be cancel", pipelineFullName, pipelineState.get()));
+        if (getPipelineState().isEndState()) {
+            LOGGER.warning(
+                String.format("%s is in end state %s, can not be cancel", pipelineFullName, getPipelineState()));
             return;
         }
-        updatePipelineState(pipelineState.get(), PipelineState.CANCELING);
-        cancelPipelineTasks();
+        // If an active Master Node done and another Master Node active, we can not know whether canceled pipeline
+        // complete. So we need cancel running pipeline again.
+        if (PipelineState.CANCELING.equals((PipelineState) runningJobStateIMap.get(pipelineLocation))) {
+            LOGGER.info(String.format("%s already in state CANCELING, skip cancel", pipelineFullName));
+        } else {
+            updatePipelineState(getPipelineState(), PipelineState.CANCELING);
+            cancelPipelineTasks();
+        }
     }
 
     private void cancelPipelineTasks() {
@@ -248,8 +274,8 @@ public class SubPlan {
     }
 
     private CompletableFuture<Void> cancelTask(@NonNull PhysicalVertex task) {
-        if (!task.getExecutionState().get().isEndState() &&
-            !ExecutionState.CANCELING.equals(task.getExecutionState().get())) {
+        if (!task.getExecutionState().isEndState() &&
+            !ExecutionState.CANCELING.equals(task.getExecutionState())) {
             CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                 task.cancel();
                 return null;
@@ -262,7 +288,7 @@ public class SubPlan {
     /**
      * Before restore a pipeline, the pipeline must do reset
      */
-    public void reset() {
+    private void reset() {
         resetPipelineState();
         finishedTaskNum.set(0);
         canceledTaskNum.set(0);
@@ -277,8 +303,66 @@ public class SubPlan {
         });
     }
 
-    public int getPipelineId() {
-        return pipelineId;
+    private void updateStateTimestamps(@NonNull PipelineState targetState) {
+        // we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
+        Long[] stateTimestamps = runningJobStateTimestampsIMap.get(pipelineLocation);
+        stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+        runningJobStateTimestampsIMap.set(pipelineLocation, stateTimestamps);
+
+    }
+
+    private void resetPipelineState() {
+        PipelineState pipelineState = getPipelineState();
+        if (!pipelineState.isEndState()) {
+            String message = String.format("%s reset state failed, only end state can be reset, current is %s",
+                getPipelineFullName(), pipelineState);
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        updateStateTimestamps(PipelineState.CREATED);
+        runningJobStateIMap.set(pipelineLocation, PipelineState.CREATED);
+    }
+
+    /**
+     * restore the pipeline when pipeline failed or canceled by error.
+     */
+    public void restorePipeline() {
+        synchronized (pipelineRestoreNum) {
+            try {
+                pipelineRestoreNum++;
+                LOGGER.info(String.format("Restore pipeline %s", pipelineFullName));
+                // We must ensure the scheduler complete and then can handle pipeline state change.
+                jobMaster.getScheduleFuture().join();
+
+                if (reSchedulerPipelineFuture != null) {
+                    reSchedulerPipelineFuture.join();
+                }
+                reset();
+                jobMaster.getPhysicalPlan().addPipelineEndCallback(this);
+                reSchedulerPipelineFuture = jobMaster.reSchedulerPipeline(this);
+                if (reSchedulerPipelineFuture != null) {
+                    reSchedulerPipelineFuture.join();
+                }
+            } catch (Throwable e) {
+                LOGGER.severe(
+                    String.format("Restore pipeline %s error with exception: %s", pipelineFullName,
+                        ExceptionUtils.getMessage(e)));
+                cancelPipeline();
+            }
+        }
+    }
+
+    /**
+     * restore the pipeline state after new Master Node active
+     */
+    public void restorePipelineState() {
+        // only need restore from RUNNING or CANCELING state
+        if (getPipelineState().ordinal() < PipelineState.RUNNING.ordinal()) {
+            restorePipeline();
+        } else if (PipelineState.CANCELING.equals(getPipelineState())) {
+            cancelPipelineTasks();
+        }
     }
 
     public List<PhysicalVertex> getPhysicalVertexList() {
@@ -293,7 +377,19 @@ public class SubPlan {
         return pipelineFullName;
     }
 
-    public AtomicReference<PipelineState> getPipelineState() {
-        return pipelineState;
+    public PipelineState getPipelineState() { // read from the shared state IMap on every call; may be null if it has no entry for this pipeline
+        return (PipelineState) runningJobStateIMap.get(pipelineLocation);
+    }
+
+    public PipelineLocation getPipelineLocation() { // key of this pipeline in the state IMap
+        return pipelineLocation;
+    }
+
+    public void setJobMaster(JobMaster jobMaster) { // replaces the JobMaster reference used by this pipeline
+        this.jobMaster = jobMaster;
+    }
+
+    public int getPipelineRestoreNum() { // incremented by restorePipeline()
+        return pipelineRestoreNum;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c55bc07fc..5e76f2b5a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -29,10 +29,11 @@ import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfiguration;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -46,6 +47,7 @@ import com.hazelcast.jet.datamodel.Tuple2;
 import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
@@ -53,7 +55,6 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 public class JobMaster implements Runnable {
@@ -77,24 +78,37 @@ public class JobMaster implements Runnable {
     private JobImmutableInformation jobImmutableInformation;
 
     private JobScheduler jobScheduler;
-    private final Map<Integer, Map<PhysicalVertex, SlotProfile>> ownedSlotProfiles;
+
+    /**
+     * Slots used by the tasks of each pipeline, kept in a Hazelcast IMap so they can be released or reused when a new master node becomes active.
+     */
+    private final IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
+    private final IMap<Object, Object> runningJobStateIMap; // shared state map, handed to the physical plan in init()
+
+    private final IMap<Object, Object> runningJobStateTimestampsIMap; // shared state-timestamp map, handed to the physical plan in init()
 
     private CompletableFuture<Void> scheduleFuture = new CompletableFuture<>();
 
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
-                     @NonNull ResourceManager resourceManager) {
+                     @NonNull ResourceManager resourceManager,
+                     @NonNull IMap runningJobStateIMap, // NOTE(review): raw IMap parameters make the field assignments below unchecked
+                     @NonNull IMap runningJobStateTimestampsIMap,
+                     @NonNull IMap ownedSlotProfilesIMap) {
         this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
         flakeIdGenerator =
             this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
-        this.ownedSlotProfiles = new ConcurrentHashMap<>();
+        this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
         this.resourceManager = resourceManager;
+        this.runningJobStateIMap = runningJobStateIMap;
+        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
 
-    public void init() throws Exception {
+    public void init(long initializationTimestamp) throws Exception { // the timestamp is forwarded to PlanUtils.fromLogicalDAG instead of System.currentTimeMillis()
         jobImmutableInformation = nodeEngine.getSerializationService().toObject(
             jobImmutableInformationData);
         LOGGER.info("Job [" + jobImmutableInformation.getJobId() + "] submit");
@@ -113,9 +127,11 @@ public class JobMaster implements Runnable {
         final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
             nodeEngine,
             jobImmutableInformation,
-            System.currentTimeMillis(),
+            initializationTimestamp, // supplied by the caller of init() instead of the current time
             executorService,
-            flakeIdGenerator);
+            flakeIdGenerator,
+            runningJobStateIMap, // the plan keeps its states in these shared IMaps
+            runningJobStateTimestampsIMap);
         this.physicalPlan = planTuple.f0();
         this.checkpointManager = new CheckpointManager(
             jobImmutableInformation.getJobId(),
@@ -162,19 +178,20 @@ public class JobMaster implements Runnable {
 
     public void handleCheckpointTimeout(long pipelineId) {
         this.physicalPlan.getPipelineList().forEach(pipeline -> {
-            if (pipeline.getPipelineId() == pipelineId) {
+            if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
                 pipeline.cancelPipeline();
             }
         });
     }
 
-    public CompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
-        return jobScheduler.reSchedulerPipeline(subPlan);
+    public PassiveCompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) { // null when the scheduler returns null, so callers' null checks keep working
+        return Optional.ofNullable(jobScheduler.reSchedulerPipeline(subPlan)).map(future -> new PassiveCompletableFuture<>(future)).orElse(null);
     }
 
-    public void releasePipelineResource(int pipelineId) {
+    public void releasePipelineResource(SubPlan subPlan) { // NOTE(review): NPE below if no slots are recorded for this pipeline (e.g. released twice)
         resourceManager.releaseResources(jobImmutableInformation.getJobId(),
-            Lists.newArrayList(ownedSlotProfiles.get(pipelineId).values()));
+            Lists.newArrayList(ownedSlotProfilesIMap.get(subPlan.getPipelineLocation()).values())).join();
+        ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation()); // removed only after the release has completed
     }
 
     public void cleanJob() {
@@ -182,12 +199,12 @@ public class JobMaster implements Runnable {
     }
 
     public Address queryTaskGroupAddress(long taskGroupId) {
-        for (Integer pipelineId : ownedSlotProfiles.keySet()) {
-            Optional<PhysicalVertex> currentVertex = ownedSlotProfiles.get(pipelineId).keySet().stream()
-                .filter(task -> task.getTaskGroupLocation().getTaskGroupId() == taskGroupId)
+        for (Map.Entry<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> entry : ownedSlotProfilesIMap.entrySet()) {
+            Optional<TaskGroupLocation> currentVertex = entry.getValue().keySet().stream()
+                .filter(taskGroupLocation -> taskGroupLocation.getTaskGroupId() == taskGroupId)
                 .findFirst();
             if (currentVertex.isPresent()) {
-                return ownedSlotProfiles.get(pipelineId).get(currentVertex.get()).getWorker();
+                return entry.getValue().get(currentVertex.get()).getWorker(); // same snapshot entry: no extra IMap get() that could miss a removed pipeline
             }
         }
         throw new IllegalArgumentException("can't find task group address from task group id: " + taskGroupId);
@@ -224,7 +241,7 @@ public class JobMaster implements Runnable {
 
     public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
         this.physicalPlan.getPipelineList().forEach(pipeline -> {
-            if (pipeline.getPipelineId() != taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+            if (pipeline.getPipelineLocation().getPipelineId() != taskExecutionState.getTaskGroupLocation().getPipelineId()) { // only the pipeline that owns this task group handles the update
                 return;
             }
 
@@ -246,16 +263,20 @@ public class JobMaster implements Runnable {
         });
     }
 
-    public Map<Integer, Map<PhysicalVertex, SlotProfile>> getOwnedSlotProfiles() {
-        return ownedSlotProfiles;
+    public Map<TaskGroupLocation, SlotProfile> getOwnedSlotProfiles(PipelineLocation pipelineLocation) { // a deserialized copy from the IMap; null if nothing is recorded
+        return ownedSlotProfilesIMap.get(pipelineLocation);
     }
 
-    public void setOwnedSlotProfiles(@NonNull Integer pipelineId,
-        @NonNull Map<PhysicalVertex, SlotProfile> pipelineOwnedSlotProfiles) {
-        ownedSlotProfiles.put(pipelineId, pipelineOwnedSlotProfiles);
+    public void setOwnedSlotProfiles(@NonNull PipelineLocation pipelineLocation,
+                                     @NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
+        ownedSlotProfilesIMap.set(pipelineLocation, pipelineOwnedSlotProfiles); // set(): the previous value is not needed, so don't fetch it
     }
 
     public CompletableFuture<Void> getScheduleFuture() {
         return scheduleFuture;
     }
+
+    public ExecutorService getExecutorService() { // the executor this JobMaster was constructed with
+        return executorService;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index 9d6d4cd14..f47a3da0a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
@@ -27,6 +28,8 @@ import com.hazelcast.spi.impl.AllowedDuringPassiveState;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 public class GetJobStatusOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
     private long jobId;
@@ -65,7 +68,15 @@ public class GetJobStatusOperation extends Operation implements IdentifiedDataSe
     @Override
     public void run() {
         SeaTunnelServer service = getService();
-        response = service.getJobStatus(jobId).ordinal();
+        CompletableFuture<JobStatus> future = CompletableFuture.supplyAsync(() -> service.getJobStatus(jobId)); // presumably off-thread because getJobStatus reads IMaps — confirm
+        try {
+            response = future.get().ordinal();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index a3d27cd75..52dd0410e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public class PipelineBaseScheduler implements JobScheduler {
@@ -58,7 +60,7 @@ public class PipelineBaseScheduler implements JobScheduler {
 
     @Override
     public void startScheduling() {
-        if (physicalPlan.turnToRunning()) {
+        if (physicalPlan.updateJobState(JobStatus.CREATED, JobStatus.SCHEDULED)) { // pipelines are scheduled only if the CREATED -> SCHEDULED update returns true
             List<CompletableFuture<Void>> collect =
                 physicalPlan.getPipelineList()
                     .stream()
@@ -68,6 +70,7 @@ public class PipelineBaseScheduler implements JobScheduler {
                 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
                     collect.toArray(new CompletableFuture[0]));
                 voidCompletableFuture.get();
+                physicalPlan.updateJobState(JobStatus.SCHEDULED, JobStatus.RUNNING); // NOTE(review): result ignored, so a failed transition goes unnoticed
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
@@ -85,50 +88,63 @@ public class PipelineBaseScheduler implements JobScheduler {
                 return null;
             }
 
-            Map<PhysicalVertex, SlotProfile> slotProfiles =
-                getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles().get(pipeline.getPipelineId()));
+            Map<TaskGroupLocation, SlotProfile> slotProfiles =
+                getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation())); // null when nothing is recorded for this pipeline yet
 
             // To ensure release pipeline resource after new master node active, we need store slotProfiles first and then deploy tasks.
-            jobMaster.setOwnedSlotProfiles(pipeline.getPipelineId(), slotProfiles);
+            jobMaster.setOwnedSlotProfiles(pipeline.getPipelineLocation(), slotProfiles);
             // deploy pipeline
             return CompletableFuture.runAsync(() -> {
                 deployPipeline(pipeline, slotProfiles);
-            });
+            }, jobMaster.getExecutorService()); // run deployment on the JobMaster's executor instead of the common pool
         } catch (Exception e) {
             pipeline.cancelPipeline();
             return null;
         }
     }
 
-    private Map<PhysicalVertex, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan pipeline,
-                                                                           Map<PhysicalVertex, SlotProfile> ownedSlotProfiles) {
+    private Map<TaskGroupLocation, SlotProfile> getOrApplyResourceForPipeline(@NonNull SubPlan pipeline,
+                                                                              Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
         if (ownedSlotProfiles == null || ownedSlotProfiles.isEmpty()) {
             return applyResourceForPipeline(pipeline);
         }
 
         // TODO ensure the slots still exist and is owned by this pipeline
-        for (Map.Entry<PhysicalVertex, SlotProfile> entry : ownedSlotProfiles.entrySet()) {
-            if (entry.getValue() == null) {
-                ownedSlotProfiles.put(entry.getKey(), applyResourceForTask(entry.getKey()).join());
-            } else {
-                entry.getKey().updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-            }
+        // ConcurrentHashMap rejects null values, so compute() is used: a null result (no slot) simply leaves no entry.
+        Map<TaskGroupLocation, SlotProfile> currentOwnedSlotProfiles = new ConcurrentHashMap<>();
+        pipeline.getCoordinatorVertexList().forEach(
+            coordinator -> currentOwnedSlotProfiles.compute(coordinator.getTaskGroupLocation(),
+                (location, previous) -> getOrApplyResourceForTask(coordinator, ownedSlotProfiles)));
+        pipeline.getPhysicalVertexList().forEach(
+            task -> currentOwnedSlotProfiles.compute(task.getTaskGroupLocation(),
+                (location, previous) -> getOrApplyResourceForTask(task, ownedSlotProfiles)));
+
+        return currentOwnedSlotProfiles;
+    }
+
+    /** Reuse the slot already recorded for this task, or apply for a new one; null if none could be obtained. */
+    private SlotProfile getOrApplyResourceForTask(@NonNull PhysicalVertex task,
+                                                  Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
+        SlotProfile ownedSlot = ownedSlotProfiles == null ? null : ownedSlotProfiles.get(task.getTaskGroupLocation());
+        if (ownedSlot == null) {
+            CompletableFuture<SlotProfile> slotFuture = applyResourceForTask(task); // null when the task was skipped or marked failed
+            return slotFuture == null ? null : slotFuture.join();
         }
-        return ownedSlotProfiles;
+        task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        return ownedSlot;
     }
 
-    private Map<PhysicalVertex, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
-        Map<PhysicalVertex, CompletableFuture<SlotProfile>> futures = new HashMap<>();
-        Map<PhysicalVertex, SlotProfile> slotProfiles = new HashMap<>();
+    private Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
+        Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>(); // HashMap: a task whose future is null is stored with a null slot below
         // TODO If there is no enough resources for tasks, we need add some wait profile
         subPlan.getCoordinatorVertexList()
             .forEach(
-                coordinator -> futures.put(coordinator, applyResourceForTask(coordinator)));
+                coordinator -> futures.put(coordinator.getTaskGroupLocation(), applyResourceForTask(coordinator)));
 
         subPlan.getPhysicalVertexList()
-            .forEach(task -> futures.put(task, applyResourceForTask(task)));
+            .forEach(task -> futures.put(task.getTaskGroupLocation(), applyResourceForTask(task)));
 
-        for (Map.Entry<PhysicalVertex, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
+        for (Map.Entry<TaskGroupLocation, CompletableFuture<SlotProfile>> future : futures.entrySet()) {
             slotProfiles.put(future.getKey(),
                 future.getValue() == null ? null : future.getValue().join());
         }
@@ -140,20 +156,20 @@ public class PipelineBaseScheduler implements JobScheduler {
             if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
                 // TODO custom resource size
                 return resourceManager.applyResource(jobId, new ResourceProfile());
-            } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
-                ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+            } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) || // NOTE(review): each getExecutionState() call may be a separate state read — confirm
+                ExecutionState.CANCELED.equals(task.getExecutionState())) {
                 LOGGER.info(
                     String.format("%s be canceled, skip %s this task.", task.getTaskFullName(),
                         ExecutionState.SCHEDULED));
                 return null;
             } else {
-                makeTaskFailed(task,
+                makeTaskFailed(task.getTaskGroupLocation(),
                     new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
-                        task.getTaskFullName(), task.getExecutionState().get())));
+                        task.getTaskFullName(), task.getExecutionState())));
                 return null;
             }
         } catch (Throwable e) {
-            makeTaskFailed(task, e);
+            makeTaskFailed(task.getTaskGroupLocation(), e); // failures are reported by location only
             return null;
         }
     }
@@ -164,8 +180,8 @@ public class PipelineBaseScheduler implements JobScheduler {
             return CompletableFuture.runAsync(() -> {
                 task.deploy(slotProfile);
             });
-        } else if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
-            ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+        } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) || // a canceled task is skipped rather than failed
+            ExecutionState.CANCELED.equals(task.getExecutionState())) {
             LOGGER.info(
                 String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), ExecutionState.DEPLOYING));
             return null;
@@ -175,22 +191,24 @@ public class PipelineBaseScheduler implements JobScheduler {
                     task.getTaskGroupLocation(),
                     ExecutionState.FAILED,
                     new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
-                        task.getTaskFullName(), task.getExecutionState().get()))));
+                        task.getTaskFullName(), task.getExecutionState()))));
             return null;
         }
     }
 
-    private void deployPipeline(@NonNull SubPlan pipeline, Map<PhysicalVertex, SlotProfile> slotProfiles) {
+    private void deployPipeline(@NonNull SubPlan pipeline, Map<TaskGroupLocation, SlotProfile> slotProfiles) { // slots are looked up by TaskGroupLocation
         if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
 
             try {
                 List<CompletableFuture<?>> deployCoordinatorFuture =
                     pipeline.getCoordinatorVertexList().stream()
-                        .map(coordinator -> deployTask(coordinator, slotProfiles.get(coordinator)))
+                        .map(coordinator -> deployTask(coordinator,
+                            slotProfiles.get(coordinator.getTaskGroupLocation())))
                         .filter(Objects::nonNull).collect(Collectors.toList());
 
                 List<CompletableFuture<?>> deployTaskFuture =
-                    pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task, slotProfiles.get(task)))
+                    pipeline.getPhysicalVertexList().stream()
+                        .map(task -> deployTask(task, slotProfiles.get(task.getTaskGroupLocation())))
                         .filter(Objects::nonNull).collect(Collectors.toList());
 
                 deployCoordinatorFuture.addAll(deployTaskFuture);
@@ -200,20 +218,20 @@ public class PipelineBaseScheduler implements JobScheduler {
                 if (!pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING)) {
                     LOGGER.info(
                         String.format("%s turn to state %s, skip the running state.", pipeline.getPipelineFullName(),
-                            pipeline.getPipelineState().get()));
+                            pipeline.getPipelineState()));
                 }
             } catch (Exception e) {
                 makePipelineFailed(pipeline, e);
             }
-        } else if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
-            PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+        } else if (PipelineState.CANCELING.equals(pipeline.getPipelineState()) || // each getPipelineState() call reads the state IMap
+            PipelineState.CANCELED.equals(pipeline.getPipelineState())) {
             // may be canceled
             LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
-                pipeline.getPipelineState().get(), PipelineState.DEPLOYING));
+                pipeline.getPipelineState(), PipelineState.DEPLOYING));
         } else {
             makePipelineFailed(pipeline, new JobException(
                 String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
-                    pipeline.getPipelineState().get())));
+                    pipeline.getPipelineState())));
         }
     }
 
@@ -223,32 +241,32 @@ public class PipelineBaseScheduler implements JobScheduler {
     }
 
     private void handlePipelineStateTurnError(SubPlan pipeline, PipelineState targetState) {
-        if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
-            PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+        if (PipelineState.CANCELING.equals(pipeline.getPipelineState()) || // a canceled pipeline is only logged, anything else is an error
+            PipelineState.CANCELED.equals(pipeline.getPipelineState())) {
             // may be canceled
             LOGGER.info(
                 String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
-                    pipeline.getPipelineState().get(), targetState));
+                    pipeline.getPipelineState(), targetState));
         } else {
             throw new JobException(
                 String.format("%s turn to a unexpected state: %s, stop scheduler job",
                     pipeline.getPipelineFullName(),
-                    pipeline.getPipelineState().get()));
+                    pipeline.getPipelineState()));
         }
     }
 
     private void makePipelineFailed(@NonNull SubPlan pipeline, Throwable e) {
         pipeline.getCoordinatorVertexList().forEach(coordinator -> {
-            makeTaskFailed(coordinator, e);
+            makeTaskFailed(coordinator.getTaskGroupLocation(), e);
         });
 
         pipeline.getPhysicalVertexList().forEach(task -> {
-            makeTaskFailed(task, e);
+            makeTaskFailed(task.getTaskGroupLocation(), e);
         });
     }
 
-    private void makeTaskFailed(@NonNull PhysicalVertex task, Throwable e) {
+    private void makeTaskFailed(@NonNull TaskGroupLocation taskGroupLocation, Throwable e) { // takes only the location, which is all TaskExecutionState needs
         jobMaster.updateTaskExecutionState(
-            new TaskExecutionState(task.getTaskGroupLocation(), ExecutionState.FAILED, e));
+            new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, e));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 46e05e93c..1daa5fa87 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -41,8 +41,9 @@ public abstract class AbstractSeaTunnelServerTest {
     @Before
     public void before() {
         Config config = new Config();
-        config.setInstanceName(this.getClass().getSimpleName());
-        config.setClusterName(this.getClass().getSimpleName());
+        long time = System.currentTimeMillis(); // suffix for the names below; presumably keeps each test's instance/cluster distinct — confirm
+        config.setInstanceName(this.getClass().getSimpleName() + "_" + time);
+        config.setClusterName(this.getClass().getSimpleName() + "_" + time);
         instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
             Thread.currentThread().getName(), new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
         nodeEngine = instance.node.nodeEngine;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 1913f0c84..a289a8854 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
+import com.hazelcast.map.IMap;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -55,11 +56,18 @@ public class CheckpointPlanTest extends AbstractSeaTunnelServerTest {
 
         JobImmutableInformation jobInfo = new JobImmutableInformation(1,
             nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState"); // state maps now required by PlanUtils.fromLogicalDAG
+        IMap<Object, Long[]> runningJobStateTimestamp =
+            nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+
         Map<Integer, CheckpointPlan> checkpointPlans = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
             jobInfo,
             System.currentTimeMillis(),
             Executors.newCachedThreadPool(),
-            instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f1();
+            instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+            runningJobState,
+            runningJobStateTimestamp).f1();
         Assert.assertNotNull(checkpointPlans);
         Assert.assertEquals(2, checkpointPlans.size());
         // enum(1) + reader(2) + writer(2)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index d9bb880da..e589df31e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
 import com.google.common.collect.Sets;
+import com.hazelcast.map.IMap;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -97,11 +98,17 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
             nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
 
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("testRunningJobState"); // state maps now required by PlanUtils.fromLogicalDAG
+        IMap<Object, Long[]> runningJobStateTimestamp =
+            nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+
         PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
             jobImmutableInformation,
             System.currentTimeMillis(),
             Executors.newCachedThreadPool(),
-            instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
+            instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+            runningJobState,
+            runningJobStateTimestamp).f0();
 
         Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
         Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 5c98b4bc2..060f67c70 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -27,15 +27,21 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 
 import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.map.IMap;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -44,6 +50,46 @@ import java.util.concurrent.TimeUnit;
 public class JobMasterTest extends AbstractSeaTunnelServerTest {
     private Long jobId;
 
+    /**
+     * IMap key is jobId and value is the {@link RunningJobInfo} of that job, presumably holding the JobMaster
+     * init timestamp and the jobImmutableInformation sent by the client on submit (confirm against RunningJobInfo).
+     * <p>
+     * This IMap is used to recover the running job info in JobMaster when a new master node becomes active.
+     */
+    private IMap<Long, RunningJobInfo> runningJobInfoIMap;
+
+    /**
+     * IMap key is a jobId, a {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} or a
+     * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}.
+     * <p>
+     * The value is a {@link JobStatus}, {@link org.apache.seatunnel.engine.core.job.PipelineState}
+     * or {@link org.apache.seatunnel.engine.server.execution.ExecutionState}.
+     * <p>
+     * This IMap is used to recover those states in JobMaster when a new master node becomes active.
+     */
+    private IMap<Object, Object> runningJobStateIMap;
+
+    /**
+     * IMap key is a jobId, a {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} or a
+     * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}.
+     * <p>
+     * The value is the stateTimestamps of a {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan},
+     * {@link org.apache.seatunnel.engine.server.dag.physical.SubPlan} or
+     * {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex}.
+     * <p>
+     * This IMap is used to recover those state timestamps in JobMaster when a new master node becomes active.
+     */
+    private IMap<Object, Long[]> runningJobStateTimestampsIMap;
+
+    /**
+     * IMap key is a {@link PipelineLocation}; it has no jobId keys.
+     * <p>
+     * The value is a map from each {@link TaskGroupLocation} to the {@link SlotProfile} it uses.
+     * <p>
+     * This IMap is used to recover the owned slot profiles in JobMaster when a new master node becomes active.
+     */
+    private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
     @Before
     public void before() {
         super.before();
@@ -90,5 +136,47 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
         await().atMost(20000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> Assert.assertTrue(
                 jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get())));
+
+        testIMapRemovedAfterJobComplete(jobMaster);
+    }
+
+    /**
+     * Waits up to 20 seconds for the completed job's entries in the state IMaps to be removed,
+     * checking the job-, pipeline- and task-level keys taken from the job's physical plan.
+     *
+     * @param jobMaster the JobMaster of the completed job; its physical plan supplies the keys to check
+     */
+    private void testIMapRemovedAfterJobComplete(JobMaster jobMaster) {
+        runningJobInfoIMap = nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
+        runningJobStateIMap = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+        runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
+        ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+
+        await().atMost(20000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> {
+                // Job-level entries are keyed by jobId.
+                Assert.assertNull(runningJobInfoIMap.get(jobId));
+                Assert.assertNull(runningJobStateIMap.get(jobId));
+                Assert.assertNull(runningJobStateTimestampsIMap.get(jobId));
+
+                jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
+                    // Pipeline-level entries are keyed by PipelineLocation; ownedSlotProfilesIMap has no
+                    // jobId keys, so a get(jobId) on it would always be null and prove nothing.
+                    Assert.assertNull(runningJobStateIMap.get(pipeline.getPipelineLocation()));
+                    Assert.assertNull(runningJobStateTimestampsIMap.get(pipeline.getPipelineLocation()));
+                    Assert.assertNull(ownedSlotProfilesIMap.get(pipeline.getPipelineLocation()));
+
+                    // Task-level entries are keyed by TaskGroupLocation.
+                    pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+                        Assert.assertNull(runningJobStateIMap.get(coordinator.getTaskGroupLocation()));
+                        Assert.assertNull(
+                            runningJobStateTimestampsIMap.get(coordinator.getTaskGroupLocation()));
+                    });
+                    pipeline.getPhysicalVertexList().forEach(task -> {
+                        Assert.assertNull(runningJobStateIMap.get(task.getTaskGroupLocation()));
+                        Assert.assertNull(runningJobStateTimestampsIMap.get(task.getTaskGroupLocation()));
+                    });
+                });
+            });
     }
 }