You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/12 06:49:00 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add PipelineBaseScheduler For Engine (#2396)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 9fe2d3def [ST-Engine] Add PipelineBaseScheduler For Engine (#2396)
9fe2d3def is described below

commit 9fe2d3def324cad7b37db9608ceebd2f07b13241
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Aug 12 14:48:53 2022 +0800

    [ST-Engine] Add PipelineBaseScheduler For Engine (#2396)
    
    * Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
    
    * add source file to licenserc.yaml
    
    * fix checkstyle
    
    * fix error
    
    * tmp
    
    * tmp
    
    * Update PhysicalPlan to support scheduler by pipeline
    
    * remove init in run
    
    * tmp
    
    * complete scheduler
    
    * optimize scheduler
    
    * fix checkstyle
    
    * fix checkstyle
    
    * Update seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
    
    Co-authored-by: Hisoka <fa...@qq.com>
    
    * Update SeaTunnelStarter.java
    
    Co-authored-by: Hisoka <fa...@qq.com>
---
 .../seatunnel/common/utils/ExceptionUtils.java     |  35 ++---
 .../core/starter/seatunnel/SeaTunnelStarter.java   |   7 +-
 .../seatunnel/engine/client/JobConfigParser.java   |  10 +-
 .../engine/client/JobExecutionEnvironment.java     |   3 +-
 .../apache/seatunnel/engine/client/JobProxy.java   |  12 +-
 .../engine/client/SeaTunnelHazelcastClient.java    |   7 +-
 .../engine/client/SeaTunnelClientTest.java         |  11 +-
 .../exception/JobNoEnoughResourceException.java}   |  25 +--
 .../seatunnel/engine/common/utils/IdGenerator.java |   4 +-
 .../engine/common/utils/NonCompletableFuture.java  |   2 +-
 .../engine/core/dag/actions/AbstractAction.java    |   8 +-
 .../seatunnel/engine/core/dag/actions/Action.java  |   2 +-
 .../core/dag/actions/PartitionTransformAction.java |   4 +-
 .../core/dag/actions/PhysicalSourceAction.java     |   4 +-
 .../engine/core/dag/actions/SinkAction.java        |   4 +-
 .../engine/core/dag/actions/SourceAction.java      |   2 +-
 .../engine/core/dag/actions/TransformAction.java   |   4 +-
 .../core/dag/actions/TransformChainAction.java     |   4 +-
 .../core/dag/internal/IntermediateDataQueue.java   |   6 +-
 .../engine/core/dag/logical/LogicalDag.java        |  10 +-
 .../core/dag/logical/LogicalDagGenerator.java      |  10 +-
 .../engine/core/dag/logical/LogicalEdge.java       |  14 +-
 .../engine/core/dag/logical/LogicalVertex.java     |   6 +-
 .../org/apache/seatunnel/engine/core/job/Job.java  |   4 +-
 .../seatunnel/engine/core/job/PipelineState.java   |   2 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  13 +-
 .../engine/server/TaskExecutionService.java        |  44 ++++--
 .../engine/server/dag/execution/ExecutionEdge.java |   4 +-
 .../dag/execution/ExecutionPlanGenerator.java      |   8 +-
 .../server/dag/execution/ExecutionVertex.java      |   2 +-
 .../engine/server/dag/execution/Pipeline.java      |   6 +-
 .../server/dag/execution/PipelineGenerator.java    |   2 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  53 +++++--
 .../server/dag/physical/PhysicalPlanGenerator.java |  37 +++--
 .../engine/server/dag/physical/PhysicalVertex.java | 171 +++++++++++++++++----
 .../engine/server/dag/physical/SubPlan.java        | 113 ++++++++++++--
 .../engine/server/execution/ExecutionState.java    |  22 ++-
 .../seatunnel/engine/server/master/JobMaster.java  |  60 ++++++--
 .../engine/server/operation/AsyncOperation.java    |  11 +-
 ...tJobOperation.java => DeployTaskOperation.java} |  36 +++--
 .../server/operation/SubmitJobOperation.java       |   9 +-
 .../ResourceManager.java}                          |  12 +-
 .../resourcemanager/SimpleResourceManager.java     |  71 +++++++++
 .../engine/server/scheduler/JobScheduler.java      |   2 -
 .../server/scheduler/PipelineBaseScheduler.java    | 121 +++++++++++++++
 .../serializable/OperationDataSerializerHook.java  |   4 +
 .../serializable/TaskDataSerializerHook.java       |   4 +-
 ...nfo.java => TaskGroupImmutableInformation.java} |  24 ++-
 48 files changed, 772 insertions(+), 257 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
copy to seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
index b7def66ad..c4d2d690a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
@@ -15,27 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.common.utils;
 
