Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/09 10:50:05 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine][PhysicalPlan] Update PhysicalPlan to support scheduler by pipeline (#2382)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 246c64811 [ST-Engine][PhysicalPlan] Update PhysicalPlan to support scheduler by pipeline (#2382)
246c64811 is described below

commit 246c648113fc8428fda8a950d0a135f2397ad7d3
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Aug 9 18:49:58 2022 +0800

    [ST-Engine][PhysicalPlan] Update PhysicalPlan to support scheduler by pipeline (#2382)
    
    * Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
    
    * add source file to licenserc.yaml
    
    * fix checkstyle
    
    * fix error
    
    * tmp
    
    * tmp
    
    * Update PhysicalPlan to support scheduler by pipeline
    
    * remove init in run
    
    * fix string format error
---
 LICENSE                                            |  27 ++-
 .../src/main/bin/start-seatunnel-seatunnel.sh      |  64 ++++++
 .../seatunnel/engine/client/JobConfigParser.java   |  19 +-
 .../engine/client/SeaTunnelClientTest.java         |   2 +-
 .../engine/common/config/EngineConfig.java         |   8 +
 .../config/YamlSeaTunnelDomConfigProcessor.java    |   5 +
 ...ionSeaTunnel.java => JobCanceledException.java} |   9 +-
 ...SeaTunnel.java => JobDefineCheckException.java} |   6 +-
 ...ckExceptionSeaTunnel.java => JobException.java} |  10 +-
 ...ptionSeaTunnel.java => JobFailedException.java} |   9 +-
 ...ionSeaTunnel.java => JobNotFoundException.java} |   8 +-
 .../engine/common/utils/ExceptionUtil.java         |   8 +-
 .../engine/common/utils/NonCompletableFuture.java  |  73 ++++++
 .../seatunnel/engine/core/job/JobStatus.java       | 103 +++++++++
 .../seatunnel/engine/core/job/PipelineState.java   |  79 +++++++
 .../seatunnel/engine/core/job/StatusUpdate.java}   |  18 +-
 .../seatunnel/engine/server/NodeExtension.java     |   7 +-
 .../engine/server/SeaTunnelNodeContext.java        |  13 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  33 ++-
 .../engine/server/SeaTunnelServerStarter.java      |  12 +-
 .../engine/server/dag/execution/ExecutionPlan.java |  13 +-
 .../dag/execution/ExecutionPlanGenerator.java      |  36 ++-
 .../engine/server/dag/physical/PhysicalPlan.java   | 132 +++++++++--
 .../server/dag/physical/PhysicalPlanGenerator.java | 247 +++++++++++++++------
 .../server/dag/physical/PhysicalPlanUtils.java     |  21 +-
 .../engine/server/dag/physical/PhysicalVertex.java | 137 ++++++++++++
 .../engine/server/dag/physical/SubPlan.java        | 137 ++++++++++++
 .../engine/server/execution/ExecutionState.java    |  79 +++++++
 .../{TaskGroup.java => TaskExecutionState.java}    |  32 ++-
 .../engine/server/execution/TaskGroup.java         |   9 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  70 +++---
 .../JobScheduler.java}                             |  17 +-
 32 files changed, 1209 insertions(+), 234 deletions(-)

diff --git a/LICENSE b/LICENSE
index 696821eed..eb58900c3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -210,17 +210,20 @@ The text of each license is the standard Apache 2.0 license.
 tools/dependencies/checkLicense.sh files from https://github.com/apache/skywalking
 mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java from  https://github.com/apache/flink
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java    from  https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java      from  https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java              from  https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java        from  https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java     from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java     from https://github.com/lightbend/config
-generate_client_protocol.sh                                                                                                      from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java               from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java               from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java               from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java               from https://github.com/hazelcast/hazelcast
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java               from  https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java                 from  https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java                         from  https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java                   from  https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java                from https://github.com/lightbend/config
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java                from https://github.com/lightbend/config
+generate_client_protocol.sh                                                                                                                 from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java                          from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java   from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java                     from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java                        from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java               from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java               from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java            from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java                                    from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java                     from https://github.com/apache/flink
+
 
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
new file mode 100755
index 000000000..1cfc741d8
--- /dev/null
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -eu
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ] ; do
+  # shellcheck disable=SC2006
+  ls=`ls -ld "$PRG"`
+  # shellcheck disable=SC2006
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    # shellcheck disable=SC2006
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRG_DIR=`dirname "$PRG"`
+APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+    . "${CONF_DIR}/seatunnel-env.sh"
+fi
+
+if [ $# == 0 ]
+then
+    args="-h"
+else
+    args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+    # print usage
+    echo "${CMD}"
+    exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+    echo "Execute SeaTunnel Job: ${CMD}"
+    eval ${CMD}
+else
+    echo "${CMD}"
+    exit ${EXIT_CODE}
+fi
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index cbd014db7..58b2ee3ad 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -73,7 +73,8 @@ public class JobConfigParser {
 
     private JobConfig jobConfig;
 
-    protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator, @NonNull JobConfig jobConfig) {
+    protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator,
+                              @NonNull JobConfig jobConfig) {
         this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
         this.jobConfig = jobConfig;
@@ -87,7 +88,7 @@ public class JobConfigParser {
         List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
 
         if (CollectionUtils.isEmpty(sinkConfigs) || CollectionUtils.isEmpty(sourceConfigs)) {
-            throw new JobDefineCheckExceptionSeaTunnel("Source And Sink can not be null");
+            throw new JobDefineCheckException("Source And Sink can not be null");
         }
 
         jobConfigAnalyze(envConfigs);
@@ -129,7 +130,7 @@ public class JobConfigParser {
 
             actions.add(sinkAction);
             if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
-                throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+                throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
                     + " must be set in the sink plugin config when the job have complex dependencies");
             }
             String sourceTableName = config.getString(Plugin.SOURCE_TABLE_NAME);
@@ -137,7 +138,7 @@ public class JobConfigParser {
             if (CollectionUtils.isEmpty(transformConfigList)) {
                 sourceAnalyze(sourceTableName, sinkAction);
             } else if (transformConfigList.size() > 1) {
-                throw new JobDefineCheckExceptionSeaTunnel("Only UnionTransform can have more than one upstream, "
+                throw new JobDefineCheckException("Only UnionTransform can have more than one upstream, "
                     + sinkAction.getName()
                     + " is not UnionTransform Connector");
             } else {
@@ -150,7 +151,7 @@ public class JobConfigParser {
     private void sourceAnalyze(String sourceTableName, Action action) {
         List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
         if (CollectionUtils.isEmpty(sourceConfigList)) {
-            throw new JobDefineCheckExceptionSeaTunnel(action.getName()
+            throw new JobDefineCheckException(action.getName()
                 + " source table name [" + sourceTableName + "] can not be found");
         }
 
@@ -203,7 +204,7 @@ public class JobConfigParser {
     private void initRelationMap(List<? extends Config> sourceConfigs, List<? extends Config> transformConfigs) {
         for (Config config : sourceConfigs) {
             if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
-                throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+                throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
                     + " must be set in the source plugin config when the job have complex dependencies");
             }
             String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
@@ -215,12 +216,12 @@ public class JobConfigParser {
 
         for (Config config : transformConfigs) {
             if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
-                throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+                throw new JobDefineCheckException(Plugin.RESULT_TABLE_NAME
                     + " must be set in the transform plugin config when the job have complex dependencies");
             }
 
             if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
-                throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+                throw new JobDefineCheckException(Plugin.SOURCE_TABLE_NAME
                     + " must be set in the transform plugin config when the job have complex dependencies");
             }
             String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 2c451034c..ee9e2533a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -44,7 +44,7 @@ public class SeaTunnelClientTest {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
             Thread.currentThread().getName(),
-            new SeaTunnelNodeContext());
+            new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
     }
 
     @Test
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index e62481cf8..12f562c90 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -25,9 +25,17 @@ import lombok.Data;
 public class EngineConfig {
     private int backupCount;
 
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private int serverExecutorPoolSize = 20;
+
     public EngineConfig setBackupCount(int newBackupCount) {
         checkBackupCount(newBackupCount, 0);
         this.backupCount = newBackupCount;
         return this;
     }
+
+    public EngineConfig setServerExecutorPoolSize(int serverExecutorPoolSize) {
+        this.serverExecutorPoolSize = serverExecutorPoolSize;
+        return this;
+    }
 }
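
For reference, a minimal sketch (not part of this commit) of how the new pool-size option could be set programmatically, assuming EngineConfig keeps its implicit no-arg constructor; the matching YAML key handled further down is server-executor-pool-size.

    import org.apache.seatunnel.engine.common.config.EngineConfig;

    public class EngineConfigSketch {
        public static void main(String[] args) {
            // Both setters above return `this`, so they can be chained.
            EngineConfig engineConfig = new EngineConfig()
                .setBackupCount(1)
                .setServerExecutorPoolSize(32); // defaults to 20 when not configured
            System.out.println(engineConfig);   // lombok @Data supplies toString()
        }
    }
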
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5531db478..5d75a540a 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -71,6 +71,11 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor
                         getIntegerValue("backup-count", getTextContent(node))
                     );
                     break;
+                case "server-executor-pool-size":
+                    engineConfig.setServerExecutorPoolSize(
+                        getIntegerValue("server-executor-pool-size", getTextContent(node))
+                    );
+                    break;
                 default:
                     throw new AssertionError("Unrecognized element: " + name);
             }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobCanceledException.java
similarity index 76%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobCanceledException.java
index 2d0e10fe6..0d4daff1c 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobCanceledException.java
@@ -17,13 +17,16 @@
 
 package org.apache.seatunnel.engine.common.exception;
 
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobCanceledException extends SeaTunnelEngineException {
+    public JobCanceledException(long jobId) {
+        super("Job with id " + jobId + " canceled");
+    }
 
-    public JobDefineCheckExceptionSeaTunnel(String message) {
+    public JobCanceledException(String message) {
         super(message);
     }
 
-    public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+    public JobCanceledException(String message, Throwable cause) {
         super(message, cause);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckException.java
similarity index 80%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckException.java
index 2d0e10fe6..d4c758367 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckException.java
@@ -17,13 +17,13 @@
 
 package org.apache.seatunnel.engine.common.exception;
 
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobDefineCheckException extends SeaTunnelEngineException {
 
-    public JobDefineCheckExceptionSeaTunnel(String message) {
+    public JobDefineCheckException(String message) {
         super(message);
     }
 
-    public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+    public JobDefineCheckException(String message, Throwable cause) {
         super(message, cause);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobException.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobException.java
index 2d0e10fe6..ba872a63b 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobException.java
@@ -17,13 +17,17 @@
 
 package org.apache.seatunnel.engine.common.exception;
 
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobException extends SeaTunnelEngineException {
 
-    public JobDefineCheckExceptionSeaTunnel(String message) {
+    public JobException(String message) {
         super(message);
     }
 
-    public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+    public JobException(String message, Throwable cause) {
         super(message, cause);
     }
+
+    public JobException(long jobId, String message, Throwable cause) {
+        super("Job with id [" + jobId + "] Exception " + message, cause);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobFailedException.java
similarity index 76%
rename from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobFailedException.java
index 2d0e10fe6..dd4bc2323 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobFailedException.java
@@ -17,13 +17,16 @@
 
 package org.apache.seatunnel.engine.common.exception;
 
-public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
+public class JobFailedException extends SeaTunnelEngineException {
+    public JobFailedException(long jobId) {
+        super("Job with id " + jobId + " failed");
+    }
 
-    public JobDefineCheckExceptionSeaTunnel(String message) {
+    public JobFailedException(String message) {
         super(message);
     }
 
-    public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+    public JobFailedException(String message, Throwable cause) {
         super(message, cause);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundException.java
similarity index 78%
rename from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundException.java
index 1b4158886..fccf3c0bd 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundException.java
@@ -17,16 +17,16 @@
 
 package org.apache.seatunnel.engine.common.exception;
 
-public class JobNotFoundExceptionSeaTunnel extends SeaTunnelEngineException {
-    public JobNotFoundExceptionSeaTunnel(long jobId) {
+public class JobNotFoundException extends SeaTunnelEngineException {
+    public JobNotFoundException(long jobId) {
         super("Job with id " + jobId + " not found");
     }
 
-    public JobNotFoundExceptionSeaTunnel(String message) {
+    public JobNotFoundException(String message) {
         super(message);
     }
 
-    public JobNotFoundExceptionSeaTunnel(String message, Throwable cause) {
+    public JobNotFoundException(String message, Throwable cause) {
         super(message, cause);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index a3c211af5..8e8bf42ac 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -16,8 +16,8 @@
 
 package org.apache.seatunnel.engine.common.utils;
 
-import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
-import org.apache.seatunnel.engine.common.exception.JobNotFoundExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 
 import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
@@ -36,8 +36,8 @@ public final class ExceptionUtil {
 
     private static final List<ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory>> EXCEPTIONS = Arrays.asList(
         new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START, SeaTunnelEngineException.class, SeaTunnelEngineException::new),
-        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new),
-        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 2, JobDefineCheckExceptionSeaTunnel.class, JobDefineCheckExceptionSeaTunnel::new)
+        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundException.class, JobNotFoundException::new),
+        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 2, JobDefineCheckException.class, JobDefineCheckException::new)
     );
 
     private ExceptionUtil() {
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
new file mode 100644
index 000000000..a3c6d2380
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A future which prevents completion by an outside caller.
+ */
+public class NonCompletableFuture<T> extends CompletableFuture<T> {
+
+    public NonCompletableFuture() {
+    }
+
+    public NonCompletableFuture(CompletableFuture<T> chainedFuture) {
+        chainedFuture.whenComplete((r, t) -> {
+            if (t != null) {
+                internalCompleteExceptionally(t);
+            } else {
+                internalComplete(r);
+            }
+        });
+    }
+
+    @Override
+    public boolean completeExceptionally(Throwable ex) {
+        throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+    }
+
+    @Override
+    public boolean complete(T value) {
+        throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        throw new UnsupportedOperationException("This future can't be cancelled by an outside caller");
+    }
+
+    @Override
+    public void obtrudeException(Throwable ex) {
+        throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+    }
+
+    @Override
+    public void obtrudeValue(T value) {
+        throw new UnsupportedOperationException("This future can't be completed by an outside caller");
+    }
+
+    private void internalComplete(T value) {
+        super.complete(value);
+    }
+
+    private void internalCompleteExceptionally(Throwable ex) {
+        super.completeExceptionally(ex);
+    }
+
+}
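
A minimal usage sketch (not part of this commit): callers can observe a NonCompletableFuture but cannot complete or cancel it; only the wrapped future drives completion.

    import java.util.concurrent.CompletableFuture;
    import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;

    public class NonCompletableFutureSketch {
        public static void main(String[] args) {
            CompletableFuture<String> internal = new CompletableFuture<>();
            NonCompletableFuture<String> exposed = new NonCompletableFuture<>(internal);

            exposed.whenComplete((value, error) -> System.out.println("observer sees: " + value));

            // exposed.complete("nope");      // would throw UnsupportedOperationException
            // exposed.cancel(true);          // same: completion is owner-only
            internal.complete("finished");    // the owning side completes the chain
        }
    }
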
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
new file mode 100644
index 000000000..d0c9fd963
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+/** Possible states of a job once it has been accepted by the dispatcher. */
+public enum JobStatus {
+    /**
+     * The job has been received by the Dispatcher, and is waiting for the job manager to receive
+     * leadership and to be created.
+     */
+    INITIALIZING(EndState.NOT_END),
+
+    /** Job is newly created, no task has started to run. */
+    CREATED(EndState.NOT_END),
+
+    /** Some tasks are scheduled or running, some may be pending, some may be finished. */
+    RUNNING(EndState.NOT_END),
+
+    /** The job has failed and is currently waiting for the cleanup to complete. */
+    FAILING(EndState.NOT_END),
+
+    /** The job has failed with a non-recoverable task failure. */
+    FAILED(EndState.GLOBALLY),
+
+    /** Job is being cancelled. */
+    CANCELLING(EndState.NOT_END),
+
+    /** Job has been cancelled. */
+    CANCELED(EndState.GLOBALLY),
+
+    /** All of the job's tasks have successfully finished. */
+    FINISHED(EndState.GLOBALLY),
+
+    /** The job is currently undergoing a reset and total restart. */
+    RESTARTING(EndState.NOT_END),
+
+    /**
+     * The job has been suspended which means that it has been stopped but not been removed from a
+     * potential HA job store.
+     */
+    SUSPENDED(EndState.LOCALLY),
+
+    /** The job is currently reconciling and waits for task execution report to recover state. */
+    RECONCILING(EndState.NOT_END);
+
+    // --------------------------------------------------------------------------------------------
+
+    private enum EndState {
+        NOT_END,
+        LOCALLY,
+        GLOBALLY
+    }
+
+    private final EndState endState;
+
+    JobStatus(EndState endState) {
+        this.endState = endState;
+    }
+
+    /**
+     * Checks whether this state is <i>globally terminal</i>. A globally terminal job is complete
+     * and cannot fail any more and will not be restarted or recovered by another standby master
+     * node.
+     *
+     * <p>When a globally terminal state has been reached, all recovery data for the job is dropped
+     * from the high-availability services.
+     *
+     * @return True, if this job status is globally terminal, false otherwise.
+     */
+    public boolean isGloballyEndState() {
+        return endState == EndState.GLOBALLY;
+    }
+
+    /**
+     * Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the state of
+     * a job's execution graph within an executing JobManager. If the execution graph is locally
+     * terminal, the JobManager will not continue executing or recovering the job.
+     *
+     * <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
+     * which is typically entered when the executing JobManager loses its leader status.
+     *
+     * @return True, if this job status is terminal, false otherwise.
+     */
+    public boolean isEndState() {
+        return endState != EndState.NOT_END;
+    }
+}
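
A minimal sketch (not part of this commit) of the two terminal-state checks: SUSPENDED is the only state that is locally but not globally terminal.

    import org.apache.seatunnel.engine.core.job.JobStatus;

    public class JobStatusSketch {
        public static void main(String[] args) {
            JobStatus status = JobStatus.SUSPENDED;
            System.out.println(status.isEndState());         // true  -> locally terminal
            System.out.println(status.isGloballyEndState()); // false -> only FINISHED/CANCELED/FAILED are
        }
    }
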
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
new file mode 100644
index 000000000..df6c2a370
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+/**
+ * An enumeration of all states that a pipeline can be in during its execution. A pipeline usually starts in
+ * the state {@code CREATED} and switches states according to this diagram:
+ *
+ * <pre>{@code
+ *  CREATED  -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
+ *     |            |            |          |              |
+ *     |            |            |    +-----+--------------+
+ *     |            |            V    V
+ *     |            |         CANCELLING -----+----> CANCELED
+ *     |            |                         |
+ *     |            +-------------------------+
+ *     |
+ *     |                                   ... -> FAILED
+ *     V
+ * RECONCILING  -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
+ *
+ * }</pre>
+ *
+ * <p>It is possible to enter the {@code RECONCILING} state from the {@code CREATED} state if the job
+ * manager fails over, and the {@code RECONCILING} state can switch into any existing pipeline state.
+ *
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
+ *
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are considered terminal
+ * states.
+ */
+public enum PipelineState {
+    CREATED,
+
+    SCHEDULED,
+
+    DEPLOYING,
+
+    RUNNING,
+
+    /**
+     * This state marks "successfully completed". It can only be reached when a program reaches the
+     * "end of its input". The "end of input" can be reached when consuming a bounded input (fix set
+     * of files, bounded query, etc) or when stopping a program (not cancelling!) which make the
+     * input look like it reached its end at a specific point.
+     */
+    FINISHED,
+
+    CANCELING,
+
+    CANCELED,
+
+    FAILED,
+
+    RECONCILING,
+
+    /** Restoring last possible valid state of the pipeline if it has it. */
+    INITIALIZING;
+
+    public boolean isEnd() {
+        return this == FINISHED || this == CANCELED || this == FAILED;
+    }
+}
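
A minimal sketch (not part of this commit): only FINISHED, CANCELED and FAILED count as terminal pipeline states, so a pipeline in CANCELING is still considered in flight.

    import org.apache.seatunnel.engine.core.job.PipelineState;

    public class PipelineStateSketch {
        public static void main(String[] args) {
            PipelineState state = PipelineState.CANCELING;
            if (!state.isEnd()) {
                System.out.println(state + " is not terminal yet; CANCELED would be");
            }
        }
    }
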
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/StatusUpdate.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/StatusUpdate.java
index b72c62287..b7b23b1a6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/StatusUpdate.java
@@ -15,19 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.engine.core.job;
 
-import java.util.List;
-
-public class ExecutionPlan {
-
-    private final List<Pipeline> pipelines;
-
-    ExecutionPlan(List<Pipeline> pipelines) {
-        this.pipelines = pipelines;
-    }
-
-    public List<Pipeline> getPipelines() {
-        return pipelines;
-    }
+public enum StatusUpdate {
+    STOP,
+    CANCEL;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index 1bd5a83d4..311673271 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -17,18 +17,21 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+
 import com.hazelcast.cluster.ClusterState;
 import com.hazelcast.instance.impl.DefaultNodeExtension;
 import com.hazelcast.instance.impl.Node;
+import lombok.NonNull;
 
 import java.util.Map;
 
 public class NodeExtension extends DefaultNodeExtension {
     private final NodeExtensionCommon extCommon;
 
-    public NodeExtension(Node node) {
+    public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
         super(node);
-        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node));
+        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node, seaTunnelConfig));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 9c112591f..388b0fad9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -17,14 +17,23 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+
 import com.hazelcast.instance.impl.DefaultNodeContext;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.instance.impl.NodeExtension;
+import lombok.NonNull;
 
 public class SeaTunnelNodeContext extends DefaultNodeContext {
 
+    private final SeaTunnelConfig seaTunnelConfig;
+
+    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
+        this.seaTunnelConfig = seaTunnelConfig;
+    }
+
     @Override
-    public NodeExtension createNodeExtension(Node node) {
-        return new org.apache.seatunnel.engine.server.NodeExtension(node);
+    public NodeExtension createNodeExtension(@NonNull Node node) {
+        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 42c3cb157..24a36c355 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,7 +17,8 @@
 
 package org.apache.seatunnel.engine.server;
 
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.serialization.Data;
@@ -30,9 +31,12 @@ import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
+import lombok.NonNull;
 
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
     public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
@@ -43,13 +47,20 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     private TaskExecutionService taskExecutionService;
 
-    public SeaTunnelServer(Node node) {
+    private final ExecutorService executorService;
+
+    private final SeaTunnelConfig seaTunnelConfig;
+
+    public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
         this.logger = node.getLogger(getClass());
         this.liveOperationRegistry = new LiveOperationRegistry();
+        this.seaTunnelConfig = seaTunnelConfig;
+        this.executorService =
+            Executors.newFixedThreadPool(seaTunnelConfig.getEngineConfig().getServerExecutorPoolSize());
         logger.info("SeaTunnel server start...");
     }
 
-    public TaskExecutionService getTaskExecutionService(){
+    public TaskExecutionService getTaskExecutionService() {
         return this.taskExecutionService;
     }
 
@@ -103,21 +114,19 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
      */
     @SuppressWarnings("checkstyle:MagicNumber")
     public CompletableFuture<Void> submitJob(Data jobImmutableInformation) {
-        // TODO Here we need new a JobMaster and run it.
-        JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
-        logger.info("Job [" + jobInformation.getJobId() + "] submit");
-        logger.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
         CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
-        new Thread(() -> {
+        JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
+        executorService.submit(() -> {
             try {
-                Thread.sleep(2000);
-                logger.info("I am sleep 2000 ms");
-            } catch (InterruptedException e) {
+                jobMaster.init();
+            } catch (Throwable e) {
                 throw new RuntimeException(e);
             } finally {
+                // By convention, the submitJob future completes as soon as init has finished
                 voidCompletableFuture.complete(null);
             }
-        }).start();
+            jobMaster.run();
+        });
         return voidCompletableFuture;
     }
 }
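
A minimal sketch (not part of this commit) of the submit pattern used above: the returned future completes as soon as initialization finishes, while the job keeps running on the shared executor pool. initJob() and runJob() are hypothetical stand-ins for jobMaster.init() and jobMaster.run().

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SubmitPatternSketch {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(20);
            CompletableFuture<Void> submitted = new CompletableFuture<>();
            pool.submit(() -> {
                try {
                    initJob();                    // hypothetical stand-in for jobMaster.init()
                } finally {
                    submitted.complete(null);     // "submitted" only means "initialized"
                }
                runJob();                         // hypothetical stand-in for jobMaster.run()
            });
            submitted.join();                     // caller unblocks once init has finished
            pool.shutdown();
        }

        private static void initJob() { /* ... */ }

        private static void runJob() { /* ... */ }
    }
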
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 49db3f60c..0104c1e55 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -17,18 +17,16 @@
 
 package org.apache.seatunnel.engine.server;
 
-import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 
-import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 
 public class SeaTunnelServerStarter {
 
     public static void main(String[] args) {
-        Config config = new Config();
-        config.getSecurityConfig().setEnabled(false);
-        config.getJetConfig().setEnabled(false);
-        config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+            Thread.currentThread().getName(), new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
index b72c62287..26c93b69e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
@@ -17,17 +17,28 @@
 
 package org.apache.seatunnel.engine.server.dag.execution;
 
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+
+import lombok.NonNull;
+
 import java.util.List;
 
 public class ExecutionPlan {
 
     private final List<Pipeline> pipelines;
 
-    ExecutionPlan(List<Pipeline> pipelines) {
+    private final JobImmutableInformation jobImmutableInformation;
+
+    public ExecutionPlan(@NonNull List<Pipeline> pipelines, @NonNull JobImmutableInformation jobImmutableInformation) {
         this.pipelines = pipelines;
+        this.jobImmutableInformation = jobImmutableInformation;
     }
 
     public List<Pipeline> getPipelines() {
         return pipelines;
     }
+
+    public JobImmutableInformation getJobImmutableInformation() {
+        return jobImmutableInformation;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index af3a4c68c..2c16829b0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -29,6 +29,9 @@ import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+
+import lombok.NonNull;
 
 import java.net.URL;
 import java.util.ArrayList;
@@ -48,9 +51,17 @@ public class ExecutionPlanGenerator {
     private final Map<Integer, LogicalVertex> logicalVertexes;
     private final List<LogicalEdge> logicalEdges;
 
-    public ExecutionPlanGenerator(LogicalDag logicalPlan) {
+    private final JobImmutableInformation jobImmutableInformation;
+
+    private final long initializationTimestamp;
+
+    public ExecutionPlanGenerator(@NonNull LogicalDag logicalPlan,
+                                  @NonNull JobImmutableInformation jobImmutableInformation,
+                                  long initializationTimestamp) {
         this.logicalVertexes = new HashMap<>(logicalPlan.getLogicalVertexMap());
         this.logicalEdges = new ArrayList<>(logicalPlan.getEdges());
+        this.jobImmutableInformation = jobImmutableInformation;
+        this.initializationTimestamp = initializationTimestamp;
     }
 
     public ExecutionPlan generate() {
@@ -59,7 +70,7 @@ public class ExecutionPlanGenerator {
             throw new IllegalArgumentException("ExecutionPlan Builder must have LogicalPlan and Action");
         }
         List<LogicalVertex> next = logicalVertexes.values().stream().filter(a -> a.getAction() instanceof SourceAction)
-                .collect(Collectors.toList());
+            .collect(Collectors.toList());
         while (!next.isEmpty()) {
             List<LogicalVertex> newNext = new ArrayList<>();
             next.forEach(n -> {
@@ -71,12 +82,12 @@ public class ExecutionPlanGenerator {
 
         Map<Integer, LogicalVertex> vertexes = new HashMap<>();
         actions.forEach((key, value) -> vertexes.put(key, new LogicalVertex(key, value,
-                logicalVertexes.get(key).getParallelism())));
+            logicalVertexes.get(key).getParallelism())));
 
         return new ExecutionPlan(PipelineGenerator.generatePipelines(edgeMap.entrySet().stream()
-                .flatMap(e -> e.getValue().stream().map(d -> new ExecutionEdge(convertFromLogical(vertexes.get(e.getKey())),
-                        convertFromLogical(vertexes.get(d)))))
-                .collect(Collectors.toList())));
+            .flatMap(e -> e.getValue().stream().map(d -> new ExecutionEdge(convertFromLogical(vertexes.get(e.getKey())),
+                convertFromLogical(vertexes.get(d)))))
+            .collect(Collectors.toList())), jobImmutableInformation);
     }
 
     private ExecutionVertex convertFromLogical(LogicalVertex vertex) {
@@ -105,14 +116,14 @@ public class ExecutionPlanGenerator {
                 SourceAction<?, ?, ?> source = (SourceAction<?, ?, ?>) start;
                 jars.addAll(source.getJarUrls());
                 executionAction = new PhysicalSourceAction<>(start.getId(), start.getName(),
-                        source.getSource(), new ArrayList<>(jars), transforms);
+                    source.getSource(), new ArrayList<>(jars), transforms);
                 actions.put(start.getId(), start);
             } else if (start instanceof TransformAction) {
                 TransformAction transform = (TransformAction) start;
                 jars.addAll(transform.getJarUrls());
                 transforms.add(0, transform.getTransform());
                 executionAction = new TransformChainAction(start.getId(), start.getName(),
-                        new ArrayList<>(jars), transforms);
+                    new ArrayList<>(jars), transforms);
                 actions.put(start.getId(), executionAction);
             } else {
                 throw new UnknownActionException(start);
@@ -121,7 +132,8 @@ public class ExecutionPlanGenerator {
 
         final Action e = end;
         // find next should be converted action
-        List<LogicalVertex> nextStarts = logicalEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(e))
+        List<LogicalVertex> nextStarts =
+            logicalEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(e))
                 .map(LogicalEdge::getRightVertex).collect(Collectors.toList());
         for (LogicalVertex n : nextStarts) {
             if (!edgeMap.containsKey(executionAction.getId())) {
@@ -134,11 +146,11 @@ public class ExecutionPlanGenerator {
 
     private List<TransformAction> findMigrateTransform(List<LogicalEdge> executionEdges, Action start) {
         List<Action> actionAfterStart =
-                executionEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(start))
-                        .map(edge -> edge.getRightVertex().getAction()).collect(Collectors.toList());
+            executionEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(start))
+                .map(edge -> edge.getRightVertex().getAction()).collect(Collectors.toList());
         // make sure the start's next only have one LogicalTransform, so it can be migrated.
         if (actionAfterStart.size() != 1 || actionAfterStart.get(0) instanceof PartitionTransformAction ||
-                actionAfterStart.get(0) instanceof SinkAction) {
+            actionAfterStart.get(0) instanceof SinkAction) {
             return Collections.emptyList();
         } else {
             List<TransformAction> transforms = new ArrayList<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index af5a9d7c1..ecd684a3a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -17,38 +17,134 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
-import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineState;
 
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PhysicalPlan {
 
-    private final List<SubPlan> plans;
+    private static final ILogger LOGGER = Logger.getLogger(PhysicalPlan.class);
 
-    public PhysicalPlan(List<SubPlan> plans) {
-        this.plans = plans;
-    }
+    private final List<SubPlan> pipelineList;
+
+    private AtomicInteger finishedPipelineNum = new AtomicInteger(0);
+
+    private AtomicInteger canceledPipelineNum = new AtomicInteger(0);
+
+    private AtomicInteger failedPipelineNum = new AtomicInteger(0);
+
+    private AtomicReference<JobStatus> jobStatus = new AtomicReference<>();
+
+    private final JobImmutableInformation jobImmutableInformation;
+
+    /**
+     * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()}) when the
+     * execution graph transitioned into a certain state. The index into this array is the ordinal
+     * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
+     * stateTimestamps[RUNNING.ordinal()]}.
+     */
+    private final long[] stateTimestamps;
+
+    /**
+     * When the job status turns to an end state, this future is completed. Then the whenComplete method of
+     * waitForCompleteByPhysicalPlan in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} will be called.
+     */
+    private final CompletableFuture<JobStatus> jobEndFuture;
 
-    public List<SubPlan> getPlans() {
-        return plans;
-    }
 
-    public static class SubPlan {
-        private final List<TaskGroupInfo> tasks;
+    /**
+     * This future can only be completed by the {@link SubPlan} subPlanFuture.
+     * When subPlanFuture completes, this NonCompletableFuture's whenComplete method will be called.
+     */
+    private final NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan;
 
-        private final List<TaskGroupInfo> coordinatorTasks;
+    private final ExecutorService executorService;
 
-        public SubPlan(List<TaskGroupInfo> tasks, List<TaskGroupInfo> coordinatorTasks) {
-            this.tasks = tasks;
-            this.coordinatorTasks = coordinatorTasks;
+    public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
+                        @NonNull ExecutorService executorService,
+                        @NonNull JobImmutableInformation jobImmutableInformation,
+                        long initializationTimestamp,
+                        @NonNull NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan) {
+        this.executorService = executorService;
+        this.jobImmutableInformation = jobImmutableInformation;
+        stateTimestamps = new long[JobStatus.values().length];
+        this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+        this.jobStatus.set(JobStatus.CREATED);
+        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
+        this.jobEndFuture = new CompletableFuture<JobStatus>();
+        this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
+        this.pipelineList = pipelineList;
+
+        Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
+            x.whenComplete((v, t) -> {
+                if (PipelineState.CANCELED.equals(v)) {
+                    canceledPipelineNum.incrementAndGet();
+                } else if (PipelineState.FAILED.equals(v)) {
+                    failedPipelineNum.incrementAndGet();
+                }
+
+                if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
+                    if (failedPipelineNum.get() > 0) {
+                        jobStatus.set(JobStatus.FAILING);
+                    } else if (canceledPipelineNum.get() > 0) {
+                        jobStatus.set(JobStatus.CANCELED);
+                    } else {
+                        jobStatus.set(JobStatus.FINISHED);
+                    }
+                    jobEndFuture.complete(jobStatus.get());
+                }
+            });
+        });
+    }
+
+    public List<SubPlan> getPipelineList() {
+        return pipelineList;
+    }
+
+    public void turnToRunning() {
+        if (!updateJobState(JobStatus.CREATED, JobStatus.RUNNING)) {
+            throw new IllegalStateException(
+                "Job may only be scheduled from state " + JobStatus.CREATED);
         }
+    }
 
-        public List<TaskGroupInfo> getTasks() {
-            return tasks;
+    public boolean updateJobState(JobStatus current, JobStatus targetState) {
+        // consistency check
+        if (current.isEndState()) {
+            String message = "Job is trying to leave terminal state " + current;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
         }
 
-        public List<TaskGroupInfo> getCoordinatorTasks() {
-            return coordinatorTasks;
+        // now do the actual state transition
+        if (jobStatus.get() == current) {
+            jobStatus.set(targetState);
+            LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
+                jobImmutableInformation.getJobConfig().getName(),
+                jobImmutableInformation.getJobId(),
+                current,
+                targetState));
+
+            stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+            return true;
+        } else {
+            return false;
         }
     }
+
+    public CompletableFuture<JobStatus> getJobEndCompletableFuture() {
+        return this.jobEndFuture;
+    }
 }
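
The new PhysicalPlan derives the terminal job status by counting how its pipelines end: any failed pipeline turns the job to FAILING, otherwise any canceled pipeline turns it to CANCELED, otherwise FINISHED. A minimal standalone sketch of that aggregation pattern using plain CompletableFuture; all class and enum names below are illustrative, not SeaTunnel classes:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class JobStatusAggregationSketch {
    enum PipelineEnd { FINISHED, CANCELED, FAILED }

    public static void main(String[] args) {
        List<CompletableFuture<PipelineEnd>> pipelines = Arrays.asList(
            new CompletableFuture<PipelineEnd>(),
            new CompletableFuture<PipelineEnd>(),
            new CompletableFuture<PipelineEnd>());

        AtomicInteger finished = new AtomicInteger();
        AtomicInteger canceled = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        CompletableFuture<String> jobEnd = new CompletableFuture<>();

        // Every pipeline future bumps the counters; whoever increments the
        // "finished" counter to the total decides the overall job status.
        pipelines.forEach(p -> p.whenComplete((state, error) -> {
            if (state == PipelineEnd.CANCELED) {
                canceled.incrementAndGet();
            } else if (state == PipelineEnd.FAILED) {
                failed.incrementAndGet();
            }
            if (finished.incrementAndGet() == pipelines.size()) {
                jobEnd.complete(failed.get() > 0 ? "FAILING"
                    : canceled.get() > 0 ? "CANCELED" : "FINISHED");
            }
        }));

        pipelines.get(0).complete(PipelineEnd.FINISHED);
        pipelines.get(1).complete(PipelineEnd.FINISHED);
        pipelines.get(2).complete(PipelineEnd.FAILED);
        System.out.println(jobEnd.join()); // prints FAILING
    }
}

As in the diff above, the last pipeline future to complete is the one that completes the job-end future, which is why plain AtomicIntegers are enough here.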
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 1bb0652c1..68da4e796 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -18,11 +18,14 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
 import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.internal.IntermediateDataQueue;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.PipelineState;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
 import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
@@ -30,21 +33,29 @@ import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
 import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
 import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
 import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
-import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
 
+import com.google.common.collect.Lists;
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.spi.impl.NodeEngine;
+import lombok.NonNull;
 
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class PhysicalPlanGenerator {
 
@@ -54,97 +65,208 @@ public class PhysicalPlanGenerator {
 
     private final IdGenerator idGenerator = new IdGenerator();
 
-    public PhysicalPlanGenerator(ExecutionPlan executionPlan, NodeEngine nodeEngine) {
+    private final JobImmutableInformation jobImmutableInformation;
+
+    private final long initializationTimestamp;
+
+    private final ExecutorService executorService;
+
+    private final FlakeIdGenerator flakeIdGenerator;
+
+    public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan,
+                                 @NonNull NodeEngine nodeEngine,
+                                 @NonNull JobImmutableInformation jobImmutableInformation,
+                                 long initializationTimestamp,
+                                 @NonNull ExecutorService executorService,
+                                 @NonNull FlakeIdGenerator flakeIdGenerator) {
         edgesList = executionPlan.getPipelines().stream().map(Pipeline::getEdges).collect(Collectors.toList());
         this.nodeEngine = nodeEngine;
+        this.jobImmutableInformation = jobImmutableInformation;
+        this.initializationTimestamp = initializationTimestamp;
+        this.executorService = executorService;
+        this.flakeIdGenerator = flakeIdGenerator;
     }
 
     public PhysicalPlan generate() {
 
         // TODO Determine which tasks do not need to be restored according to state
-        return new PhysicalPlan(edgesList.stream().map(edges -> {
+        AtomicInteger index = new AtomicInteger(-1);
+        CopyOnWriteArrayList<NonCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
+            new CopyOnWriteArrayList<>();
+
+        Stream<SubPlan> subPlanStream = edgesList.stream().map(edges -> {
+            int currIndex = index.incrementAndGet();
+            CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
+                new CopyOnWriteArrayList<>();
             List<PhysicalSourceAction<?, ?, ?>> sources = findSourceAction(edges);
 
-            List<TaskGroupInfo> coordinatorTasks = getEnumeratorTask(sources);
+            List<PhysicalVertex> coordinatorVertexList =
+                getEnumeratorTask(sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
 
-            List<TaskGroupInfo> tasks = getSourceTask(edges, sources);
+            List<PhysicalVertex> physicalVertexList =
+                getSourceTask(edges, sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
 
-            tasks.addAll(getPartitionTask(edges));
+            physicalVertexList.addAll(
+                getPartitionTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
 
-            coordinatorTasks.addAll(getCommitterTask(edges));
+            coordinatorVertexList.addAll(
+                getCommitterTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
 
-            return new PhysicalPlan.SubPlan(tasks, coordinatorTasks);
-        }).collect(Collectors.toList()));
+            CompletableFuture<PipelineState> pipelineFuture = new CompletableFuture<>();
+            waitForCompleteBySubPlanList.add(new NonCompletableFuture<>(pipelineFuture));
+
+            return new SubPlan(currIndex,
+                edgesList.size(),
+                initializationTimestamp,
+                physicalVertexList,
+                coordinatorVertexList,
+                pipelineFuture,
+                waitForCompleteByPhysicalVertexList.toArray(
+                    new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]));
+        });
+
+        PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
+            executorService,
+            jobImmutableInformation,
+            initializationTimestamp,
+            waitForCompleteBySubPlanList.toArray(new NonCompletableFuture[waitForCompleteBySubPlanList.size()]));
+        return physicalPlan;
     }
 
     private List<PhysicalSourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
         return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PhysicalSourceAction)
-                .map(s -> (PhysicalSourceAction<?, ?, ?>) s.getLeftVertex().getAction())
-                .collect(Collectors.toList());
+            .map(s -> (PhysicalSourceAction<?, ?, ?>) s.getLeftVertex().getAction())
+            .collect(Collectors.toList());
     }
 
-    private List<TaskGroupInfo> getCommitterTask(List<ExecutionEdge> edges) {
-        return edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
-                .map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
-                .map(s -> {
-                    SinkAggregatedCommitterTask t =
-                            new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
-                    return new TaskGroupInfo(toData(new TaskGroup(t)), t.getJarsUrl());
-                }).collect(Collectors.toList());
+    private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges,
+                                                  int pipelineIndex,
+                                                  int totalPipelineNum,
+                                                  CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+        AtomicInteger atomicInteger = new AtomicInteger(-1);
+        List<ExecutionEdge> collect = edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
+            .collect(Collectors.toList());
+
+        return collect.stream().map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
+            .map(s -> {
+                SinkAggregatedCommitterTask t =
+                    new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
+
+                CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+                waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+                return new PhysicalVertex(atomicInteger.incrementAndGet(),
+                    executorService,
+                    collect.size(),
+                    new TaskGroup("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
+                    taskFuture,
+                    flakeIdGenerator,
+                    pipelineIndex,
+                    totalPipelineNum,
+                    null);
+            }).collect(Collectors.toList());
     }
 
-    private List<TaskGroupInfo> getPartitionTask(List<ExecutionEdge> edges) {
+    private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
+                                                  int pipelineIndex,
+                                                  int totalPipelineNum,
+                                                  CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
         return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PartitionTransformAction)
-                .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
-                .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
-                .flatMap(flow -> {
-                    List<TaskGroupInfo> t = new ArrayList<>();
-                    for (int i = 0; i < flow.getAction().getParallelism(); i++) {
-                        SeaTunnelTask seaTunnelTask = new SeaTunnelTask(idGenerator.getNextId(), flow);
-                        t.add(new TaskGroupInfo(toData(new TaskGroup(seaTunnelTask)),
-                                seaTunnelTask.getJarsUrl()));
-                    }
-                    return t.stream();
-                }).collect(Collectors.toList());
+            .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
+            .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
+            .flatMap(flow -> {
+                List<PhysicalVertex> t = new ArrayList<>();
+                for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+                    SeaTunnelTask seaTunnelTask = new SeaTunnelTask(idGenerator.getNextId(), flow);
+
+                    CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+                    waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+                    t.add(new PhysicalVertex(i,
+                        executorService,
+                        flow.getAction().getParallelism(),
+                        new TaskGroup("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
+                        taskFuture,
+                        flakeIdGenerator,
+                        pipelineIndex,
+                        totalPipelineNum,
+                        seaTunnelTask.getJarsUrl()));
+                }
+                return t.stream();
+            }).collect(Collectors.toList());
     }
 
-    private List<TaskGroupInfo> getEnumeratorTask(List<PhysicalSourceAction<?, ?, ?>> sources) {
+    private List<PhysicalVertex> getEnumeratorTask(List<PhysicalSourceAction<?, ?, ?>> sources,
+                                                   int pipelineIndex,
+                                                   int totalPipelineNum,
+                                                   CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+        AtomicInteger atomicInteger = new AtomicInteger(-1);
+
         return sources.stream().map(s -> {
             SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(idGenerator.getNextId(), s);
-            return new TaskGroupInfo(toData(new TaskGroup(t)), t.getJarsUrl());
+            CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+            waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+            return new PhysicalVertex(atomicInteger.incrementAndGet(),
+                executorService,
+                sources.size(),
+                new TaskGroup(s.getName(), Lists.newArrayList(t)),
+                taskFuture,
+                flakeIdGenerator,
+                pipelineIndex,
+                totalPipelineNum,
+                t.getJarsUrl());
         }).collect(Collectors.toList());
     }
 
-    private List<TaskGroupInfo> getSourceTask(List<ExecutionEdge> edges,
-                                              List<PhysicalSourceAction<?, ?, ?>> sources) {
+    private List<PhysicalVertex> getSourceTask(List<ExecutionEdge> edges,
+                                               List<PhysicalSourceAction<?, ?, ?>> sources,
+                                               int pipelineIndex,
+                                               int totalPipelineNum,
+                                               CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
         return sources.stream()
-                .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
-                .flatMap(flow -> {
-                    List<TaskGroupInfo> t = new ArrayList<>();
-                    List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
-                    if (sourceWithSink(flow)) {
-                        flows.addAll(splitSinkFromFlow(flow));
-                    }
-                    for (int i = 0; i < flow.getAction().getParallelism(); i++) {
-                        List<SeaTunnelTask> taskList =
-                                flows.stream().map(f -> new SeaTunnelTask(idGenerator.getNextId(), f)).collect(Collectors.toList());
-                        Set<URL> jars =
-                                taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
-                        t.add(new TaskGroupInfo(toData(new TaskGroup(taskList.stream().map(task -> (Task) task).collect(Collectors.toList()))), jars));
-                    }
-                    return t.stream();
-                }).collect(Collectors.toList());
+            .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
+            .flatMap(flow -> {
+                List<PhysicalVertex> t = new ArrayList<>();
+                List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
+                if (sourceWithSink(flow)) {
+                    flows.addAll(splitSinkFromFlow(flow));
+                }
+                for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+                    List<SeaTunnelTask> taskList =
+                        flows.stream().map(f -> new SeaTunnelTask(idGenerator.getNextId(), f))
+                            .collect(Collectors.toList());
+                    Set<URL> jars =
+                        taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
+
+                    CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
+                    waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+
+                    // TODO We need to give every task an appropriate name
+                    t.add(new PhysicalVertex(i,
+                        executorService,
+                        flow.getAction().getParallelism(),
+                        new TaskGroup("SourceTask",
+                            taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
+                        taskFuture,
+                        flakeIdGenerator,
+                        pipelineIndex,
+                        totalPipelineNum,
+                        jars));
+                }
+                return t.stream();
+            }).collect(Collectors.toList());
     }
 
     private static List<Flow> splitSinkFromFlow(Flow flow) {
         List<PhysicalExecutionFlow> sinkFlows =
-                flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow) f)
-                        .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
+            flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow) f)
+                .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
         List<Flow> allFlows = new ArrayList<>();
         flow.getNext().removeAll(sinkFlows);
         sinkFlows.forEach(s -> {
             IntermediateDataQueue queue = new IntermediateDataQueue(s.getAction().getId(),
-                    s.getAction().getName() + "-Queue", s.getAction().getParallelism());
+                s.getAction().getName() + "-Queue", s.getAction().getParallelism());
             IntermediateExecutionFlow intermediateFlow = new IntermediateExecutionFlow(queue);
             flow.getNext().add(intermediateFlow);
             IntermediateExecutionFlow intermediateFlowQuote = new IntermediateExecutionFlow(queue);
@@ -153,26 +275,27 @@ public class PhysicalPlanGenerator {
         });
 
         if (flow.getNext().size() > sinkFlows.size()) {
-            allFlows.addAll(flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream()).collect(Collectors.toList()));
+            allFlows.addAll(
+                flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream()).collect(Collectors.toList()));
         }
         return allFlows;
     }
 
     private static boolean sourceWithSink(PhysicalExecutionFlow flow) {
         return flow.getAction() instanceof SinkAction ||
-                flow.getNext().stream().map(f -> (PhysicalExecutionFlow) f).map(PhysicalPlanGenerator::sourceWithSink)
-                        .collect(Collectors.toList()).contains(true);
+            flow.getNext().stream().map(f -> (PhysicalExecutionFlow) f).map(PhysicalPlanGenerator::sourceWithSink)
+                .collect(Collectors.toList()).contains(true);
     }
 
     private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
         List<Action> actions = edges.stream().filter(e -> e.getLeftVertex().getAction().equals(start))
-                .map(e -> e.getLeftVertex().getAction()).collect(Collectors.toList());
+            .map(e -> e.getLeftVertex().getAction()).collect(Collectors.toList());
         List<Flow> wrappers = actions.stream()
-                .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
-                .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+            .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
+            .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
         wrappers.addAll(actions.stream()
-                .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
-                .map(a -> new PhysicalExecutionFlow(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
+            .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
+            .map(a -> new PhysicalExecutionFlow(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
         return wrappers;
     }
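
The generator gives every task and every pipeline a writable CompletableFuture and only exposes a NonCompletableFuture wrapper upwards. The NonCompletableFuture class itself is added to seatunnel-engine-common by this commit but its body is not shown here, so the sketch below only assumes the read-only-view idea with a hypothetical minimal shape:

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Assumed minimal shape of a read-only future view: callers can observe
// completion through whenComplete but cannot complete the future themselves.
public class ReadOnlyFutureSketch<T> {
    private final CompletableFuture<T> inner;

    public ReadOnlyFutureSketch(CompletableFuture<T> inner) {
        this.inner = inner;
    }

    public void whenComplete(BiConsumer<? super T, ? super Throwable> action) {
        inner.whenComplete(action);
    }

    public static void main(String[] args) {
        CompletableFuture<String> writable = new CompletableFuture<>();
        ReadOnlyFutureSketch<String> readOnly = new ReadOnlyFutureSketch<>(writable);
        readOnly.whenComplete((v, t) -> System.out.println("observed: " + v));
        writable.complete("task finished"); // only the owner of the writable side completes it
    }
}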
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
index fc8284877..2e15f159f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
@@ -18,14 +18,29 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
 
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.spi.impl.NodeEngine;
+import lombok.NonNull;
+
+import java.util.concurrent.ExecutorService;
 
 public class PhysicalPlanUtils {
 
-    public static PhysicalPlan fromLogicalDAG(LogicalDag logicalDag, NodeEngine nodeEngine) {
-        return new PhysicalPlanGenerator(new ExecutionPlanGenerator(logicalDag).generate(), nodeEngine).generate();
+    public static PhysicalPlan fromLogicalDAG(@NonNull LogicalDag logicalDag,
+                                              @NonNull NodeEngine nodeEngine,
+                                              @NonNull JobImmutableInformation jobImmutableInformation,
+                                              long initializationTimestamp,
+                                              @NonNull ExecutorService executorService,
+                                              @NonNull FlakeIdGenerator flakeIdGenerator) {
+        return new PhysicalPlanGenerator(
+            new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, initializationTimestamp).generate(),
+            nodeEngine,
+            jobImmutableInformation,
+            initializationTimestamp,
+            executorService,
+            flakeIdGenerator).generate();
     }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
new file mode 100644
index 000000000..fbc568a62
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.net.URL;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * PhysicalVertex is responsible for the scheduling and execution of a single parallel task instance.
+ * Each {@link org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex} generates one or more PhysicalVertex instances,
+ * and the number of PhysicalVertex instances equals {@link ExecutionVertex#getParallelism()}.
+ */
+public class PhysicalVertex {
+
+    private static final ILogger LOGGER = Logger.getLogger(PhysicalVertex.class);
+
+    /**
+     * the index of PhysicalVertex
+     */
+    private final int subTaskGroupIndex;
+
+    private final String taskNameWithSubtaskAndPipeline;
+
+    private final int parallelism;
+
+    private final TaskGroup taskGroup;
+
+    private final ExecutorService executorService;
+
+    private final FlakeIdGenerator flakeIdGenerator;
+
+    private final int pipelineIndex;
+
+    private final int totalPipelineNum;
+
+    private final Set<URL> pluginJarsUrls;
+
+    /**
+     * When the PhysicalVertex status turns to an end state, this future is completed. Then the whenComplete method
+     * of waitForCompleteByPhysicalVertex in {@link SubPlan} will be called.
+     */
+    private final CompletableFuture<TaskExecutionState> taskFuture;
+
+
+    /**
+     * This future can only be completed by the task running in {@link com.hazelcast.spi.impl.executionservice.ExecutionService}
+     */
+    private NonCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
+
+    public PhysicalVertex(int subTaskGroupIndex,
+                          @NonNull ExecutorService executorService,
+                          int parallelism,
+                          @NonNull TaskGroup taskGroup,
+                          @NonNull CompletableFuture<TaskExecutionState> taskFuture,
+                          @NonNull FlakeIdGenerator flakeIdGenerator,
+                          int pipelineIndex,
+                          int totalPipelineNum,
+                          Set<URL> pluginJarsUrls) {
+        this.subTaskGroupIndex = subTaskGroupIndex;
+        this.executorService = executorService;
+        this.parallelism = parallelism;
+        this.taskGroup = taskGroup;
+        this.flakeIdGenerator = flakeIdGenerator;
+        this.pipelineIndex = pipelineIndex;
+        this.totalPipelineNum = totalPipelineNum;
+        this.pluginJarsUrls = pluginJarsUrls;
+        this.taskNameWithSubtaskAndPipeline =
+            String.format(
+                "task: [%s (%d/%d)], pipeline: [%d/%d]",
+                taskGroup.getTaskGroupName(),
+                subTaskGroupIndex + 1,
+                parallelism,
+                pipelineIndex,
+                totalPipelineNum);
+        this.taskFuture = taskFuture;
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public void deploy() throws JobException {
+
+        // TODO really submit the job to ExecutionService and get a NonCompletableFuture<ExecutionState>
+        long executionId = flakeIdGenerator.newId();
+        CompletableFuture<TaskExecutionState> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
+            try {
+                Thread.sleep(5000);
+                return new TaskExecutionState(executionId, ExecutionState.FINISHED, null);
+            } catch (InterruptedException e) {
+                return new TaskExecutionState(executionId, ExecutionState.FAILED, e);
+            }
+        }, executorService);
+
+        waitForCompleteByExecutionService = new NonCompletableFuture<TaskExecutionState>(uCompletableFuture);
+        waitForCompleteByExecutionService.whenComplete((v, t) -> {
+            if (t != null) {
+                // TODO t.getMessage() needs to be replaced
+                LOGGER.info(String.format("The Task %s failed with exception: %s",
+                    this.taskNameWithSubtaskAndPipeline,
+                    t.getMessage()));
+                taskFuture.complete(new TaskExecutionState(executionId, ExecutionState.FAILED, t));
+            } else {
+                LOGGER.info(String.format("The Task %s end with state %s",
+                    this.taskNameWithSubtaskAndPipeline,
+                    v));
+                taskFuture.complete(v);
+            }
+        });
+    }
+}
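
PhysicalVertex#deploy currently fakes the ExecutionService submission with supplyAsync and then bridges whatever the simulated task returns into taskFuture, which the owning SubPlan observes. The same bridge pattern in a self-contained form; all names are illustrative:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DeployBridgeSketch {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> taskFuture = new CompletableFuture<>(); // owned by the "vertex"

        // Simulated ExecutionService call, as in the placeholder deploy() above.
        CompletableFuture<String> fromExecutionService = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // pretend the task ran for a while
                return "FINISHED";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "FAILED";
            }
        }, executor);

        // Bridge the execution-side result into the future the pipeline is watching.
        fromExecutionService.whenComplete((state, error) -> {
            if (error != null) {
                taskFuture.completeExceptionally(error);
            } else {
                taskFuture.complete(state);
            }
        });

        System.out.println("task ended with: " + taskFuture.join());
        executor.shutdown();
    }
}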
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
new file mode 100644
index 000000000..8c1c4b0b6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SubPlan {
+    private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
+
+    private final List<PhysicalVertex> physicalVertexList;
+
+    private final List<PhysicalVertex> coordinatorVertexList;
+
+    private final int pipelineIndex;
+
+    private final int totalPipelineNum;
+
+    private AtomicInteger finishedTaskNum = new AtomicInteger(0);
+
+    private AtomicInteger canceledTaskNum = new AtomicInteger(0);
+
+    private AtomicInteger failedTaskNum = new AtomicInteger(0);
+
+    private AtomicReference<PipelineState> pipelineState = new AtomicReference<>();
+
+    private final String pipelineNameWithIndex;
+
+    /**
+     * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+     * pipeline transitioned into a certain state. The index into this array is the ordinal
+     * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
+     * stateTimestamps[RUNNING.ordinal()]}.
+     */
+    private final long[] stateTimestamps;
+
+    /**
+     * This future is completed when this sub plan completes. When it completes, the whenComplete method of
+     * the waitForCompleteBySubPlan in {@link PhysicalPlan} will be called.
+     */
+    private final CompletableFuture<PipelineState> pipelineFuture;
+
+    /**
+     * This future can only be completed by the {@link PhysicalVertex} taskFuture.
+     * When the taskFuture in {@link PhysicalVertex} completes, the NonCompletableFuture's whenComplete method will be called.
+     */
+    private final NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex;
+
+    public SubPlan(int pipelineIndex,
+                   int totalPipelineNum,
+                   long initializationTimestamp,
+                   @NonNull List<PhysicalVertex> physicalVertexList,
+                   @NonNull List<PhysicalVertex> coordinatorVertexList,
+                   @NonNull CompletableFuture<PipelineState> pipelineFuture,
+                   @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex) {
+        this.pipelineIndex = pipelineIndex;
+        this.pipelineFuture = pipelineFuture;
+        this.totalPipelineNum = totalPipelineNum;
+        this.physicalVertexList = physicalVertexList;
+        this.coordinatorVertexList = coordinatorVertexList;
+        this.waitForCompleteByPhysicalVertex = waitForCompleteByPhysicalVertex;
+        stateTimestamps = new long[PipelineState.values().length];
+        this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
+        this.pipelineState.set(PipelineState.CREATED);
+        this.stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
+        this.pipelineNameWithIndex = String.format(
+            "pipeline: [(%d/%d)]",
+            pipelineIndex + 1,
+            totalPipelineNum);
+
+        Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
+            x.whenComplete((v, t) -> {
+                if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
+                    canceledTaskNum.incrementAndGet();
+                } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+                    failedTaskNum.incrementAndGet();
+                } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) {
+                    throw new JobException("Unknown Task end state [" + v.getExecutionState() + "]");
+                }
+
+                if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
+                    if (failedTaskNum.get() > 0) {
+                        LOGGER.info(String.format("Pipeline failed %s", this.pipelineNameWithIndex));
+                        pipelineState.set(PipelineState.FAILED);
+                    } else if (canceledTaskNum.get() > 0) {
+                        LOGGER.info(String.format("Pipeline canceled %s", this.pipelineNameWithIndex));
+                        pipelineState.set(PipelineState.CANCELED);
+                    } else {
+                        LOGGER.info(String.format("Pipeline finished %s", this.pipelineNameWithIndex));
+                        pipelineState.set(PipelineState.FINISHED);
+                    }
+                    pipelineFuture.complete(pipelineState.get());
+                }
+            });
+        });
+    }
+
+    public int getPipelineIndex() {
+        return pipelineIndex;
+    }
+
+    public List<PhysicalVertex> getPhysicalVertexList() {
+        return physicalVertexList;
+    }
+
+    public List<PhysicalVertex> getCoordinatorVertexList() {
+        return coordinatorVertexList;
+    }
+}
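
Both PhysicalPlan and SubPlan record the moment each state was entered in a long[] indexed by the enum ordinal, as described in the stateTimestamps javadoc. A tiny illustration of that layout with a made-up State enum:

public class StateTimestampSketch {
    enum State { INITIALIZING, CREATED, RUNNING, FINISHED }

    public static void main(String[] args) {
        // One slot per enum constant; the ordinal is the index, as in PhysicalPlan and SubPlan.
        long[] stateTimestamps = new long[State.values().length];

        stateTimestamps[State.INITIALIZING.ordinal()] = System.currentTimeMillis();
        stateTimestamps[State.CREATED.ordinal()] = System.currentTimeMillis();

        System.out.println("created at " + stateTimestamps[State.CREATED.ordinal()]);
        System.out.println("never entered RUNNING: " + stateTimestamps[State.RUNNING.ordinal()]); // still 0
    }
}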
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
new file mode 100644
index 000000000..b2db98f18
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+/**
+ * An enumeration of all states that a task can be in during its execution. Tasks usually start in
+ * the state {@code CREATED} and switch states according to this diagram:
+ *
+ * <pre>{@code
+ *  CREATED  -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
+ *     |            |            |          |              |
+ *     |            |            |    +-----+--------------+
+ *     |            |            V    V
+ *     |            |         CANCELLING -----+----> CANCELED
+ *     |            |                         |
+ *     |            +-------------------------+
+ *     |
+ *     |                                   ... -> FAILED
+ *     V
+ * RECONCILING  -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
+ *
+ * }</pre>
+ *
+ * <p>It is possible to enter the {@code RECONCILING} state from the {@code CREATED} state if the job
+ * manager fails over, and the {@code RECONCILING} state can switch into any existing task state.
+ *
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
+ *
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are considered terminal
+ * states.
+ */
+public enum ExecutionState {
+    CREATED,
+
+    SCHEDULED,
+
+    DEPLOYING,
+
+    RUNNING,
+
+    /**
+     * This state marks "successfully completed". It can only be reached when a program reaches the
+     * "end of its input". The "end of input" can be reached when consuming a bounded input (fix set
+     * of files, bounded query, etc) or when stopping a program (not cancelling!) which make the
+     * input look like it reached its end at a specific point.
+     */
+    FINISHED,
+
+    CANCELING,
+
+    CANCELED,
+
+    FAILED,
+
+    RECONCILING,
+
+    /** Restoring last possible valid state of the task if it has it. */
+    INITIALIZING;
+
+    public boolean isEnd() {
+        return this == FINISHED || this == CANCELED || this == FAILED;
+    }
+}
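
The isEnd() check is what the plan classes rely on one level up: PhysicalPlan#updateJobState refuses to leave a terminal state. A small hypothetical guard showing the same idea; it uses compareAndSet instead of the get-then-set in updateJobState purely to keep the sketch short:

import java.util.concurrent.atomic.AtomicReference;

public class TerminalStateGuardSketch {
    enum State {
        CREATED, RUNNING, FINISHED, CANCELED, FAILED;

        boolean isEnd() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    private final AtomicReference<State> current = new AtomicReference<>(State.CREATED);

    // A terminal state can never be left, and the transition only applies if the
    // expected current state still holds.
    boolean update(State expected, State target) {
        if (expected.isEnd()) {
            throw new IllegalStateException("Trying to leave terminal state " + expected);
        }
        return current.compareAndSet(expected, target);
    }

    public static void main(String[] args) {
        TerminalStateGuardSketch guard = new TerminalStateGuardSketch();
        System.out.println(guard.update(State.CREATED, State.RUNNING)); // true
        System.out.println(guard.update(State.CREATED, State.FAILED));  // false, the state already moved on
    }
}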
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
similarity index 58%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index 2763942ee..546db419e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -17,19 +17,29 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+public class TaskExecutionState {
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
+    private final long taskExecutionId;
 
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+    private final ExecutionState executionState;
 
-    public TaskGroup(Task... tasks) {
-        this.tasks = Arrays.asList(tasks);
+    private Throwable throwable;
+
+    public TaskExecutionState(long taskExecutionId, ExecutionState executionState, Throwable throwable) {
+        this.taskExecutionId = taskExecutionId;
+        this.executionState = executionState;
+        this.throwable = throwable;
+    }
+
+    public ExecutionState getExecutionState() {
+        return executionState;
+    }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
+
+    public long getTaskExecutionId() {
+        return taskExecutionId;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index 2763942ee..c0123277c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -17,19 +17,14 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Collection;
 
 @Data
-@AllArgsConstructor
 public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+    private final String taskGroupName;
 
-    public TaskGroup(Task... tasks) {
-        this.tasks = Arrays.asList(tasks);
-    }
+    private final Collection<Task> tasks;
 }
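
Dropping @AllArgsConstructor still leaves TaskGroup constructible with a name and a task collection, because Lombok's @Data bundles @RequiredArgsConstructor and both fields are final. Roughly what Lombok generates here, hand-expanded; Object stands in for the Task interface so the sketch stays self-contained, and equals/hashCode/toString are omitted:

import java.io.Serializable;
import java.util.Collection;

// Hand-expanded approximation of what Lombok generates for TaskGroup:
// getters and the required-args constructor for the two final fields.
public class TaskGroupExpandedSketch implements Serializable {
    private final String taskGroupName;
    private final Collection<Object> tasks; // Object stands in for the Task interface

    public TaskGroupExpandedSketch(String taskGroupName, Collection<Object> tasks) {
        this.taskGroupName = taskGroupName;
        this.tasks = tasks;
    }

    public String getTaskGroupName() {
        return taskGroupName;
    }

    public Collection<Object> getTasks() {
        return tasks;
    }
}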
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 37542cd2d..6c9d89517 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,53 +17,67 @@
 
 package org.apache.seatunnel.engine.server.master;
 
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
-import org.apache.seatunnel.engine.server.execution.ProgressState;
-import org.apache.seatunnel.engine.server.execution.Task;
 
+import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.operationservice.OperationService;
 import lombok.NonNull;
 
-import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
-public class JobMaster implements Task {
+public class JobMaster implements Runnable {
+    private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
 
-    private final LogicalDag logicalDag;
+    private LogicalDag logicalDag;
     private PhysicalPlan physicalPlan;
+    private final Data jobImmutableInformation;
 
-    private NodeEngine nodeEngine;
+    private final NodeEngine nodeEngine;
 
-    public JobMaster() {
-        this.logicalDag = new LogicalDag();
-    }
+    private final ExecutorService executorService;
 
-    @Override
-    public void init() throws Exception {
-        physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag, nodeEngine);
-    }
+    private FlakeIdGenerator flakeIdGenerator;
 
-    @NonNull
-    @Override
-    public ProgressState call() {
-        return ProgressState.DONE;
+    public JobMaster(@NonNull Data jobImmutableInformation,
+                     @NonNull NodeEngine nodeEngine,
+                     @NonNull ExecutorService executorService) {
+        this.jobImmutableInformation = jobImmutableInformation;
+        this.nodeEngine = nodeEngine;
+        this.executorService = executorService;
+        flakeIdGenerator =
+            this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
     }
 
-    @NonNull
-    @Override
-    public Long getTaskID() {
-        return null;
-    }
+    public void init() throws Exception {
+        JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
+        LOGGER.info("Job [" + jobInformation.getJobId() + "] submit");
+        LOGGER.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
 
-    @Override
-    public void close() throws IOException {
-        Task.super.close();
+        // TODO Use classloader load the connector jars and deserialize logicalDag
+        this.logicalDag = new LogicalDag();
+        physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
+                nodeEngine,
+                jobInformation,
+                System.currentTimeMillis(),
+                executorService,
+                flakeIdGenerator);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
     @Override
-    public void setOperationService(OperationService operationService) {
-        Task.super.setOperationService(operationService);
+    public void run() {
+        try {
+            LOGGER.info("I will sleep 2000ms");
+            Thread.sleep(2000);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
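
JobMaster is now a plain Runnable whose heavy lifting happens in init(); run() is still a placeholder. A hedged sketch of the init-then-submit shape a caller might use with the shared ExecutorService; FakeJobMaster is a stand-in, not the real class:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobMasterDriverSketch {
    // Minimal stand-in with the same init()/run() shape as JobMaster.
    static class FakeJobMaster implements Runnable {
        void init() {
            System.out.println("building the physical plan...");
        }

        @Override
        public void run() {
            System.out.println("scheduling pipelines...");
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        FakeJobMaster jobMaster = new FakeJobMaster();
        jobMaster.init();                  // parse the job information, build the PhysicalPlan
        executorService.submit(jobMaster); // then let the shared executor drive run()
        executorService.shutdown();
    }
}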
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index b72c62287..8fceed6c9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -15,19 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.engine.server.scheduler;
 
-import java.util.List;
+public interface JobScheduler {
+    void startScheduling();
 
-public class ExecutionPlan {
-
-    private final List<Pipeline> pipelines;
-
-    ExecutionPlan(List<Pipeline> pipelines) {
-        this.pipelines = pipelines;
-    }
-
-    public List<Pipeline> getPipelines() {
-        return pipelines;
-    }
+    boolean updateExecutionState();
 }
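
JobScheduler stays an interface in this commit; the PhysicalPlan javadoc only says that its jobEndFuture is what waitForCompleteByPhysicalPlan in the scheduler will observe. The sketch below shows one plausible pipeline-by-pipeline startScheduling loop using stand-in types; the real scheduler implementation is not part of this commit:

import java.util.Arrays;
import java.util.List;

// Illustrative only: one plausible pipeline-by-pipeline scheduling loop behind
// startScheduling(). Stand-in types keep the sketch self-contained.
public class PipelineSchedulerSketch {

    interface Deployable {
        void deploy();
    }

    static class Pipeline {
        final List<Deployable> coordinatorVertexList;
        final List<Deployable> physicalVertexList;

        Pipeline(List<Deployable> coordinators, List<Deployable> tasks) {
            this.coordinatorVertexList = coordinators;
            this.physicalVertexList = tasks;
        }
    }

    // Mark the job as running, then deploy coordinator vertices before the
    // parallel task vertices, one pipeline at a time.
    static void startScheduling(List<Pipeline> pipelines) {
        System.out.println("job turns to RUNNING");
        for (Pipeline pipeline : pipelines) {
            pipeline.coordinatorVertexList.forEach(Deployable::deploy);
            pipeline.physicalVertexList.forEach(Deployable::deploy);
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline(
            Arrays.<Deployable>asList(
                () -> System.out.println("deploy SourceSplitEnumeratorTask"),
                () -> System.out.println("deploy SinkAggregatedCommitterTask")),
            Arrays.<Deployable>asList(
                () -> System.out.println("deploy SourceTask"),
                () -> System.out.println("deploy PartitionTransformTask")));
        startScheduling(Arrays.asList(pipeline));
    }
}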