-import java.util.List;
-import java.util.Map;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 
-public class Pipeline {
-    private final List<ExecutionEdge> edges;
-
-    private final Map<Integer, ExecutionVertex> vertexes;
-
-    Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
-        this.edges = edges;
-        this.vertexes = vertexes;
+public class ExceptionUtils {
+    private ExceptionUtils() {
     }
 
-    public List<ExecutionEdge> getEdges() {
-        return edges;
+    public static String getMessage(Throwable e) {
+        try (StringWriter sw = new StringWriter();
+             PrintWriter pw = new PrintWriter(sw)) {
+            // Output the error stack information to the printWriter
+            e.printStackTrace(pw);
+            pw.flush();
+            sw.flush();
+            return sw.toString();
+        } catch (Exception e1) {
+            e1.printStackTrace();
+            throw new RuntimeException("Failed to print exception logs", e1);
+        }
     }
-
-    public Map<Integer, ExecutionVertex> getVertexes() {
-        return vertexes;
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
index 6c3372ad7..ef202fd97 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import com.hazelcast.client.config.ClientConfig;
 
 import java.nio.file.Path;
+import java.util.concurrent.ExecutionException;
 
 public class SeaTunnelStarter {
     public static void main(String[] args) {
@@ -43,7 +44,11 @@ public class SeaTunnelStarter {
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
 
-        JobProxy jobProxy = jobExecutionEnv.execute();
+        try {
+            JobProxy jobProxy = jobExecutionEnv.execute();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
 
         // TODO wait for job complete and then exit
     }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index 58b2ee3ad..843296bed 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -312,7 +312,7 @@ public class JobConfigParser {
         return 1;
     }
 
-    private SourceAction createSourceAction(int id,
+    private SourceAction createSourceAction(long id,
                                             @NonNull String name,
                                             @NonNull SeaTunnelSource source,
                                             List<URL> jarUrls) {
@@ -322,7 +322,7 @@ public class JobConfigParser {
         return new SourceAction(id, name, source, jarUrls);
     }
 
-    private TransformAction createTransformAction(int id,
+    private TransformAction createTransformAction(long id,
                                                   @NonNull String name,
                                                   @NonNull List<Action> upstreams,
                                                   @NonNull SeaTunnelTransform transformation,
@@ -333,7 +333,7 @@ public class JobConfigParser {
         return new TransformAction(id, name, upstreams, transformation, jarUrls);
     }
 
-    private SinkAction createSinkAction(int id,
+    private SinkAction createSinkAction(long id,
                                         @NonNull String name,
                                         @NonNull List<Action> upstreams,
                                         @NonNull SeaTunnelSink sink,
@@ -344,7 +344,7 @@ public class JobConfigParser {
         return new SinkAction(id, name, upstreams, sink, jarUrls);
     }
 
-    private TransformAction createTransformAction(int id,
+    private TransformAction createTransformAction(long id,
                                                   @NonNull String name,
                                                   @NonNull SeaTunnelTransform transformation,
                                                   List<URL> jarUrls) {
@@ -354,7 +354,7 @@ public class JobConfigParser {
         return new TransformAction(id, name, transformation, jarUrls);
     }
 
-    private SinkAction createSinkAction(int id,
+    private SinkAction createSinkAction(long id,
                                         @NonNull String name,
                                         @NonNull SeaTunnelSink sink,
                                         List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index 94b965460..81a278111 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -31,6 +31,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 public class JobExecutionEnvironment {
 
@@ -74,7 +75,7 @@ public class JobExecutionEnvironment {
         return actions;
     }
 
-    public JobProxy execute() {
+    public JobProxy execute() throws ExecutionException, InterruptedException {
         JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
             jobClient.getNewJobId(),
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
index ab45b5cfa..84b80770d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
@@ -17,16 +17,20 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.core.job.Job;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 public class JobProxy implements Job {
+    private static final ILogger LOGGER = Logger.getLogger(JobProxy.class);
     private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
     private JobImmutableInformation jobImmutableInformation;
 
@@ -42,11 +46,11 @@ public class JobProxy implements Job {
     }
 
     @Override
-    public void submitJob() {
+    public void submitJob() throws ExecutionException, InterruptedException {
         ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(
             seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
-        CompletableFuture<Void> voidCompletableFuture =
+        NonCompletableFuture<Void> submitJobFuture =
             seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
-        voidCompletableFuture.join();
+        submitJobFuture.get();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 369d290bb..c22faeb1c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
@@ -89,16 +90,16 @@ public class SeaTunnelHazelcastClient {
         }
     }
 
-    public CompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
+    public NonCompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
         ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
         try {
-            return invocation.invoke().thenApply(c -> null);
+            return new NonCompletableFuture<>(invocation.invoke().thenApply(c -> null));
         } catch (Throwable t) {
             throw ExceptionUtil.rethrow(t);
         }
     }
 
-    public CompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+    public NonCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
         UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
         return requestAndGetCompletableFuture(masterUuid, request);
     }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index ee9e2533a..7c97ed7fc 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -35,6 +35,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.util.concurrent.ExecutionException;
+
 @SuppressWarnings("checkstyle:MagicNumber")
 @RunWith(JUnit4.class)
 public class SeaTunnelClientTest {
@@ -70,7 +72,14 @@ public class SeaTunnelClientTest {
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
 
-        JobProxy jobProxy = jobExecutionEnv.execute();
+        JobProxy jobProxy = null;
+        try {
+            jobProxy = jobExecutionEnv.execute();
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
 
         Assert.assertNotNull(jobProxy);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
similarity index 59%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
index b7def66ad..9a0b89565 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
@@ -15,27 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.engine.common.exception;
 
-import java.util.List;
-import java.util.Map;
-
-public class Pipeline {
-    private final List<ExecutionEdge> edges;
-
-    private final Map<Integer, ExecutionVertex> vertexes;
-
-    Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
-        this.edges = edges;
-        this.vertexes = vertexes;
+public class JobNoEnoughResourceException extends SeaTunnelEngineException {
+    public JobNoEnoughResourceException(String jobName, long jobId, int pipelineIndex, int totalPipelineNum) {
+        super(String.format("Job %s (%s), Pipeline [(%s/%s)] have no enough resource.", jobName, jobId, pipelineIndex + 1, totalPipelineNum));
     }
 
-    public List<ExecutionEdge> getEdges() {
-        return edges;
+    public JobNoEnoughResourceException(String message) {
+        super(message);
     }
 
-    public Map<Integer, ExecutionVertex> getVertexes() {
-        return vertexes;
+    public JobNoEnoughResourceException(String message, Throwable cause) {
+        super(message, cause);
     }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
index 4b10e8d60..0b477a956 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
@@ -26,9 +26,9 @@ import java.io.Serializable;
 public class IdGenerator implements Serializable {
 
     private static final long serialVersionUID = 7683323453014131725L;
-    private int id = 0;
+    private long id = 0;
 
-    public int getNextId() {
+    public long getNextId() {
         id++;
         return id;
     }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
index 5bedc6900..a3c6d2380 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
@@ -62,7 +62,7 @@ public class NonCompletableFuture<T> extends CompletableFuture<T> {
         throw new UnsupportedOperationException("This future can't be completed by an outside caller");
     }
 
-    public void internalComplete(T value) {
+    private void internalComplete(T value) {
         super.complete(value);
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 0ede9cc6d..63aad155a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -27,20 +27,20 @@ public abstract class AbstractAction implements Action {
     private String name;
     private List<Action> upstreams = new ArrayList<>();
     // This is used to assign a unique ID to every Action
-    private int id;
+    private long id;
 
     private int parallelism = 1;
 
     private List<URL> jarUrls;
 
-    protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull List<URL> jarUrls) {
+    protected AbstractAction(long id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull List<URL> jarUrls) {
         this.id = id;
         this.name = name;
         this.upstreams = upstreams;
         this.jarUrls = jarUrls;
     }
 
-    protected AbstractAction(int id, @NonNull String name, @NonNull List<URL> jarUrls) {
+    protected AbstractAction(long id, @NonNull String name, @NonNull List<URL> jarUrls) {
         this.id = id;
         this.name = name;
         this.jarUrls = jarUrls;
@@ -79,7 +79,7 @@ public abstract class AbstractAction implements Action {
     }
 
     @Override
-    public int getId() {
+    public long getId() {
         return id;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index 6abe84bf3..e35e450f9 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -38,7 +38,7 @@ public interface Action extends Serializable {
 
     void setParallelism(int parallelism);
 
-    int getId();
+    long getId();
 
     List<URL> getJarUrls();
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 88e5e5881..c9e8f1d78 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -27,7 +27,7 @@ import java.util.List;
 public class PartitionTransformAction extends AbstractAction {
     private final PartitionSeaTunnelTransform partitionTransformation;
 
-    public PartitionTransformAction(int id,
+    public PartitionTransformAction(long id,
                                     @NonNull String name,
                                     @NonNull List<Action> upstreams,
                                     @NonNull PartitionSeaTunnelTransform partitionTransformation,
@@ -36,7 +36,7 @@ public class PartitionTransformAction extends AbstractAction {
         this.partitionTransformation = partitionTransformation;
     }
 
-    public PartitionTransformAction(int id,
+    public PartitionTransformAction(long id,
                                     @NonNull String name,
                                     @NonNull PartitionSeaTunnelTransform partitionTransformation,
                                     @NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
index 8391be830..c95d7785f 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
@@ -33,7 +33,7 @@ public class PhysicalSourceAction<T, SplitT extends SourceSplit, StateT extends
     private final SeaTunnelSource<T, SplitT, StateT> source;
     private final List<SeaTunnelTransform> transforms;
 
-    public PhysicalSourceAction(int id,
+    public PhysicalSourceAction(long id,
                                 @NonNull String name,
                                 @NonNull SeaTunnelSource<T, SplitT, StateT> source,
                                 @NonNull List<URL> jarUrls,
@@ -43,7 +43,7 @@ public class PhysicalSourceAction<T, SplitT extends SourceSplit, StateT extends
         this.transforms = transforms;
     }
 
-    protected PhysicalSourceAction(int id, @NonNull String name, @NonNull List<Action> upstreams,
+    protected PhysicalSourceAction(long id, @NonNull String name, @NonNull List<Action> upstreams,
                                    @NonNull SeaTunnelSource<T, SplitT, StateT> source,
                                    @NonNull List<URL> jarUrls,
                                    List<SeaTunnelTransform> transforms) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index 47c387dcd..7deed1860 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -28,7 +28,7 @@ import java.util.List;
 public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends AbstractAction {
     private SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
 
-    public SinkAction(int id,
+    public SinkAction(long id,
                       @NonNull String name,
                       @NonNull List<Action> upstreams,
                       @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
@@ -37,7 +37,7 @@ public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends
         this.sink = sink;
     }
 
-    public SinkAction(int id,
+    public SinkAction(long id,
                       @NonNull String name,
                       @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
                       @NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index 41b2bcb63..ed4fb1967 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -32,7 +32,7 @@ public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializ
     private static final long serialVersionUID = -4104531889750766731L;
     private final SeaTunnelSource<T, SplitT, StateT> source;
 
-    public SourceAction(int id,
+    public SourceAction(long id,
                         @NonNull String name,
                         @NonNull SeaTunnelSource<T, SplitT, StateT> source,
                         @NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index c82f41947..5271a2933 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -27,7 +27,7 @@ import java.util.List;
 public class TransformAction extends AbstractAction {
     private SeaTunnelTransform transform;
 
-    public TransformAction(int id,
+    public TransformAction(long id,
                            @NonNull String name,
                            @NonNull List<Action> upstreams,
                            @NonNull SeaTunnelTransform transform,
@@ -36,7 +36,7 @@ public class TransformAction extends AbstractAction {
         this.transform = transform;
     }
 
-    public TransformAction(int id,
+    public TransformAction(long id,
                            @NonNull String name,
                            @NonNull SeaTunnelTransform transform,
                            @NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
index 339dbc1a0..c18315505 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
@@ -29,7 +29,7 @@ public class TransformChainAction extends AbstractAction {
     private static final long serialVersionUID = -340174711145367535L;
     private final List<SeaTunnelTransform> transforms;
 
-    public TransformChainAction(int id,
+    public TransformChainAction(long id,
                                 @NonNull String name,
                                 @NonNull List<Action> upstreams,
                                 @NonNull List<URL> jarUrls,
@@ -38,7 +38,7 @@ public class TransformChainAction extends AbstractAction {
         this.transforms = transforms;
     }
 
-    public TransformChainAction(int id,
+    public TransformChainAction(long id,
                                 @NonNull String name,
                                 @NonNull List<URL> jarUrls,
                                 @NonNull List<SeaTunnelTransform> transforms) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
index df8776548..c9673b0ba 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
@@ -23,17 +23,17 @@ public class IntermediateDataQueue implements Serializable {
 
     private static final long serialVersionUID = -3049265155605303992L;
 
-    private final int id;
+    private final long id;
     private final int parallelism;
     private final String name;
 
-    public IntermediateDataQueue(int id, String name, int parallelism) {
+    public IntermediateDataQueue(long id, String name, int parallelism) {
         this.id = id;
         this.name = name;
         this.parallelism = parallelism;
     }
 
-    public int getId() {
+    public long getId() {
         return id;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
index 69c7e635c..75fdbad67 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
@@ -60,7 +60,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
     private static final Logger LOG = LoggerFactory.getLogger(LogicalDag.class);
     private JobConfig jobConfig;
     private final Set<LogicalEdge> edges = new LinkedHashSet<>();
-    private final Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
+    private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
     private IdGenerator idGenerator;
 
     public LogicalDag() {
@@ -84,7 +84,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
         return this.edges;
     }
 
-    public Map<Integer, LogicalVertex> getLogicalVertexMap() {
+    public Map<Long, LogicalVertex> getLogicalVertexMap() {
         return logicalVertexMap;
     }
 
@@ -128,8 +128,8 @@ public class LogicalDag implements IdentifiedDataSerializable {
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeInt(logicalVertexMap.size());
 
-        for (Map.Entry<Integer, LogicalVertex> entry : logicalVertexMap.entrySet()) {
-            out.writeInt(entry.getKey());
+        for (Map.Entry<Long, LogicalVertex> entry : logicalVertexMap.entrySet()) {
+            out.writeLong(entry.getKey());
             out.writeObject(entry.getValue());
         }
 
@@ -148,7 +148,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
         int vertexCount = in.readInt();
 
         for (int i = 0; i < vertexCount; i++) {
-            Integer key = in.readInt();
+            Long key = in.readLong();
             LogicalVertex value = in.readObject();
             logicalVertexMap.put(key, value);
         }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index 1ea396f7a..a03eb8db3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -40,9 +40,9 @@ public class LogicalDagGenerator {
     private JobConfig jobConfig;
     private IdGenerator idGenerator;
 
-    private Map<Action, Collection<Integer>> alreadyTransformed = new HashMap<>();
+    private Map<Action, Collection<Long>> alreadyTransformed = new HashMap<>();
 
-    private Map<Integer, LogicalVertex> logicalIdVertexMap = new HashMap<>();
+    private Map<Long, LogicalVertex> logicalIdVertexMap = new HashMap<>();
 
     public LogicalDagGenerator(@NonNull List<Action> actions,
                                @NonNull JobConfig jobConfig,
@@ -63,12 +63,12 @@ public class LogicalDagGenerator {
         return logicalDag;
     }
 
-    private Collection<Integer> transformAction(Action action) {
+    private Collection<Long> transformAction(Action action) {
         if (alreadyTransformed.containsKey(action)) {
             return alreadyTransformed.get(action);
         }
 
-        Collection<Integer> upstreamVertexIds = new ArrayList<>();
+        Collection<Long> upstreamVertexIds = new ArrayList<>();
         List<Action> upstream = action.getUpstream();
         if (!CollectionUtils.isEmpty(upstream)) {
             for (Action upstreamAction : upstream) {
@@ -79,7 +79,7 @@ public class LogicalDagGenerator {
         LogicalVertex logicalVertex =
             new LogicalVertex(action.getId(), action, action.getParallelism());
         logicalDag.addLogicalVertex(logicalVertex);
-        Collection<Integer> transformedActions = Lists.newArrayList(logicalVertex.getVertexId());
+        Collection<Long> transformedActions = Lists.newArrayList(logicalVertex.getVertexId());
         alreadyTransformed.put(action, transformedActions);
         logicalIdVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
index bfa7091d6..e7d906b9a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
@@ -35,9 +35,9 @@ public class LogicalEdge implements IdentifiedDataSerializable {
     private LogicalVertex leftVertex;
     private LogicalVertex rightVertex;
 
-    private Integer leftVertexId;
+    private Long leftVertexId;
 
-    private Integer rightVertexId;
+    private Long rightVertexId;
 
     public LogicalEdge(){}
 
@@ -48,7 +48,7 @@ public class LogicalEdge implements IdentifiedDataSerializable {
         this.rightVertexId = rightVertex.getVertexId();
     }
 
-    public void recoveryFromVertexMap(@NonNull Map<Integer, LogicalVertex> vertexMap) {
+    public void recoveryFromVertexMap(@NonNull Map<Long, LogicalVertex> vertexMap) {
         leftVertex = vertexMap.get(leftVertexId);
         rightVertex = vertexMap.get(rightVertexId);
 
@@ -69,13 +69,13 @@ public class LogicalEdge implements IdentifiedDataSerializable {
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
         // To prevent circular serialization, we only serialize the ID of vertices for edges
-        out.writeInt(leftVertexId);
-        out.writeInt(rightVertexId);
+        out.writeLong(leftVertexId);
+        out.writeLong(rightVertexId);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
-        leftVertexId = in.readInt();
-        rightVertexId = in.readInt();
+        leftVertexId = in.readLong();
+        rightVertexId = in.readLong();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
index c9e145c86..d84b83521 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
@@ -32,7 +32,7 @@ import java.io.IOException;
 @AllArgsConstructor
 public class LogicalVertex implements IdentifiedDataSerializable {
 
-    private Integer vertexId;
+    private Long vertexId;
     private Action action;
     private int parallelism;
 
@@ -51,14 +51,14 @@ public class LogicalVertex implements IdentifiedDataSerializable {
 
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
-        out.writeInt(vertexId);
+        out.writeLong(vertexId);
         out.writeObject(action);
         out.writeInt(parallelism);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
-        vertexId = in.readInt();
+        vertexId = in.readLong();
         action = in.readObject();
         parallelism = in.readInt();
     }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 26b2e76ff..90b74350d 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.engine.core.job;
 
+import java.util.concurrent.ExecutionException;
+
 public interface Job {
     long getJobId();
 
-    void submitJob();
+    void submitJob() throws ExecutionException, InterruptedException;
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
index df6c2a370..aa4175247 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
@@ -73,7 +73,7 @@ public enum PipelineState {
     /** Restoring last possible valid state of the pipeline if it has it. */
     INITIALIZING;
 
-    public boolean isEnd() {
+    public boolean isEndState() {
         return this == FINISHED || this == CANCELED || this == FAILED;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 4159346e4..567f81440 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.instance.impl.Node;
@@ -27,6 +28,7 @@ import com.hazelcast.internal.services.MembershipAwareService;
 import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.jet.impl.LiveOperationRegistry;
 import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.LiveOperations;
@@ -39,6 +41,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+    private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
     public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
 
     private NodeEngineImpl nodeEngine;
@@ -114,20 +117,22 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
      * call by client to submit job
      */
     @SuppressWarnings("checkstyle:MagicNumber")
-    public CompletableFuture<Void> submitJob(Data jobImmutableInformation) {
+    public NonCompletableFuture<Void> submitJob(Data jobImmutableInformation) {
         CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
         JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
         executorService.submit(() -> {
             try {
                 jobMaster.init();
+                jobMaster.run();
             } catch (Throwable e) {
-                throw new RuntimeException(e);
+                LOGGER.severe("submit job error: " + e.getMessage(), e);
+                voidCompletableFuture.completeExceptionally(e);
             } finally {
-                // We specify that when init is complete, the submitJob is complete
+                // Reached once init() and run() have returned or thrown; complete(null) has
+                // no effect if the future was already completed exceptionally above
                 voidCompletableFuture.complete(null);
             }
-            jobMaster.run();
         });
-        return voidCompletableFuture;
+        return new NonCompletableFuture<>(voidCompletableFuture);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 4d15b319a..b34ebe6ca 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.stream.Collectors.partitioningBy;
 import static java.util.stream.Collectors.toList;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -34,16 +35,20 @@ import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.properties.HazelcastProperties;
 import lombok.NonNull;
 import lombok.SneakyThrows;
 
+import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -112,14 +117,24 @@ public class TaskExecutionService {
         uncheckRun(startedLatch::await);
     }
 
-    public CompletableFuture<TaskExecutionState> submitTaskGroup(
-        TaskGroup taskGroup,
-        CompletableFuture<Void> cancellationFuture
+    public NonCompletableFuture<TaskExecutionState> deployTask(
+        @NonNull Data taskImmutableInformation
     ) {
-        taskGroup.init();
-        Collection<Task> tasks = taskGroup.getTasks();
-        final TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup);
+        CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
+        TaskGroup taskGroup = null;
         try {
+            TaskGroupImmutableInformation taskImmutableInfo =
+                nodeEngine.getSerializationService().toObject(taskImmutableInformation);
+            Set<URL> jars = taskImmutableInfo.getJars();
+
+            // TODO Use classloader load the connector jars and deserialize Task
+            taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
+            taskGroup.init();
+            Collection<Task> tasks = taskGroup.getTasks();
+
+            // TODO We need add a method to cancel task
+            CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
+            TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup, resultFuture);
             ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
             final Map<Boolean, List<Task>> byCooperation =
                 tasks.stream()
@@ -134,9 +149,10 @@ public class TaskExecutionService {
             taskGroup.setTasksContext(taskExecutionContextMap);
             executionContexts.put(taskGroup.getId(), new TaskGroupContext(taskGroup));
         } catch (Throwable t) {
-            executionTracker.future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
+            logger.severe(ExceptionUtils.getMessage(t));
+            resultFuture.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
         }
-        return new NonCompletableFuture<>(executionTracker.future);
+        return new NonCompletableFuture<>(resultFuture);
     }
 
     private final class BlockingWorker implements Runnable {
@@ -158,7 +174,8 @@ public class TaskExecutionService {
                 ProgressState result;
                 do {
                     result = t.call();
-                } while (!result.isDone() && !isShutdown && !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
+                } while (!result.isDone() && !isShutdown &&
+                    !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
             } catch (Throwable e) {
                 logger.warning("Exception in " + t, e);
                 tracker.taskGroupExecutionTracker.exception(e);
@@ -190,7 +207,8 @@ public class TaskExecutionService {
         public LinkedBlockingDeque<TaskTracker> taskqueue;
 
         @SuppressWarnings("checkstyle:MagicNumber")
-        public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskqueue, RunBusWorkSupplier runBusWorkSupplier) {
+        public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskqueue,
+                                     RunBusWorkSupplier runBusWorkSupplier) {
             logger.info(String.format("Created new BusWork : %s", this.hashCode()));
             this.taskqueue = taskqueue;
             this.timer = new TaskCallTimer(50, keep, runBusWorkSupplier, this);
@@ -287,7 +305,7 @@ public class TaskExecutionService {
     public final class TaskGroupExecutionTracker {
 
         private final TaskGroup taskGroup;
-        final CompletableFuture<TaskExecutionState> future = new CompletableFuture<>();
+        final CompletableFuture<TaskExecutionState> future;
         volatile List<Future<?>> blockingFutures = emptyList();
 
         private final AtomicInteger completionLatch;
@@ -295,7 +313,9 @@ public class TaskExecutionService {
 
         private final AtomicBoolean isCancel = new AtomicBoolean(false);
 
-        TaskGroupExecutionTracker(CompletableFuture<Void> cancellationFuture, TaskGroup taskGroup) {
+        TaskGroupExecutionTracker(@NonNull CompletableFuture<Void> cancellationFuture, @NonNull TaskGroup taskGroup,
+                                  @NonNull CompletableFuture<TaskExecutionState> future) {
+            this.future = future;
             this.completionLatch = new AtomicInteger(taskGroup.getTasks().size());
             this.taskGroup = taskGroup;
             cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
index b1527e120..b46b4ef31 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
@@ -24,9 +24,9 @@ public class ExecutionEdge {
     private ExecutionVertex leftVertex;
     private ExecutionVertex rightVertex;
 
-    private Integer leftVertexId;
+    private Long leftVertexId;
 
-    private Integer rightVertexId;
+    private Long rightVertexId;
 
     public ExecutionEdge(ExecutionVertex leftVertex, ExecutionVertex rightVertex) {
         this.leftVertex = leftVertex;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 2c16829b0..fa3e38052 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -45,10 +45,10 @@ import java.util.stream.Collectors;
 
 public class ExecutionPlanGenerator {
 
-    private final Map<Integer, List<Integer>> edgeMap = new HashMap<>();
-    private final Map<Integer, Action> actions = new HashMap<>();
+    private final Map<Long, List<Long>> edgeMap = new HashMap<>();
+    private final Map<Long, Action> actions = new HashMap<>();
 
-    private final Map<Integer, LogicalVertex> logicalVertexes;
+    private final Map<Long, LogicalVertex> logicalVertexes;
     private final List<LogicalEdge> logicalEdges;
 
     private final JobImmutableInformation jobImmutableInformation;
@@ -80,7 +80,7 @@ public class ExecutionPlanGenerator {
             next = newNext;
         }
 
-        Map<Integer, LogicalVertex> vertexes = new HashMap<>();
+        Map<Long, LogicalVertex> vertexes = new HashMap<>();
         actions.forEach((key, value) -> vertexes.put(key, new LogicalVertex(key, value,
             logicalVertexes.get(key).getParallelism())));
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
index f39bb2d8e..f0b282048 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
@@ -25,7 +25,7 @@ import lombok.Data;
 @Data
 @AllArgsConstructor
 public class ExecutionVertex {
-    private Integer vertexId;
+    private Long vertexId;
     private Action action;
     private int parallelism;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index b7def66ad..c3bf85cb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -23,9 +23,9 @@ import java.util.Map;
 public class Pipeline {
     private final List<ExecutionEdge> edges;
 
-    private final Map<Integer, ExecutionVertex> vertexes;
+    private final Map<Long, ExecutionVertex> vertexes;
 
-    Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
+    Pipeline(List<ExecutionEdge> edges, Map<Long, ExecutionVertex> vertexes) {
         this.edges = edges;
         this.vertexes = vertexes;
     }
@@ -34,7 +34,7 @@ public class Pipeline {
         return edges;
     }
 
-    public Map<Integer, ExecutionVertex> getVertexes() {
+    public Map<Long, ExecutionVertex> getVertexes() {
         return vertexes;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 1f8dab913..313c06194 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -34,7 +34,7 @@ public class PipelineGenerator {
         // cache in the future
 
         return edgesList.stream().map(e -> {
-            Map<Integer, ExecutionVertex> vertexes = new HashMap<>();
+            Map<Long, ExecutionVertex> vertexes = new HashMap<>();
             List<ExecutionEdge> pipelineEdges = e.stream().map(edge -> {
                 if (!vertexes.containsKey(edge.getLeftVertexId())) {
                     vertexes.put(edge.getLeftVertexId(), edge.getLeftVertex());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index ecd684a3a..ffbc1d6bb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -62,7 +62,7 @@ public class PhysicalPlan {
      * in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
      */
     private final CompletableFuture<JobStatus> jobEndFuture;
-
+    private final NonCompletableFuture<JobStatus> nonCompletableFuture;
 
     /**
      * This future only can completion by the {@link SubPlan } subPlanFuture.
@@ -83,25 +83,38 @@ public class PhysicalPlan {
         this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
         this.jobStatus.set(JobStatus.CREATED);
         this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
-        this.jobEndFuture = new CompletableFuture<JobStatus>();
+        this.jobEndFuture = new CompletableFuture<>();
+        this.nonCompletableFuture = new NonCompletableFuture<>(jobEndFuture);
         this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
         this.pipelineList = pipelineList;
 
         Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
             x.whenComplete((v, t) -> {
+                // We need not handle t, Because we will not return t from Pipeline
                 if (PipelineState.CANCELED.equals(v)) {
                     canceledPipelineNum.incrementAndGet();
                 } else if (PipelineState.FAILED.equals(v)) {
+                    // Count the failure before cancelling so the status check below always sees it
                     failedPipelineNum.incrementAndGet();
+                    LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
+                    cancelJob().whenComplete((v1, t1) ->
+                        LOGGER.severe("Cancel other pipelines complete"));
+                } else if (!PipelineState.FINISHED.equals(v)) {
+                    // Unknown state is treated as a failure; count it synchronously as well
+                    failedPipelineNum.incrementAndGet();
+                    LOGGER.severe(
+                        "Pipeline Failed with Unknown PipelineState, Begin to cancel other pipelines in this job.");
+                    cancelJob().whenComplete((v1, t1) ->
+                        LOGGER.severe("Cancel other pipelines complete"));
                 }
 
                 if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
                     if (failedPipelineNum.get() > 0) {
-                        jobStatus.set(JobStatus.FAILING);
+                        updateJobState(JobStatus.FAILING);
                     } else if (canceledPipelineNum.get() > 0) {
-                        jobStatus.set(JobStatus.CANCELED);
+                        updateJobState(JobStatus.CANCELED);
                     } else {
-                        jobStatus.set(JobStatus.FINISHED);
+                        updateJobState(JobStatus.FINISHED);
                     }
                     jobEndFuture.complete(jobStatus.get());
                 }
@@ -109,6 +122,16 @@ public class PhysicalPlan {
         });
     }
 
+    public NonCompletableFuture<Void> cancelJob() {
+        CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
+            // TODO Implement cancel pipeline in job.
+            return null;
+        });
+
+        cancelFuture.complete(null);
+        return new NonCompletableFuture<>(cancelFuture);
+    }
+
     public List<SubPlan> getPipelineList() {
         return pipelineList;
     }
@@ -120,7 +143,11 @@ public class PhysicalPlan {
         }
     }
 
-    public boolean updateJobState(JobStatus current, JobStatus targetState) {
+    public boolean updateJobState(@NonNull JobStatus targetState) {
+        return updateJobState(jobStatus.get(), targetState);
+    }
+
+    public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) {
         // consistency check
         if (current.isEndState()) {
             String message = "Job is trying to leave terminal state " + current;
@@ -144,7 +171,15 @@ public class PhysicalPlan {
         }
     }
 
-    public CompletableFuture<JobStatus> getJobEndCompletableFuture() {
-        return this.jobEndFuture;
+    public NonCompletableFuture<JobStatus> getJobEndCompletableFuture() {
+        return this.nonCompletableFuture;
+    }
+
+    public JobImmutableInformation getJobImmutableInformation() {
+        return jobImmutableInformation;
+    }
+
+    public JobStatus getJobStatus() {
+        return jobStatus.get();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 056faa0d7..a13a0d17f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -122,7 +122,8 @@ public class PhysicalPlanGenerator {
                 coordinatorVertexList,
                 pipelineFuture,
                 waitForCompleteByPhysicalVertexList.toArray(
-                    new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]));
+                    new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
+                jobImmutableInformation);
         });
 
         PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
@@ -153,9 +154,10 @@ public class PhysicalPlanGenerator {
                     new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
 
                 CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
                 waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
 
-                return new PhysicalVertex(atomicInteger.incrementAndGet(),
+                return new PhysicalVertex(idGenerator.getNextId(),
+                    atomicInteger.incrementAndGet(),
                     executorService,
                     collect.size(),
                     new TaskGroupDefaultImpl("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
@@ -163,7 +165,10 @@ public class PhysicalPlanGenerator {
                     flakeIdGenerator,
                     pipelineIndex,
                     totalPipelineNum,
-                    null);
+                    null,
+                    jobImmutableInformation,
+                    initializationTimestamp,
+                    nodeEngine);
             }).collect(Collectors.toList());
     }
 
@@ -182,7 +187,8 @@ public class PhysicalPlanGenerator {
                     CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
                     waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
 
-                    t.add(new PhysicalVertex(i,
+                    t.add(new PhysicalVertex(idGenerator.getNextId(),
+                        i,
                         executorService,
                         flow.getAction().getParallelism(),
                         new TaskGroupDefaultImpl("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
@@ -190,7 +196,10 @@ public class PhysicalPlanGenerator {
                         flakeIdGenerator,
                         pipelineIndex,
                         totalPipelineNum,
-                        seaTunnelTask.getJarsUrl()));
+                        seaTunnelTask.getJarsUrl(),
+                        jobImmutableInformation,
+                        initializationTimestamp,
+                        nodeEngine));
                 }
                 return t.stream();
             }).collect(Collectors.toList());
@@ -207,7 +216,8 @@ public class PhysicalPlanGenerator {
             CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
             waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
 
-            return new PhysicalVertex(atomicInteger.incrementAndGet(),
+            return new PhysicalVertex(idGenerator.getNextId(),
+                atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
                 new TaskGroupDefaultImpl(s.getName(), Lists.newArrayList(t)),
@@ -215,7 +225,10 @@ public class PhysicalPlanGenerator {
                 flakeIdGenerator,
                 pipelineIndex,
                 totalPipelineNum,
-                t.getJarsUrl());
+                t.getJarsUrl(),
+                jobImmutableInformation,
+                initializationTimestamp,
+                nodeEngine);
         }).collect(Collectors.toList());
     }
 
@@ -243,7 +256,8 @@ public class PhysicalPlanGenerator {
                     waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
 
                     // TODO We need give every task a appropriate name
-                    t.add(new PhysicalVertex(i,
+                    t.add(new PhysicalVertex(idGenerator.getNextId(),
+                        i,
                         executorService,
                         flow.getAction().getParallelism(),
                         new TaskGroupDefaultImpl("SourceTask",
@@ -252,7 +266,10 @@ public class PhysicalPlanGenerator {
                         flakeIdGenerator,
                         pipelineIndex,
                         totalPipelineNum,
-                        jars));
+                        jars,
+                        jobImmutableInformation,
+                        initializationTimestamp,
+                        nodeEngine));
                 }
                 return t.stream();
             }).collect(Collectors.toList());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 879226c82..2751ce8ff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -17,22 +17,26 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
-import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 
+import com.hazelcast.cluster.Address;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
 import java.net.URL;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * PhysicalVertex is responsible for the scheduling and execution of a single task parallel
@@ -43,12 +47,14 @@ public class PhysicalVertex {
 
     private static final ILogger LOGGER = Logger.getLogger(PhysicalVertex.class);
 
+    private final long physicalVertexId;
+
     /**
      * the index of PhysicalVertex
      */
     private final int subTaskGroupIndex;
 
-    private final String taskNameWithSubtaskAndPipeline;
+    private final String taskFullName;
 
     private final int parallelism;
 
@@ -64,19 +70,35 @@ public class PhysicalVertex {
 
     private final Set<URL> pluginJarsUrls;
 
+    private AtomicReference<ExecutionState> executionState = new AtomicReference<>();
+
     /**
      * When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
      * in {@link SubPlan} whenComplete method will be called.
      */
     private final CompletableFuture<TaskExecutionState> taskFuture;
 
+    /**
+     * Timestamps (in milliseconds, as returned by {@code System.currentTimeMillis()}) when the
+     * task transitioned into a certain state. The index into this array is the ordinal
+     * of the enum value, i.e. the timestamp when the task went into state "RUNNING" is at {@code
+     * stateTimestamps[RUNNING.ordinal()]}.
+     */
+    private final long[] stateTimestamps;
 
     /**
      * This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
      */
     private NonCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
 
-    public PhysicalVertex(int subTaskGroupIndex,
+    private final JobImmutableInformation jobImmutableInformation;
+
+    private final long initializationTimestamp;
+
+    private final NodeEngine nodeEngine;
+
+    public PhysicalVertex(long physicalVertexId,
+                          int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
                           int parallelism,
                           @NonNull TaskGroupDefaultImpl taskGroup,
@@ -84,7 +106,11 @@ public class PhysicalVertex {
                           @NonNull FlakeIdGenerator flakeIdGenerator,
                           int pipelineIndex,
                           int totalPipelineNum,
-                          Set<URL> pluginJarsUrls) {
+                          Set<URL> pluginJarsUrls,
+                          @NonNull JobImmutableInformation jobImmutableInformation,
+                          long initializationTimestamp,
+                          @NonNull NodeEngine nodeEngine) {
+        this.physicalVertexId = physicalVertexId;
         this.subTaskGroupIndex = subTaskGroupIndex;
         this.executorService = executorService;
         this.parallelism = parallelism;
@@ -93,45 +119,132 @@ public class PhysicalVertex {
         this.pipelineIndex = pipelineIndex;
         this.totalPipelineNum = totalPipelineNum;
         this.pluginJarsUrls = pluginJarsUrls;
-        this.taskNameWithSubtaskAndPipeline =
+        this.jobImmutableInformation = jobImmutableInformation;
+        this.initializationTimestamp = initializationTimestamp;
+        stateTimestamps = new long[ExecutionState.values().length];
+        this.stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
+        this.executionState.set(ExecutionState.CREATED);
+        this.stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+        this.nodeEngine = nodeEngine;
+        this.taskFullName =
             String.format(
-                "task: [%s (%d/%d)], pipeline: [%d/%d]",
+                "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
+                jobImmutableInformation.getJobConfig().getName(),
+                jobImmutableInformation.getJobId(),
+                pipelineIndex + 1,
+                totalPipelineNum,
                 taskGroup.getTaskGroupName(),
                 subTaskGroupIndex + 1,
-                parallelism,
-                pipelineIndex,
-                totalPipelineNum);
+                parallelism);
         this.taskFuture = taskFuture;
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    public void deploy() throws JobException {
-
-        // TODO really submit job to ExecutionService and get a NonCompletableFuture<ExecutionState>
-        long executionId = flakeIdGenerator.newId();
-        CompletableFuture<TaskExecutionState> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
+    // This method must not throw an exception
+    public void deploy(@NonNull Address address) {
+        /**
+         TaskGroupImmutableInformation taskGroupImmutableInformation =
+         new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+         nodeEngine.getSerializationService().toData(this.taskGroup),
+         this.pluginJarsUrls);
+
+         try {
+         waitForCompleteByExecutionService = new NonCompletableFuture<>(
+         nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+         new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+         address)
+         .invoke());
+         } catch (Throwable th) {
+         LOGGER.severe(String.format("%s deploy error with Exception: %s",
+         this.taskFullName,
+         ExceptionUtils.getMessage(th)));
+         updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+         taskFuture.complete(
+         new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
+         }*/
+
+        waitForCompleteByExecutionService = new NonCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
             try {
-                Thread.sleep(5000);
-                return new TaskExecutionState(executionId, ExecutionState.FINISHED, null);
+                Thread.sleep(2000);
             } catch (InterruptedException e) {
-                return new TaskExecutionState(executionId, ExecutionState.FAILED, e);
+                throw new RuntimeException(e);
             }
-        }, executorService);
+            return new TaskExecutionState(flakeIdGenerator.newId(), ExecutionState.FINISHED, null);
+        }));
 
-        waitForCompleteByExecutionService = new NonCompletableFuture<TaskExecutionState>(uCompletableFuture);
+        updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
         waitForCompleteByExecutionService.whenComplete((v, t) -> {
-            if (t != null) {
-                // TODO t.getMessage() need be replace
-                LOGGER.info(String.format("The Task %s Failed with Exception: %s",
-                    this.taskNameWithSubtaskAndPipeline,
-                    t.getMessage()));
-                taskFuture.complete(new TaskExecutionState(executionId, ExecutionState.FAILED, t));
-            } else {
-                LOGGER.info(String.format("The Task %s end with state %s",
-                    this.taskNameWithSubtaskAndPipeline,
-                    v));
+            try {
+                // We do not need to handle t because TaskExecutionService will not return t.
+                // v will never be null
+                updateTaskState(executionState.get(), v.getExecutionState());
+                if (v.getThrowable() != null) {
+                    LOGGER.severe(String.format("%s end with state %s and Exception: %s",
+                        this.taskFullName,
+                        v.getExecutionState(),
+                        ExceptionUtils.getMessage(v.getThrowable())));
+                } else {
+                    LOGGER.severe(String.format("%s end with state %s",
+                        this.taskFullName,
+                        v.getExecutionState()));
+                }
+                taskFuture.complete(v);
+            } catch (Throwable th) {
+                LOGGER.severe(
+                    String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
+                updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
+                v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, null);
                 taskFuture.complete(v);
             }
         });
     }
+
+    public long getPhysicalVertexId() {
+        return physicalVertexId;
+    }
+
+    public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
+        // consistency check
+        if (current.isEndState()) {
+            String message = "Task is trying to leave terminal state " + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (ExecutionState.SCHEDULED.equals(targetState) && !ExecutionState.CREATED.equals(current)) {
+            String message = "Only [CREATED] task can turn to [SCHEDULED]" + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (ExecutionState.DEPLOYING.equals(targetState) && !ExecutionState.SCHEDULED.equals(current)) {
+            String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (ExecutionState.RUNNING.equals(targetState) && !ExecutionState.DEPLOYING.equals(current)) {
+            String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        // now do the actual state transition
+        if (executionState.get() == current) {
+            executionState.set(targetState);
+            LOGGER.info(String.format("%s turn from state %s to %s.",
+                taskFullName,
+                current,
+                targetState));
+
+            stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public TaskGroupDefaultImpl getTaskGroup() {
+        return taskGroup;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 8c1c4b0b6..4c2ba9dfc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
-import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineState;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
@@ -44,6 +44,8 @@ public class SubPlan {
 
     private final int totalPipelineNum;
 
+    private final JobImmutableInformation jobImmutableInformation;
+
     private AtomicInteger finishedTaskNum = new AtomicInteger(0);
 
     private AtomicInteger canceledTaskNum = new AtomicInteger(0);
@@ -52,7 +54,7 @@ public class SubPlan {
 
     private AtomicReference<PipelineState> pipelineState = new AtomicReference<>();
 
-    private final String pipelineNameWithIndex;
+    private final String pipelineFullName;
 
     /**
      * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
@@ -80,7 +82,8 @@ public class SubPlan {
                    @NonNull List<PhysicalVertex> physicalVertexList,
                    @NonNull List<PhysicalVertex> coordinatorVertexList,
                    @NonNull CompletableFuture<PipelineState> pipelineFuture,
-                   @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex) {
+                   @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex,
+                   @NonNull JobImmutableInformation jobImmutableInformation) {
         this.pipelineIndex = pipelineIndex;
         this.pipelineFuture = pipelineFuture;
         this.totalPipelineNum = totalPipelineNum;
@@ -91,31 +94,44 @@ public class SubPlan {
         this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
         this.pipelineState.set(PipelineState.CREATED);
         this.stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
-        this.pipelineNameWithIndex = String.format(
-            "pipeline: [(%d/%d)]",
+        this.jobImmutableInformation = jobImmutableInformation;
+        this.pipelineFullName = String.format(
+            "Job %s (%s), Pipeline: [(%d/%d)]",
+            jobImmutableInformation.getJobConfig().getName(),
+            jobImmutableInformation.getJobId(),
             pipelineIndex + 1,
             totalPipelineNum);
 
         Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
             x.whenComplete((v, t) -> {
-                if (ExecutionState.CANCELED.equals(v)) {
+                // We do not need to handle t because PhysicalVertex will not return t.
+                if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
                     canceledTaskNum.incrementAndGet();
-                } else if (ExecutionState.FAILED.equals(v)) {
-                    failedTaskNum.incrementAndGet();
-                } else {
-                    throw new JobException("Unknown Task end state [" + v + "]");
+                } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+                    LOGGER.severe("Task Failed, Begin to cancel other tasks in this pipeline.");
+                    cancelPipeline().whenComplete((v1, t1) -> {
+                        LOGGER.severe(String.format("Cancel other tasks complete"));
+                        failedTaskNum.incrementAndGet();
+                    });
+                } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) {
+                    LOGGER.severe(
+                        "Task Failed with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.");
+                    cancelPipeline().whenComplete((v1, t1) -> {
+                        LOGGER.severe(String.format("Cancel other tasks complete"));
+                        failedTaskNum.incrementAndGet();
+                    });
                 }
 
                 if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
                     if (failedTaskNum.get() > 0) {
-                        LOGGER.info(String.format("Pipeline failed %s", this.pipelineNameWithIndex));
-                        pipelineState.set(PipelineState.FAILED);
+                        updatePipelineState(PipelineState.FAILED);
+                        LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
                     } else if (canceledTaskNum.get() > 0) {
-                        LOGGER.info(String.format("Pipeline canceled %s", this.pipelineNameWithIndex));
-                        pipelineState.set(PipelineState.CANCELED);
+                        updatePipelineState(PipelineState.CANCELED);
+                        LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
                     } else {
-                        LOGGER.info(String.format("Pipeline finished %s", this.pipelineNameWithIndex));
-                        pipelineState.set(PipelineState.FINISHED);
+                        updatePipelineState(PipelineState.FINISHED);
+                        LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
                     }
                     pipelineFuture.complete(pipelineState.get());
                 }
@@ -123,6 +139,67 @@ public class SubPlan {
         });
     }
 
+    public boolean updatePipelineState(@NonNull PipelineState targetState) {
+        return updatePipelineState(pipelineState.get(), targetState);
+    }
+
+    public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
+        // consistency check
+        if (current.isEndState()) {
+            String message = "Pipeline is trying to leave terminal state " + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (PipelineState.SCHEDULED.equals(targetState) && !PipelineState.CREATED.equals(current)) {
+            String message = "Only [CREATED] pipeline can turn to [SCHEDULED]" + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (PipelineState.DEPLOYING.equals(targetState) && !PipelineState.SCHEDULED.equals(current)) {
+            String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING]" + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (PipelineState.RUNNING.equals(targetState) && !PipelineState.DEPLOYING.equals(current)) {
+            String message = "Only [DEPLOYING] pipeline can turn to [RUNNING]" + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        // now do the actual state transition
+        if (pipelineState.get() == current) {
+            pipelineState.set(targetState);
+            LOGGER.info(String.format("%s turn from state %s to %s.",
+                pipelineFullName,
+                current,
+                targetState));
+
+            stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public NonCompletableFuture<Void> cancelPipeline() {
+        CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
+            // TODO Implement cancel tasks in pipeline.
+            return null;
+        });
+
+        cancelFuture.complete(null);
+        return new NonCompletableFuture<>(cancelFuture);
+    }
+
+    public void failedWithNoEnoughResource() {
+        LOGGER.severe(String.format("%s failed with have no enough resource to run.", this.getPipelineFullName()));
+        updatePipelineState(PipelineState.SCHEDULED, PipelineState.FAILED);
+        pipelineFuture.complete(PipelineState.FAILED);
+    }
+
     public int getPipelineIndex() {
         return pipelineIndex;
     }
@@ -134,4 +211,8 @@ public class SubPlan {
     public List<PhysicalVertex> getCoordinatorVertexList() {
         return coordinatorVertexList;
     }
+
+    public String getPipelineFullName() {
+        return pipelineFullName;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
index b2db98f18..b31d5bea3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
@@ -23,17 +23,15 @@ package org.apache.seatunnel.engine.server.execution;
  * the state {@code CREATED} and switch states according to this diagram:
  *
  * <pre>{@code
- *  CREATED  -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
- *     |            |            |          |              |
- *     |            |            |    +-----+--------------+
- *     |            |            V    V
- *     |            |         CANCELLING -----+----> CANCELED
- *     |            |                         |
- *     |            +-------------------------+
- *     |
- *     |                                   ... -> FAILED
- *     V
- * RECONCILING  -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
+ *  INITIALIZING -> CREATED  -> SCHEDULED -> DEPLOYING  -> RUNNING -> FINISHED
+ *                    |            |          |              |
+ *                    |            |    +-----+--------------+
+ *                    |            V    V
+ *                    |         CANCELLING -----+----> CANCELED
+ *                    |                         |
+ *                    +-------------------------+
+ *
+ *                                         ... -> FAILED
  *
  * }</pre>
  *
@@ -73,7 +71,7 @@ public enum ExecutionState {
     /** Restoring last possible valid state of the task if it has it. */
     INITIALIZING;
 
-    public boolean isEnd() {
+    public boolean isEndState() {
         return this == FINISHED || this == CANCELED || this == FAILED;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6c9d89517..cfb3352bc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,11 +17,18 @@
 
 package org.apache.seatunnel.engine.server.master;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.SimpleResourceManager;
+import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
+import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
 
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
@@ -30,6 +37,7 @@ import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
 public class JobMaster implements Runnable {
@@ -45,6 +53,10 @@ public class JobMaster implements Runnable {
 
     private FlakeIdGenerator flakeIdGenerator;
 
+    private ResourceManager resourceManager;
+
+    private CompletableFuture<JobStatus> jobMasterCompleteFuture = new CompletableFuture<>();
+
     public JobMaster(@NonNull Data jobImmutableInformation,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService) {
@@ -53,6 +65,8 @@ public class JobMaster implements Runnable {
         this.executorService = executorService;
         flakeIdGenerator =
             this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
+
+        this.resourceManager = new SimpleResourceManager();
     }
 
     public void init() throws Exception {
@@ -61,23 +75,49 @@ public class JobMaster implements Runnable {
         LOGGER.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
 
         // TODO Use classloader load the connector jars and deserialize logicalDag
-        this.logicalDag = new LogicalDag();
+        this.logicalDag = nodeEngine.getSerializationService().toObject(jobInformation.getLogicalDag());
         physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
-                nodeEngine,
-                jobInformation,
-                System.currentTimeMillis(),
-                executorService,
-                flakeIdGenerator);
+            nodeEngine,
+            jobInformation,
+            System.currentTimeMillis(),
+            executorService,
+            flakeIdGenerator);
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void run() {
         try {
-            LOGGER.info("I will sleep 2000ms");
-            Thread.sleep(2000);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+            NonCompletableFuture<JobStatus> jobStatusNonCompletableFuture = physicalPlan.getJobEndCompletableFuture();
+
+            jobStatusNonCompletableFuture.whenComplete((v, t) -> {
+                // We do not need to handle t because physicalPlan will not return t.
+                if (JobStatus.FAILING.equals(v)) {
+                    cleanJob();
+                    physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
+                }
+                jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
+            });
+
+            JobScheduler jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
+            jobScheduler.startScheduling();
+        } catch (Throwable e) {
+            LOGGER.severe(String.format("Job %s (%s) run error with: %s",
+                physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
+                physicalPlan.getJobImmutableInformation().getJobId(),
+                ExceptionUtils.getMessage(e)));
+            // try to cancel job
+            physicalPlan.cancelJob();
+        } finally {
+            jobMasterCompleteFuture.join();
         }
     }
+
+    public void cleanJob() {
+        // TODO clean something
+    }
+
+    public ResourceManager getResourceManager() {
+        return resourceManager;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 58227776e..543197559 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -22,17 +22,16 @@ import static com.hazelcast.jet.impl.util.ExceptionUtil.stackTraceToString;
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 import static com.hazelcast.spi.impl.operationservice.ExceptionAction.THROW_EXCEPTION;
 
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
-import com.hazelcast.jet.JetException;
 import com.hazelcast.nio.serialization.HazelcastSerializationException;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.operationservice.ExceptionAction;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-import java.util.concurrent.CompletableFuture;
-
 /**
  * Base class for async operations. Handles registration/deregistration of
  * operations from live registry, exception handling and peeling and
@@ -48,7 +47,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
 
     @Override
     public final void run() {
-        CompletableFuture<?> future;
+        NonCompletableFuture<?> future;
         try {
             future = doRun();
         } catch (Exception e) {
@@ -59,7 +58,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
         future.whenComplete(withTryCatch(getLogger(), (r, f) -> doSendResponse(f != null ? peel(f) : r)));
     }
 
-    protected abstract CompletableFuture<?> doRun() throws Exception;
+    protected abstract NonCompletableFuture<?> doRun() throws Exception;
 
     @Override
     public final boolean returnsResponse() {
@@ -87,7 +86,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
                     // the response will not be sent and the operation will hang.
                     // To prevent this from happening, replace the exception with
                     // another exception that can be serialized.
-                    sendResponse(new JetException(stackTraceToString(ex)));
+                    sendResponse(new SeaTunnelEngineException(stackTraceToString(ex)));
                 } else {
                     throw e;
                 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
index 689f697be..8015895a4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
@@ -17,48 +17,50 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
 import com.hazelcast.internal.nio.IOUtil;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
+import lombok.NonNull;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
-public class SubmitJobOperation extends AsyncOperation {
-    private Data jobImmutableInformation;
+public class DeployTaskOperation extends AsyncOperation {
+    private Data taskImmutableInformation;
 
-    public SubmitJobOperation() {
+    public DeployTaskOperation() {
     }
 
-    public SubmitJobOperation(Data jobImmutableInformation) {
-        this.jobImmutableInformation = jobImmutableInformation;
+    public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
+        this.taskImmutableInformation = taskImmutableInformation;
+    }
+
+    @Override
+    protected NonCompletableFuture<?> doRun() throws Exception {
+        // Hand the serialized task information to the TaskExecutionService for deployment.
+        TaskExecutionService executionService = getService();
+        return executionService.deployTask(taskImmutableInformation);
     }
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+        return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        IOUtil.writeData(out, jobImmutableInformation);
+        IOUtil.writeData(out, taskImmutableInformation);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        jobImmutableInformation = IOUtil.readData(in);
-    }
-
-    @Override
-    protected CompletableFuture<?> doRun() throws Exception {
-        SeaTunnelServer seaTunnelServer = getService();
-        CompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
-        return voidCompletableFuture;
+        taskImmutableInformation = IOUtil.readData(in);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
index 689f697be..419712111 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
@@ -24,9 +25,9 @@ import com.hazelcast.internal.nio.IOUtil;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
+import lombok.NonNull;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
 public class SubmitJobOperation extends AsyncOperation {
     private Data jobImmutableInformation;
@@ -34,7 +35,7 @@ public class SubmitJobOperation extends AsyncOperation {
     public SubmitJobOperation() {
     }
 
-    public SubmitJobOperation(Data jobImmutableInformation) {
+    public SubmitJobOperation(@NonNull Data jobImmutableInformation) {
         this.jobImmutableInformation = jobImmutableInformation;
     }
 
@@ -56,9 +57,9 @@ public class SubmitJobOperation extends AsyncOperation {
     }
 
     @Override
-    protected CompletableFuture<?> doRun() throws Exception {
+    protected NonCompletableFuture<?> doRun() throws Exception {
         SeaTunnelServer seaTunnelServer = getService();
-        CompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
+        NonCompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
         return voidCompletableFuture;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 8fceed6c9..1ff9e4b76 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.scheduler;
+package org.apache.seatunnel.engine.server.resourcemanager;
 
-public interface JobScheduler {
-    void startScheduling();
+import com.hazelcast.cluster.Address;
+import lombok.NonNull;
 
-    boolean updateExecutionState();
+public interface ResourceManager {
+    Address applyForResource(Long jobId, Long taskId);
+
+    @NonNull
+    Address getAppliedResource(Long jobId, Long taskId);
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
new file mode 100644
index 000000000..209f3fb69
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.exception.JobException;
+
+import com.hazelcast.cluster.Address;
+import lombok.Data;
+import lombok.NonNull;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Placeholder {@link ResourceManager}: remembers one {@link Address} per (jobId, taskId) pair. */
+@Data
+public class SimpleResourceManager implements ResourceManager {
+
+    // TODO An Address alone is too coarse; a more detailed resource definition may be needed.
+    private Map<Long, Map<Long, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
+
+    /** Returns the address applied for the task, assigning localhost:5801 on the first request. */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public Address applyForResource(@NonNull Long jobId, @NonNull Long taskId) {
+        Map<Long, Address> jobAddressMap = physicalVertexIdAndResourceMap.computeIfAbsent(jobId, id -> new HashMap<>());
+        Address applied = jobAddressMap.get(taskId);
+        if (null == applied) {
+            try {
+                applied = new Address("localhost", 5801);
+            } catch (UnknownHostException e) {
+                throw new RuntimeException(e);
+            }
+            jobAddressMap.put(taskId, applied);
+        }
+        return applied;
+    }
+
+    /**
+     * Looks up the address previously applied for the given task.
+     *
+     * @throws JobException if nothing was applied for this job/task pair
+     */
+    @Override
+    @NonNull
+    public Address getAppliedResource(@NonNull Long jobId, @NonNull Long taskId) {
+        Map<Long, Address> jobAddressMap = physicalVertexIdAndResourceMap.get(jobId);
+        Address applied = null == jobAddressMap ? null : jobAddressMap.get(taskId);
+        if (null == applied) {
+            // Also covers a known job that lacks this task, which used to return null.
+            throw new JobException(
+                String.format("Job %s, Task %s can not found applied resource.", jobId, taskId));
+        }
+        return applied;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index 8fceed6c9..bbe003e39 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -19,6 +19,4 @@ package org.apache.seatunnel.engine.server.scheduler;
 
 public interface JobScheduler {
     void startScheduling();
-
-    boolean updateExecutionState();
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
new file mode 100644
index 000000000..5d608a62a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.scheduler;
+
+import org.apache.seatunnel.engine.common.exception.JobNoEnoughResourceException;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** Schedules a {@link PhysicalPlan} pipeline by pipeline: apply resources, then deploy asynchronously. */
+public class PipelineBaseScheduler implements JobScheduler {
+    private static final ILogger LOGGER = Logger.getLogger(PipelineBaseScheduler.class);
+    private final PhysicalPlan physicalPlan;
+    private final JobMaster jobMaster;
+    private final ResourceManager resourceManager;
+
+    public PipelineBaseScheduler(@NonNull PhysicalPlan physicalPlan, @NonNull JobMaster jobMaster) {
+        this.physicalPlan = physicalPlan;
+        this.jobMaster = jobMaster;
+        this.resourceManager = jobMaster.getResourceManager();
+    }
+
+    /** Turns the plan to running, then applies resources for and deploys each pipeline in list order. */
+    @Override
+    public void startScheduling() {
+        physicalPlan.turnToRunning();
+        physicalPlan.getPipelineList().forEach(pipeline -> {
+            pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED);
+            if (applyResourceForPipeline(pipeline)) {
+                deployPipeline(pipeline);
+            } else {
+                pipeline.failedWithNoEnoughResource();
+            }
+        });
+    }
+
+    /** Applies a resource for every vertex; returns false when JobNoEnoughResourceException is raised. */
+    private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
+        try {
+            // apply resource for coordinators
+            subPlan.getCoordinatorVertexList().forEach(coordinator -> {
+                // TODO If there are not enough resources for tasks, we need to add some wait profile
+                coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
+                    coordinator.getTaskGroup().getId());
+            });
+
+            // apply resource for other tasks
+            subPlan.getPhysicalVertexList().forEach(task -> {
+                task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
+                    task.getTaskGroup().getId());
+            });
+        } catch (JobNoEnoughResourceException e) {
+            LOGGER.severe(e);
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Deploys every coordinator and task of the pipeline asynchronously. The pipeline is marked
+     * RUNNING only when all deployments finished without an exception; otherwise the failure is
+     * logged and the DEPLOYING to RUNNING transition is skipped.
+     */
+    private void deployPipeline(@NonNull SubPlan pipeline) {
+        pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING);
+        // deploy is a time-consuming operation, so we do it async
+        List<CompletableFuture<Void>> deployFutures = pipeline.getCoordinatorVertexList().stream()
+            .filter(coordinator -> coordinator.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING))
+            .map(coordinator -> CompletableFuture.runAsync(() -> coordinator.deploy(
+                resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
+                    coordinator.getTaskGroup().getId()))))
+            .collect(Collectors.toList());
+
+        List<CompletableFuture<Void>> deployTaskFutures = pipeline.getPhysicalVertexList().stream()
+            .filter(task -> task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING))
+            .map(task -> CompletableFuture.runAsync(() -> task.deploy(
+                resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
+                    task.getTaskGroup().getId()))))
+            .collect(Collectors.toList());
+
+        deployFutures.addAll(deployTaskFutures);
+        CompletableFuture.allOf(deployFutures.toArray(new CompletableFuture[0]))
+            .whenComplete((v, t) -> {
+                if (t != null) {
+                    // A deployment threw: do not report the pipeline as RUNNING.
+                    LOGGER.severe("Deploy pipeline failed, pipeline will not be marked RUNNING", t);
+                    return;
+                }
+                pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING);
+            });
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 3153413d4..07db59e4e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 
@@ -36,6 +37,7 @@ import com.hazelcast.spi.annotation.PrivateApi;
 public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int PRINT_MESSAGE_OPERATOR = 0;
     public static final int SUBMIT_OPERATOR = 1;
+    public static final int DEPLOY_TASK_OPERATOR = 2;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -61,6 +63,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
                     return new PrintMessageOperation();
                 case SUBMIT_OPERATOR:
                     return new SubmitJobOperation();
+                case DEPLOY_TASK_OPERATOR:
+                    return new DeployTaskOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index a59721b78..2e09b137b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
-import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import org.apache.seatunnel.engine.server.task.operation.AssignSplitOperation;
 import org.apache.seatunnel.engine.server.task.operation.RegisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.RequestSplitOperation;
@@ -64,7 +64,7 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                 case ASSIGN_SPLIT_TYPE:
                     return new AssignSplitOperation();
                 case TASK_GROUP_INFO_TYPE:
-                    return new TaskGroupInfo();
+                    return new TaskGroupImmutableInformation();
                 default:
                     return null;
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
similarity index 84%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
index e05ddcf9b..cc1f07852 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
@@ -24,31 +24,23 @@ import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.Set;
 
-public class TaskGroupInfo implements IdentifiedDataSerializable {
+@lombok.Data
+@AllArgsConstructor
+public class TaskGroupImmutableInformation implements IdentifiedDataSerializable {
+    // Each deployment generates a new executionId
+    private long executionId;
 
     private Data group;
 
     private Set<URL> jars;
 
-    public TaskGroupInfo() {
-    }
-
-    public TaskGroupInfo(Data group, Set<URL> jars) {
-        this.group = group;
-        this.jars = jars;
-    }
-
-    public Data getGroup() {
-        return group;
-    }
-
-    public Set<URL> getJars() {
-        return jars;
+    public TaskGroupImmutableInformation(){
     }
 
     @Override
@@ -63,12 +55,14 @@ public class TaskGroupInfo implements IdentifiedDataSerializable {
 
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeLong(executionId);
         out.writeObject(jars);
         IOUtil.writeData(out, group);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
+        executionId = in.readLong();
         jars = in.readObject();
         group = IOUtil.readData(in);
     }