Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/29 05:51:32 UTC

[incubator-seatunnel] branch st-engine updated: [Feature][ST-Engine] Add Job Cancel Feature (#2527)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 44bdc0c28 [Feature][ST-Engine] Add Job Cancel Feature (#2527)
44bdc0c28 is described below

commit 44bdc0c28444bcc251db31f5047463098018a2f4
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Aug 29 13:51:26 2022 +0800

    [Feature][ST-Engine] Add Job Cancel Feature (#2527)
    
    * complete job cancel feature
    
    * fix task state error
    
    * fix license header
    
    * fix NullPointerException in operator
    
    * update ci
    
    * update ci
    
    * fix review problems
    
    * fix cancel error
---
 .github/workflows/backend.yml                      |   6 +-
 .github/workflows/engine_backend.yml               |   2 +
 generate_client_protocol.sh                        |   2 +-
 .../core/starter/seatunnel/SeaTunnelStarter.java   |   9 +-
 seatunnel-e2e/seatunnel-engine-e2e/pom.xml         |   4 +
 .../org/apache/seatunnel/engine/e2e/TestUtils.java |   4 +
 .../engine/e2e/engine/JobExecutionIT.java          |  58 ++++++--
 ..._to_file.conf => batch_fakesource_to_file.conf} |   8 +-
 ....conf => batch_fakesource_to_file_complex.conf} |   8 +-
 .../streaming_fakesource_to_file_complex.conf      |   8 +-
 seatunnel-engine/seatunnel-engine-client/pom.xml   |   5 +
 .../job/{JobProxy.java => ClientJobProxy.java}     |  76 +++++++---
 .../client/{ => job}/ConnectorInstanceLoader.java  |  20 +--
 .../seatunnel/engine/client/job/JobClient.java     |   4 +-
 .../engine/client/job/JobConfigParser.java         |  61 ++++----
 .../engine/client/job/JobExecutionEnvironment.java |  12 +-
 .../engine/client/JobConfigParserTest.java         |   4 +-
 .../engine/client/LogicalDagGeneratorTest.java     |   4 +-
 .../engine/client/SeaTunnelClientTest.java         |  18 ++-
 .../apache/seatunnel/engine/client/TestUtils.java  |   4 +
 ..._to_file.conf => batch_fakesource_to_file.conf} |   0
 ....conf => batch_fakesource_to_file_complex.conf} |   0
 .../apache/seatunnel/engine/common/Constant.java   |   4 +
 .../seatunnel/engine/common/config/JobConfig.java  |   6 +-
 .../org/apache/seatunnel/engine/core/job/Job.java  |  11 +-
 .../protocol/codec/SeaTunnelCancelJobCodec.java    |  82 +++++++++++
 .../protocol/codec/SeaTunnelGetJobStatusCodec.java |  91 ++++++++++++
 .../SeaTunnelEngine.yaml                           |  36 +++++
 .../seatunnel/engine/server/SeaTunnelServer.java   |  23 +++
 .../engine/server/TaskExecutionService.java        |  12 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  77 ++++++++--
 .../engine/server/dag/physical/PhysicalVertex.java | 134 +++++++++++++++---
 .../engine/server/dag/physical/SubPlan.java        |  88 +++++++++---
 .../seatunnel/engine/server/master/JobMaster.java  |  10 +-
 ...yTaskOperation.java => CancelJobOperation.java} |  37 ++---
 ...skOperation.java => GetJobStatusOperation.java} |  41 ++++--
 .../engine/server/protocol/task/CancelJobTask.java |  49 +++++++
 .../server/protocol/task/GetJobStatusTask.java     |  50 +++++++
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |   6 +
 .../server/scheduler/PipelineBaseScheduler.java    | 156 ++++++++++++++-------
 .../serializable/OperationDataSerializerHook.java  |  11 +-
 .../serializable/TaskDataSerializerHook.java       |  10 ++
 ...uestOperation.java => CancelTaskOperation.java} |  47 +++----
 .../{ => task}/operation/DeployTaskOperation.java  |   7 +-
 .../operation/source/AssignSplitOperation.java     |  13 +-
 .../operation/source/CloseRequestOperation.java    |  12 +-
 .../operation/source/RequestSplitOperation.java    |  13 +-
 .../source/SourceNoMoreElementOperation.java       |  12 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |   2 -
 49 files changed, 1052 insertions(+), 305 deletions(-)
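
For orientation before the file-by-file diff: the commit renames JobProxy to ClientJobProxy (which now submits the job from its constructor), adds cancelJob() and getJobStatus() backed by the new SeaTunnelCancelJobCodec and SeaTunnelGetJobStatusCodec, and threads cancellation through SeaTunnelServer, PhysicalPlan and the scheduler. The following is a minimal client-side sketch of the new cancel flow, pieced together from SeaTunnelStarter and JobExecutionIT below; the config path is a placeholder and the ClientConfig import is assumed to be Hazelcast's client config class.

    import org.apache.seatunnel.common.config.Common;
    import org.apache.seatunnel.common.config.DeployMode;
    import org.apache.seatunnel.engine.client.SeaTunnelClient;
    import org.apache.seatunnel.engine.client.job.ClientJobProxy;
    import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
    import org.apache.seatunnel.engine.common.config.ConfigProvider;
    import org.apache.seatunnel.engine.common.config.JobConfig;
    import org.apache.seatunnel.engine.core.job.JobStatus;

    import com.hazelcast.client.config.ClientConfig;

    public class CancelJobExample {
        public static void main(String[] args) throws Exception {
            Common.setDeployMode(DeployMode.CLIENT);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("fake_to_file");

            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
            SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);

            // Placeholder path; the e2e test uses streaming_fakesource_to_file_complex.conf
            JobExecutionEnvironment jobExecutionEnv =
                engineClient.createExecutionContext("/path/to/streaming_job.conf", jobConfig);

            // execute() returns a ClientJobProxy; the proxy submits the job in its constructor
            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

            // New in this commit: ask the master for the current status, then cancel
            if (!clientJobProxy.getJobStatus().isEndState()) {
                clientJobProxy.cancelJob();
            }

            // Blocks until the job reaches an end state; CANCELED is expected on this path
            JobStatus finalStatus = clientJobProxy.waitForJobComplete();
            System.out.println("Job ended with state " + finalStatus);
        }
    }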

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 0bc67c0a0..26e9b43eb 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -26,6 +26,8 @@ on:
       - 'seatunnel-engine/**'
       - 'seatunnel-engine/seatunnel-engine-e2e/**'
       - 'seatunnel-examples/seatunnel-engine-examples/**'
+      - 'seatunnel-core/seatunnel-seatunnel-starter/**'
+      - 'generate_client_protocol.sh'
 
 concurrency:
   group: backend-${{ github.event.pull_request.number || github.ref }}
@@ -146,7 +148,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest', 'windows-latest' ]
-    timeout-minutes: 50
+    timeout-minutes: 80
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
@@ -169,7 +171,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 50
+    timeout-minutes: 80
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/.github/workflows/engine_backend.yml b/.github/workflows/engine_backend.yml
index 5dcca74ad..a57f6d9ac 100644
--- a/.github/workflows/engine_backend.yml
+++ b/.github/workflows/engine_backend.yml
@@ -23,6 +23,8 @@ on:
       - 'seatunnel-engine/**'
       - 'seatunnel-engine/seatunnel-engine-e2e/**'
       - 'seatunnel-examples/seatunnel-engine-examples/**'
+      - 'seatunnel-core/seatunnel-seatunnel-starter/**'
+      - 'generate_client_protocol.sh'
 
 concurrency:
   group: backend-${{ github.event.pull_request.number || github.ref }}
diff --git a/generate_client_protocol.sh b/generate_client_protocol.sh
index 2b26ecda3..7ad1d5b16 100755
--- a/generate_client_protocol.sh
+++ b/generate_client_protocol.sh
@@ -52,7 +52,7 @@ cd $PROTOCOL_DIRECTORY
 $PIP3 install -r requirements.txt
 
 $PYTHON generator.py -r $SEATUNNEL_ENGINE_HOME -p $SEATUNNEL_ENGINE_HOME/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition \
--o seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec \
+-o /tmp/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec \
 -n org.apache.seatunnel.engine.core.protocol.codec --no-binary --no-id-check
 
 rm -rf $PROTOCOL_DIRECTORY
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
index 90b12b48e..94335cca9 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -22,8 +22,8 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.JobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 
@@ -39,16 +39,15 @@ public class SeaTunnelStarter {
         Common.setDeployMode(DeployMode.CLIENT);
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
-        // TODO change jobConfig mode
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
 
-        JobProxy jobProxy;
+        ClientJobProxy clientJobProxy;
         try {
-            jobProxy = jobExecutionEnv.execute();
-            jobProxy.waitForJobComplete();
+            clientJobProxy = jobExecutionEnv.execute();
+            clientJobProxy.waitForJobComplete();
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
         }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
index 292665a77..ea8a31f3a 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -66,5 +66,9 @@
             <artifactId>seatunnel-engine-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 26e4defc9..786a34ce6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -30,6 +30,10 @@ public class TestUtils {
         return System.getProperty("user.dir") + "/src/test/resources" + confFile;
     }
 
+    public static String getClusterName(String testClassName) {
+        return System.getProperty("user.name") + "_" + testClassName;
+    }
+
     public static void initPluginDir() {
         // copy connectors to project_root/connectors dir
         System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
index fa6679e3d..8b286c482 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -17,11 +17,14 @@
 
 package org.apache.seatunnel.engine.e2e.engine;
 
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.JobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
@@ -39,7 +42,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 public class JobExecutionIT {
     private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class);
@@ -47,6 +51,7 @@ public class JobExecutionIT {
     @BeforeClass
     public static void beforeClass() throws Exception {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("JobExecutionIT"));
         HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
             Thread.currentThread().getName(),
             new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
@@ -55,6 +60,7 @@ public class JobExecutionIT {
     @Test
     public void testSayHello() {
         SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+        seaTunnelClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
         seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
         SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
 
@@ -67,20 +73,56 @@ public class JobExecutionIT {
     public void testExecuteJob() {
         TestUtils.initPluginDir();
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_file");
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
+
+        try {
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> assertEquals(JobStatus.FINISHED, clientJobProxy.waitForJobComplete()));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void cancelJobTest() {
+        TestUtils.initPluginDir();
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
 
-        JobProxy jobProxy = null;
         try {
-            jobProxy = jobExecutionEnv.execute();
-            JobStatus jobStatus = jobProxy.waitForJobComplete();
-            Assert.assertEquals(JobStatus.FINISHED, jobStatus);
-        } catch (ExecutionException | InterruptedException e) {
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+            Assert.assertFalse(jobStatus1.isEndState());
+            ClientJobProxy finalClientJobProxy = clientJobProxy;
+            CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+                JobStatus jobStatus = finalClientJobProxy.waitForJobComplete();
+                Assert.assertEquals(JobStatus.CANCELED, jobStatus);
+                return null;
+            });
+            Thread.sleep(1000);
+            clientJobProxy.cancelJob();
+
+            await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Assert.assertTrue(objectCompletableFuture.isDone());
+                });
+
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
similarity index 72%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
index c6f0f7bbf..1e49074ba 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
@@ -21,7 +21,7 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "STREAMING"
+  job.mode = "BATCH"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
@@ -34,13 +34,9 @@ source {
       parallelism = 3
     }
 
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
 }
 
 transform {
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
 }
 
 sink {
@@ -60,6 +56,4 @@ sink {
 
   }
 
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
similarity index 74%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
index dc73c2f09..d946739b6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -21,7 +21,7 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "STREAMING"
+  job.mode = "BATCH"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
@@ -40,13 +40,9 @@ source {
       parallelism = 3
     }
 
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
 }
 
 transform {
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
 }
 
 sink {
@@ -66,6 +62,4 @@ sink {
     source_table_name="fake"
   }
 
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
 }
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
similarity index 96%
copy from seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
copy to seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
index 258c32352..0a52e7a0d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -21,7 +21,7 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "BATCH"
+  job.mode = "STREAMING"
   execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
@@ -31,14 +31,15 @@ source {
     FakeSource {
       result_table_name = "fake"
       field_name = "name,age",
-      parallelism = 3
+      parallelism = 1
     }
 
     FakeSource {
       result_table_name = "fake"
       field_name = "name,age",
-      parallelism = 3
+      parallelism = 1
     }
+
 }
 
 transform {
@@ -60,4 +61,5 @@ sink {
     save_mode="error",
     source_table_name="fake"
   }
+
 }
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 3bfecac59..91297bf8c 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -74,5 +74,10 @@
             <scope>test</scope>
             <classifier>tests</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
similarity index 59%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 0ac8ac789..f25ee6998 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -18,11 +18,15 @@
 package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.Job;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
 
@@ -31,17 +35,16 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
-import java.util.concurrent.ExecutionException;
-
-public class JobProxy implements Job {
-    private static final ILogger LOGGER = Logger.getLogger(JobProxy.class);
+public class ClientJobProxy implements Job {
+    private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
     private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
     private JobImmutableInformation jobImmutableInformation;
 
-    public JobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
-                    @NonNull JobImmutableInformation jobImmutableInformation) {
+    public ClientJobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
+                          @NonNull JobImmutableInformation jobImmutableInformation) {
         this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
         this.jobImmutableInformation = jobImmutableInformation;
+        submitJob();
     }
 
     @Override
@@ -49,37 +52,66 @@ public class JobProxy implements Job {
         return jobImmutableInformation.getJobId();
     }
 
-    @Override
-    public void submitJob() throws ExecutionException, InterruptedException {
+    private void submitJob() {
         ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(),
             seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
         PassiveCompletableFuture<Void> submitJobFuture =
             seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
-        submitJobFuture.get();
+        submitJobFuture.join();
     }
 
-    @Override
+    /**
+     * This method will block until the Job turns to an end state.
+     *
+     * @return the final JobStatus of the job
+     */
     public JobStatus waitForJobComplete() {
         JobStatus jobStatus = null;
-        PassiveCompletableFuture<JobStatus> jobFuture =
-            seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
-                SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
-                response -> {
-                    return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
-                });
         try {
-            jobStatus = jobFuture.get();
-            LOGGER.info(String.format("Job %s (%s) end with state %s",
-                jobImmutableInformation.getJobId(),
-                jobImmutableInformation.getJobConfig().getName(),
-                jobStatus));
-        } catch (InterruptedException | ExecutionException e) {
+            jobStatus = RetryUtils.retryWithException(() -> {
+                PassiveCompletableFuture<JobStatus> jobFuture = doWaitForJobComplete();
+                return jobFuture.get();
+            }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+                exception -> exception instanceof RuntimeException, Constant.OPERATION_RETRY_SLEEP));
+        } catch (Exception e) {
             LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s",
                 jobImmutableInformation.getJobId(),
                 jobImmutableInformation.getJobConfig().getName(),
                 ExceptionUtils.getMessage(e)));
             throw new RuntimeException(e);
         }
+        LOGGER.info(String.format("Job %s (%s) end with state %s",
+            jobImmutableInformation.getJobConfig().getName(),
+            jobImmutableInformation.getJobId(),
+            jobStatus));
         return jobStatus;
     }
+
+    @Override
+    public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
+        PassiveCompletableFuture<JobStatus> jobFuture =
+            seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+                SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
+                response -> {
+                    return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
+                });
+        return jobFuture;
+    }
+
+    @Override
+    public void cancelJob() {
+        PassiveCompletableFuture<Void> cancelFuture = seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+            SeaTunnelCancelJobCodec.encodeRequest(jobImmutableInformation.getJobId()));
+
+        cancelFuture.join();
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        int jobStatusOrdinal = seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(
+            SeaTunnelGetJobStatusCodec.encodeRequest(jobImmutableInformation.getJobId()),
+            response -> SeaTunnelGetJobStatusCodec.decodeResponse(response));
+        return JobStatus.values()[jobStatusOrdinal];
+    }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
similarity index 84%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
index d4e8f703d..c62ca2871 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -46,7 +46,8 @@ public class ConnectorInstanceLoader {
         throw new IllegalStateException("Utility class");
     }
 
-    public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(Config sourceConfig) {
+    public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(Config sourceConfig,
+                                                                              SeaTunnelContext seaTunnelContext) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
@@ -57,8 +58,8 @@ public class ConnectorInstanceLoader {
 
         SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelSource.prepare(sourceConfig);
-        seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
-        if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+        seaTunnelSource.setSeaTunnelContext(seaTunnelContext);
+        if (seaTunnelContext.getJobMode() == JobMode.BATCH
             && seaTunnelSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
             throw new UnsupportedOperationException(
                 String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
@@ -67,7 +68,7 @@ public class ConnectorInstanceLoader {
     }
 
     public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>> loadSinkInstance(
-        Config sinkConfig) {
+        Config sinkConfig, SeaTunnelContext seaTunnelContext) {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
@@ -78,11 +79,12 @@ public class ConnectorInstanceLoader {
             sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelSink.prepare(sinkConfig);
         seaTunnelSink.setTypeInfo(null);
-        seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+        seaTunnelSink.setSeaTunnelContext(seaTunnelContext);
         return new ImmutablePair<>(seaTunnelSink, new HashSet<>(pluginJarPaths));
     }
 
-    public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> loadTransformInstance(Config transformConfig) {
+    public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> loadTransformInstance(Config transformConfig,
+                                                                                       SeaTunnelContext seaTunnelContext) {
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
@@ -91,7 +93,9 @@ public class ConnectorInstanceLoader {
 
         List<URL> pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
         SeaTunnelTransform<?> seaTunnelTransform =
-                transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+            transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+        seaTunnelTransform.prepare(transformConfig);
+        seaTunnelTransform.setSeaTunnelContext(seaTunnelContext);
         return new ImmutablePair<>(seaTunnelTransform, new HashSet<>(pluginJarPaths));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 964a7891e..bbcfea8ad 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -34,7 +34,7 @@ public class JobClient {
         return hazelcastClient.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
     }
 
-    public JobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
-        return new JobProxy(hazelcastClient, jobImmutableInformation);
+    public ClientJobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
+        return new ClientJobProxy(hazelcastClient, jobImmutableInformation);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 31cfa9a16..964bc5593 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.client.job;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -26,8 +27,8 @@ import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.engine.client.ConnectorInstanceLoader;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -77,8 +78,8 @@ public class JobConfigParser {
     private JobConfig jobConfig;
 
     public JobConfigParser(@NonNull String jobDefineFilePath,
-                              @NonNull IdGenerator idGenerator,
-                              @NonNull JobConfig jobConfig) {
+                           @NonNull IdGenerator idGenerator,
+                           @NonNull JobConfig jobConfig) {
         this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
         this.jobConfig = jobConfig;
@@ -107,8 +108,14 @@ public class JobConfigParser {
         return new ImmutablePair<>(actions, jarUrlsSet);
     }
 
-    private void jobConfigAnalyze(Config envConfigs) {
-        // TODO Resolve env configuration and set jobConfig
+    private void jobConfigAnalyze(@NonNull Config envConfigs) {
+        SeaTunnelContext context = SeaTunnelContext.getContext();
+        if (envConfigs.hasPath("job.mode")) {
+            context.setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
+        } else {
+            context.setJobMode(JobMode.BATCH);
+        }
+        jobConfig.setSeaTunnelContext(context);
     }
 
     /**
@@ -126,7 +133,7 @@ public class JobConfigParser {
         for (Config config : sinkConfigs) {
             ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
                 sinkListImmutablePair =
-                ConnectorInstanceLoader.loadSinkInstance(config);
+                ConnectorInstanceLoader.loadSinkInstance(config, jobConfig.getSeaTunnelContext());
 
             SinkAction sinkAction =
                 createSinkAction(idGenerator.getNextId(), sinkListImmutablePair.getLeft().getPluginName(),
@@ -157,7 +164,7 @@ public class JobConfigParser {
         List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
         if (CollectionUtils.isEmpty(sourceConfigList)) {
             throw new JobDefineCheckException(action.getName()
-                    + " source table name [" + sourceTableName + "] can not be found");
+                + " source table name [" + sourceTableName + "] can not be found");
         }
 
         // If a transform have more than one upstream action, the parallelism of this transform is the sum of the parallelism
@@ -166,13 +173,13 @@ public class JobConfigParser {
         AtomicInteger totalParallelism = new AtomicInteger();
         for (Config sourceConfig : sourceConfigList) {
             ImmutablePair<SeaTunnelSource, Set<URL>> seaTunnelSourceListImmutablePair =
-                    ConnectorInstanceLoader.loadSourceInstance(sourceConfig);
+                ConnectorInstanceLoader.loadSourceInstance(sourceConfig, jobConfig.getSeaTunnelContext());
             dataType = seaTunnelSourceListImmutablePair.getLeft().getProducedType();
             SourceAction sourceAction = createSourceAction(
-                    idGenerator.getNextId(),
-                    sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
-                    seaTunnelSourceListImmutablePair.getLeft(),
-                    seaTunnelSourceListImmutablePair.getRight());
+                idGenerator.getNextId(),
+                sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+                seaTunnelSourceListImmutablePair.getLeft(),
+                seaTunnelSourceListImmutablePair.getRight());
 
             int sourceParallelism = getSourceParallelism(sourceConfig);
             sourceAction.setParallelism(sourceParallelism);
@@ -193,16 +200,16 @@ public class JobConfigParser {
             SeaTunnelDataType<?> dataTypeResult = null;
             for (Config config : transformConfigList) {
                 ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
-                        ConnectorInstanceLoader.loadTransformInstance(config);
+                    ConnectorInstanceLoader.loadTransformInstance(config, jobConfig.getSeaTunnelContext());
                 TransformAction transformAction = createTransformAction(
-                        idGenerator.getNextId(),
-                        transformListImmutablePair.getLeft().getPluginName(),
-                        transformListImmutablePair.getLeft(),
-                        transformListImmutablePair.getRight());
+                    idGenerator.getNextId(),
+                    transformListImmutablePair.getLeft().getPluginName(),
+                    transformListImmutablePair.getLeft(),
+                    transformListImmutablePair.getRight());
 
                 action.addUpstream(transformAction);
                 SeaTunnelDataType dataType = transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME),
-                        transformAction);
+                    transformAction);
                 transformListImmutablePair.getLeft().setTypeInfo(dataType);
                 dataTypeResult = transformListImmutablePair.getLeft().getProducedType();
                 totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
@@ -257,32 +264,32 @@ public class JobConfigParser {
                                List<? extends Config> transformConfigs,
                                List<? extends Config> sinkConfigs) {
         ImmutablePair<SeaTunnelSource, Set<URL>> pair =
-            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
+            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), jobConfig.getSeaTunnelContext());
         SourceAction sourceAction =
             createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(),
                 pair.getRight());
         sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
         SeaTunnelDataType dataType = sourceAction.getSource().getProducedType();
         ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
-                sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+            sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getSeaTunnelContext());
 
         Action sinkUpstreamAction = sourceAction;
 
         if (!CollectionUtils.isEmpty(transformConfigs)) {
             ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
-                    ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
+                ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), jobConfig.getSeaTunnelContext());
             transformListImmutablePair.getLeft().setTypeInfo(dataType);
 
             dataType = transformListImmutablePair.getLeft().getProducedType();
             TransformAction transformAction = createTransformAction(
-                    idGenerator.getNextId(),
-                    transformListImmutablePair.getLeft().getPluginName(),
-                    Lists.newArrayList(sourceAction),
-                    transformListImmutablePair.getLeft(),
-                    transformListImmutablePair.getRight());
+                idGenerator.getNextId(),
+                transformListImmutablePair.getLeft().getPluginName(),
+                Lists.newArrayList(sourceAction),
+                transformListImmutablePair.getLeft(),
+                transformListImmutablePair.getRight());
 
             initTransformParallelism(transformConfigs, sourceAction, transformListImmutablePair.getLeft(),
-                    transformAction);
+                transformAction);
 
             sinkUpstreamAction = transformAction;
         }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 5c83874e7..7176867a2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.client.job;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -76,22 +75,15 @@ public class JobExecutionEnvironment {
         return actions;
     }
 
-    public JobProxy execute() throws ExecutionException, InterruptedException {
+    public ClientJobProxy execute() throws ExecutionException, InterruptedException {
         JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
-        initSeaTunnelContext();
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
             jobClient.getNewJobId(),
             seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
             jobConfig,
             jarUrls);
 
-        JobProxy jobProxy = jobClient.createJobProxy(jobImmutableInformation);
-        jobProxy.submitJob();
-        return jobProxy;
-    }
-
-    private void initSeaTunnelContext() {
-        SeaTunnelContext.getContext().setJobMode(jobConfig.getMode());
+        return jobClient.createJobProxy(jobImmutableInformation);
     }
 
     public LogicalDag getLogicalDag() {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 919b492d5..39231a6df 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -41,7 +41,7 @@ public class JobConfigParserTest {
     @Test
     public void testSimpleJobParse() {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/fakesource_to_file.conf");
+        String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
         JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
@@ -59,7 +59,7 @@ public class JobConfigParserTest {
     @Test
     public void testComplexJobParse() {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
         JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 1984bbef5..1c0da5503 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.engine.client.job.JobConfigParser;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -43,9 +42,8 @@ public class LogicalDagGeneratorTest {
     @Test
     public void testLogicalGenerator() {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setMode(JobMode.BATCH);
         jobConfig.setName("fake_to_file");
 
         IdGenerator idGenerator = new IdGenerator();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 5ba9787db..a4745687c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.seatunnel.engine.client;
 
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.JobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -39,6 +41,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("checkstyle:MagicNumber")
 @RunWith(JUnit4.class)
@@ -49,6 +52,7 @@ public class SeaTunnelClientTest {
     @Before
     public void beforeClass() throws Exception {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
         instance = HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
             Thread.currentThread().getName(),
             new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
@@ -57,6 +61,7 @@ public class SeaTunnelClientTest {
     @Test
     public void testSayHello() {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
 
         String msg = "Hello world";
@@ -69,18 +74,17 @@ public class SeaTunnelClientTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/client_test.conf");
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setMode(JobMode.BATCH);
         jobConfig.setName("fake_to_file");
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
 
-        JobProxy jobProxy = null;
         try {
-            jobProxy = jobExecutionEnv.execute();
-            JobStatus jobStatus = jobProxy.waitForJobComplete();
-            Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> assertEquals(JobStatus.FINISHED, clientJobProxy.waitForJobComplete()));
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
         }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
index c392fdaf5..4a16a6c6a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
@@ -21,4 +21,8 @@ public class TestUtils {
     public static String getResource(String confFile) {
         return System.getProperty("user.dir") + "/src/test/resources" + confFile;
     }
+
+    public static String getClusterName(String testClassName) {
+        return System.getProperty("user.name") + "_" + testClassName;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
similarity index 100%
rename from seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
rename to seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
similarity index 100%
rename from seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
rename to seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 9c016d91f..6aab140cd 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -35,4 +35,8 @@ public class Constant {
     public static final String HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX = "seatunnel";
 
     public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML = "seatunnel-default.yaml";
+
+    public static final int OPERATION_RETRY_TIME = 5;
+
+    public static final int OPERATION_RETRY_SLEEP = 2000;
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 6bb41c469..2b31a1115 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.common.config;
 
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -30,7 +30,7 @@ import java.io.IOException;
 @Data
 public class JobConfig implements IdentifiedDataSerializable {
     private String name;
-    private JobMode mode;
+    private SeaTunnelContext seaTunnelContext;
 
     @Override
     public int getFactoryId() {
@@ -45,10 +45,12 @@ public class JobConfig implements IdentifiedDataSerializable {
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeString(name);
+        out.writeObject(seaTunnelContext);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         this.name = in.readString();
+        this.seaTunnelContext = in.readObject();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index fa1065518..517d5b1f2 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -17,12 +17,19 @@
 
 package org.apache.seatunnel.engine.core.job;
 
-import java.util.concurrent.ExecutionException;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 
+/**
+ * The Job interface defines the APIs of a running job.
+ */
 public interface Job {
     long getJobId();
 
-    void submitJob() throws ExecutionException, InterruptedException;
+    PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+
+    void cancelJob();
+
+    JobStatus getJobStatus();
 
     JobStatus waitForJobComplete();
 }
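
Because the Job interface now carries cancelJob() and getJobStatus() alongside the wait methods, small helpers can be written against the interface alone. A hypothetical sketch (JobCancelHelper and cancelAndAwait are illustrative names, not part of this commit):

    import org.apache.seatunnel.engine.core.job.Job;
    import org.apache.seatunnel.engine.core.job.JobStatus;

    public final class JobCancelHelper {
        private JobCancelHelper() {
        }

        // Hypothetical helper built only on the Job interface shown above:
        // request cancellation if the job has not reached an end state yet,
        // then block until it does (CANCELED is expected on this path).
        public static JobStatus cancelAndAwait(Job job) {
            if (!job.getJobStatus().isEndState()) {
                job.cancelJob();
            }
            return job.waitForJobComplete();
        }
    }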
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
new file mode 100644
index 000000000..0c48d17f6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.ForwardFrameIterator;
+import static com.hazelcast.client.impl.protocol.ClientMessage.Frame;
+import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+@Generated("b8660c8a07cf0fd33e4191f33a26b46e")
+public final class SeaTunnelCancelJobCodec {
+    //hex: 0xDE0400
+    public static final int REQUEST_MESSAGE_TYPE = 14550016;
+    //hex: 0xDE0401
+    public static final int RESPONSE_MESSAGE_TYPE = 14550017;
+    private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+    private SeaTunnelCancelJobCodec() {
+    }
+
+    public static ClientMessage encodeRequest(long jobId) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.CancelJob");
+        Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+        clientMessage.add(initialFrame);
+        return clientMessage;
+    }
+
+    public static long decodeRequest(ClientMessage clientMessage) {
+        ForwardFrameIterator iterator = clientMessage.frameIterator();
+        Frame initialFrame = iterator.next();
+        return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+    }
+
+    public static ClientMessage encodeResponse() {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+        clientMessage.add(initialFrame);
+
+        return clientMessage;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStatusCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStatusCodec.java
new file mode 100644
index 000000000..f6002a6c7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStatusCodec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.ForwardFrameIterator;
+import static com.hazelcast.client.impl.protocol.ClientMessage.Frame;
+import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+@Generated("069a370867d61e85d3d51ea5453d880a")
+public final class SeaTunnelGetJobStatusCodec {
+    //hex: 0xDE0500
+    public static final int REQUEST_MESSAGE_TYPE = 14550272;
+    //hex: 0xDE0501
+    public static final int RESPONSE_MESSAGE_TYPE = 14550273;
+    private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+    private static final int RESPONSE_JOB_STATUS_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_JOB_STATUS_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+
+    private SeaTunnelGetJobStatusCodec() {
+    }
+
+    public static ClientMessage encodeRequest(long jobId) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.GetJobStatus");
+        Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+        clientMessage.add(initialFrame);
+        return clientMessage;
+    }
+
+    public static long decodeRequest(ClientMessage clientMessage) {
+        ForwardFrameIterator iterator = clientMessage.frameIterator();
+        Frame initialFrame = iterator.next();
+        return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+    }
+
+    public static ClientMessage encodeResponse(int jobStatus) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET, jobStatus);
+        clientMessage.add(initialFrame);
+
+        return clientMessage;
+    }
+
+    public static int decodeResponse(ClientMessage clientMessage) {
+        ForwardFrameIterator iterator = clientMessage.frameIterator();
+        Frame initialFrame = iterator.next();
+        return decodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET);
+    }
+}
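
The protocol carries the status as a plain int (the server-side GetJobStatusOperation later in this commit returns JobStatus.ordinal()). A quick roundtrip sanity check of the generated codec could look like the sketch below; the class name is made up, and mapping the ordinal back to the enum on the caller's side is an assumption:

    import org.apache.seatunnel.engine.core.job.JobStatus;
    import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;

    import com.hazelcast.client.impl.protocol.ClientMessage;

    public class GetJobStatusCodecRoundTrip {
        public static void main(String[] args) {
            // Request side: the job id survives encode/decode of the initial frame.
            ClientMessage request = SeaTunnelGetJobStatusCodec.encodeRequest(42L);
            System.out.println(SeaTunnelGetJobStatusCodec.decodeRequest(request)); // 42

            // Response side: the status travels as an ordinal and can be mapped back to the enum.
            ClientMessage response = SeaTunnelGetJobStatusCodec.encodeResponse(JobStatus.RUNNING.ordinal());
            JobStatus decoded = JobStatus.values()[SeaTunnelGetJobStatusCodec.decodeResponse(response)];
            System.out.println(decoded); // RUNNING
        }
    }
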
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 19f616d29..39cb3c386 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -80,3 +80,39 @@ methods:
           nullable: false
           since: 2.0
           doc: ''
+
+  - id: 4
+    name: cancelJob
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params:
+        - name: jobId
+          type: long
+          nullable: false
+          since: 2.0
+          doc: ''
+    response: {}
+
+  - id: 5
+    name: getJobStatus
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params:
+        - name: jobId
+          type: long
+          nullable: false
+          since: 2.0
+          doc: ''
+    response:
+      params:
+        - name: jobStatus
+          type: int
+          nullable: false
+          since: 2.0
+          doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 019a97797..38eb6205f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -167,4 +167,27 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
             return runningJobMaster.getJobMasterCompleteFuture();
         }
     }
+
+    public PassiveCompletableFuture<Void> cancelJob(long jobId) {
+        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+        if (runningJobMaster == null) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.complete(null);
+            return new PassiveCompletableFuture<>(future);
+        } else {
+            return new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
+                runningJobMaster.cancelJob();
+                return null;
+            }));
+        }
+    }
+
+    public JobStatus getJobStatus(long jobId) {
+        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+        if (runningJobMaster == null) {
+            // TODO Get Job Status from JobHistoryStorage
+            return JobStatus.FINISHED;
+        }
+        return runningJobMaster.getJobStatus();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index df9651768..1bd2b4b9e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -25,7 +25,6 @@ import static java.util.stream.Collectors.partitioningBy;
 import static java.util.stream.Collectors.toList;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -183,11 +182,18 @@ public class TaskExecutionService {
         return new PassiveCompletableFuture<>(resultFuture);
     }
 
+    /**
+     * JobMaster calls this method to cancel a task; {@link TaskExecutionService} then cancels the task and sends the
+     * {@link TaskExecutionState} back to JobMaster.
+     *
+     * @param taskId TaskGroup.getId()
+     */
     public void cancelTaskGroup(long taskId) {
+        logger.info(String.format("Task (%s) need cancel.", taskId));
         if (cancellationFutures.containsKey(taskId)) {
             cancellationFutures.get(taskId).cancel(false);
         } else {
-            throw new SeaTunnelEngineException(String.format("taskId : %s is not exist", taskId));
+            logger.warning(String.format("Task to cancel (taskId: %s) does not exist", taskId));
         }
 
     }
@@ -378,7 +384,7 @@ public class TaskExecutionService {
                 if (ex == null) {
                     future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FINISHED, null));
                 } else if (isCancel.get()) {
-                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, ex));
+                    future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, null));
                 } else {
                     future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, ex));
                 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index b769f76be..dc79fceec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 public class PhysicalPlan {
 
@@ -72,6 +74,8 @@ public class PhysicalPlan {
 
     private final ExecutorService executorService;
 
+    private final String jobFullName;
+
     public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
                         @NonNull ExecutorService executorService,
                         @NonNull JobImmutableInformation jobImmutableInformation,
@@ -90,6 +94,7 @@ public class PhysicalPlan {
         if (pipelineList.isEmpty()) {
             throw new UnknownPhysicalPlanException("The physical plan didn't have any can execute pipeline");
         }
+        this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(), jobImmutableInformation.getJobId());
         Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
             x.whenComplete((v, t) -> {
                 // We need not handle t, Because we will not return t from Pipeline
@@ -109,9 +114,9 @@ public class PhysicalPlan {
                     if (failedPipelineNum.get() > 0) {
                         updateJobState(JobStatus.FAILING);
                     } else if (canceledPipelineNum.get() > 0) {
-                        updateJobState(JobStatus.CANCELED);
+                        turnToEndState(JobStatus.CANCELED);
                     } else {
-                        updateJobState(JobStatus.FINISHED);
+                        turnToEndState(JobStatus.FINISHED);
                     }
                     jobEndFuture.complete(jobStatus.get());
                 }
@@ -119,25 +124,64 @@ public class PhysicalPlan {
         });
     }
 
-    public PassiveCompletableFuture<Void> cancelJob() {
-        CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
-            // TODO Implement cancel pipeline in job.
-            return null;
-        });
+    public void cancelJob() {
+        if (!updateJobState(JobStatus.CREATED, JobStatus.CANCELED)) {
+            // may be running, failing, failed, canceling, canceled or finished
+            if (updateJobState(JobStatus.RUNNING, JobStatus.CANCELLING)) {
+                cancelRunningJob();
+            } else {
+                LOGGER.info(String.format("%s in a non cancellable state: %s, skip cancel", jobFullName, jobStatus.get()));
+            }
+        }
+    }
 
-        cancelFuture.complete(null);
-        return new PassiveCompletableFuture<>(cancelFuture);
+    private void cancelRunningJob() {
+        List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
+            if (!pipeline.getPipelineState().get().isEndState() &&
+                !PipelineState.CANCELING.equals(pipeline.getPipelineState().get())) {
+                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+                    pipeline.cancelPipeline();
+                    return null;
+                });
+                return future;
+            }
+            return null;
+        }).filter(x -> x != null).collect(Collectors.toList());
+
+        try {
+            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+                collect.toArray(new CompletableFuture[collect.size()]));
+            voidCompletableFuture.get();
+        } catch (Exception e) {
+            LOGGER.severe(
+                String.format("%s cancel error with exception: %s", jobFullName, ExceptionUtils.getMessage(e)));
+        }
     }
 
     public List<SubPlan> getPipelineList() {
         return pipelineList;
     }
 
-    public void turnToRunning() {
-        if (!updateJobState(JobStatus.CREATED, JobStatus.RUNNING)) {
-            throw new IllegalStateException(
-                "Job may only be scheduled from state " + JobStatus.CREATED);
+    public boolean turnToRunning() {
+        return updateJobState(JobStatus.CREATED, JobStatus.RUNNING);
+    }
+
+    private void turnToEndState(@NonNull JobStatus endState) {
+        // consistency check
+        if (jobStatus.get().isEndState()) {
+            String message = "Job is trying to leave terminal state " + jobStatus.get();
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
         }
+
+        if (!endState.isEndState()) {
+            String message = "Need a end state, not " + endState;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        jobStatus.set(endState);
+        stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
     }
 
     public boolean updateJobState(@NonNull JobStatus targetState) {
@@ -153,8 +197,7 @@ public class PhysicalPlan {
         }
 
         // now do the actual state transition
-        if (jobStatus.get() == current) {
-            jobStatus.set(targetState);
+        if (jobStatus.compareAndSet(current, targetState)) {
             LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
                 jobImmutableInformation.getJobConfig().getName(),
                 jobImmutableInformation.getJobId(),
@@ -179,4 +222,8 @@ public class PhysicalPlan {
     public JobStatus getJobStatus() {
         return jobStatus.get();
     }
+
+    public String getJobFullName() {
+        return jobFullName;
+    }
 }
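
The switch from get()/set() to compareAndSet above is what makes updateJobState safe when the scheduler and the cancel path race for the same transition: only one caller can win CREATED -> RUNNING or RUNNING -> CANCELLING. A stripped-down sketch of the pattern with a toy state enum (not the engine's JobStatus):

    import java.util.concurrent.atomic.AtomicReference;

    public class StateTransitionSketch {
        enum State { CREATED, RUNNING, CANCELING, CANCELED, FINISHED }

        private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

        // Returns true only for the single caller that actually performed the transition.
        boolean updateState(State expected, State target) {
            return state.compareAndSet(expected, target);
        }

        public static void main(String[] args) {
            StateTransitionSketch plan = new StateTransitionSketch();
            System.out.println(plan.updateState(State.CREATED, State.RUNNING));   // true
            System.out.println(plan.updateState(State.CREATED, State.CANCELED));  // false, already RUNNING
            System.out.println(plan.updateState(State.RUNNING, State.CANCELING)); // true
        }
    }
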
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 84f9c25db..d63c87859 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -19,14 +19,16 @@ package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
-import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -100,6 +102,10 @@ public class PhysicalVertex {
 
     private final NodeEngine nodeEngine;
 
+    private Address currentExecutionAddress;
+
+    private TaskGroupImmutableInformation taskGroupImmutableInformation;
+
     public PhysicalVertex(long physicalVertexId,
                           int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
@@ -145,25 +151,56 @@ public class PhysicalVertex {
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
     public void deploy(@NonNull Address address) {
-        TaskGroupImmutableInformation taskGroupImmutableInformation =
-            new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
-                nodeEngine.getSerializationService().toData(this.taskGroup),
-                this.pluginJarsUrls);
+        currentExecutionAddress = address;
+        taskGroupImmutableInformation = new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+            nodeEngine.getSerializationService().toData(this.taskGroup),
+            this.pluginJarsUrls);
 
         try {
-            waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
-                nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                        new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                        address)
-                    .invoke());
-            updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+            if (ExecutionState.DEPLOYING.equals(executionState.get())) {
+                waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
+                    nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                            new DeployTaskOperation(
+                                nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+                            currentExecutionAddress)
+                        .invoke());
+
+                // may be canceling
+                if (!updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+                    // If the task state turned to CANCELING after the task was deployed to the TaskExecutionService,
+                    // we need to notify the TaskExecutionService to cancel this task.
+                    noticeTaskExecutionServiceCancel();
+                    if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+                        turnToEndState(ExecutionState.CANCELED);
+                        taskFuture.complete(
+                            new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+                    } else {
+                        turnToEndState(ExecutionState.FAILED);
+                        taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED,
+                            new JobException(String.format("%s turn to a unexpected state: %s, make it Failed",
+                                this.getTaskFullName(), executionState.get()))));
+                    }
+                }
+            } else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+                turnToEndState(ExecutionState.CANCELED);
+                taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(), null));
+            } else {
+                turnToEndState(ExecutionState.FAILED);
+                taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(),
+                    new JobException(String.format("%s turned to an unexpected state: %s", this.getTaskFullName(), executionState.get()))));
+            }
+
         } catch (Throwable th) {
             LOGGER.severe(String.format("%s deploy error with Exception: %s",
                 this.taskFullName,
                 ExceptionUtils.getMessage(th)));
-            updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+            turnToEndState(ExecutionState.FAILED);
             taskFuture.complete(
-                new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, th));
+                new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED, th));
+        }
+
+        if (waitForCompleteByExecutionService == null) {
+            return;
         }
 
         waitForCompleteByExecutionService.whenComplete((v, t) -> {
@@ -174,7 +211,7 @@ public class PhysicalVertex {
                         new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED,
                             t));
                 } else {
-                    updateTaskState(executionState.get(), v.getExecutionState());
+                    turnToEndState(v.getExecutionState());
                     if (v.getThrowable() != null) {
                         LOGGER.severe(String.format("%s end with state %s and Exception: %s",
                             this.taskFullName,
@@ -190,7 +227,7 @@ public class PhysicalVertex {
             } catch (Throwable th) {
                 LOGGER.severe(
                     String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
-                updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
+                turnToEndState(ExecutionState.FAILED);
                 v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, th);
                 taskFuture.complete(v);
             }
@@ -201,6 +238,27 @@ public class PhysicalVertex {
         return physicalVertexId;
     }
 
+    private void turnToEndState(@NonNull ExecutionState endState) {
+        // consistency check
+        if (executionState.get().isEndState()) {
+            String message = "Task is trying to leave terminal state " + executionState.get();
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (!endState.isEndState()) {
+            String message = "Need a end state, not " + endState;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        LOGGER.info(String.format("%s turn to end state %s.",
+            taskFullName,
+            endState));
+        executionState.set(endState);
+        stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
+    }
+
     public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
         // consistency check
         if (current.isEndState()) {
@@ -228,8 +286,7 @@ public class PhysicalVertex {
         }
 
         // now do the actual state transition
-        if (executionState.get() == current) {
-            executionState.set(targetState);
+        if (executionState.compareAndSet(current, targetState)) {
             LOGGER.info(String.format("%s turn from state %s to %s.",
                 taskFullName,
                 current,
@@ -245,4 +302,47 @@ public class PhysicalVertex {
     public TaskGroupDefaultImpl getTaskGroup() {
         return taskGroup;
     }
+
+    public void cancel() {
+        if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
+            updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)) {
+            taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+        } else if (updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELING)) {
+            // do nothing; even if the task has already been deployed to the TaskExecutionService, the deploy method will handle the cancel
+        } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
+            noticeTaskExecutionServiceCancel();
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void noticeTaskExecutionServiceCancel() {
+        int i = 0;
+        // In order not to leave uncontrolled tasks behind, we retry until the taskFuture is completed
+        while (!taskFuture.isDone()) {
+            try {
+                i++;
+                nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                        new CancelTaskOperation(taskGroup.getId()),
+                        currentExecutionAddress)
+                    .invoke().get();
+                return;
+            } catch (Exception e) {
+                LOGGER.warning(String.format("%s cancel failed with Exception: %s, retry %s", this.getTaskFullName(),
+                    ExceptionUtils.getMessage(e), i));
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+    }
+
+    public AtomicReference<ExecutionState> getExecutionState() {
+        return executionState;
+    }
+
+    public String getTaskFullName() {
+        return taskFullName;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 950a63e95..fbccceef8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineState;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 public class SubPlan {
     private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
@@ -122,13 +124,13 @@ public class SubPlan {
 
                 if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
                     if (failedTaskNum.get() > 0) {
-                        updatePipelineState(PipelineState.FAILED);
+                        turnToEndState(PipelineState.FAILED);
                         LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
                     } else if (canceledTaskNum.get() > 0) {
-                        updatePipelineState(PipelineState.CANCELED);
+                        turnToEndState(PipelineState.CANCELED);
                         LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
                     } else {
-                        updatePipelineState(PipelineState.FINISHED);
+                        turnToEndState(PipelineState.FINISHED);
                         LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
                     }
                     pipelineFuture.complete(pipelineState.get());
@@ -137,8 +139,22 @@ public class SubPlan {
         });
     }
 
-    public boolean updatePipelineState(@NonNull PipelineState targetState) {
-        return updatePipelineState(pipelineState.get(), targetState);
+    private void turnToEndState(@NonNull PipelineState endState) {
+        // consistency check
+        if (pipelineState.get().isEndState()) {
+            String message = "Pipeline is trying to leave terminal state " + pipelineState.get();
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        if (!endState.isEndState()) {
+            String message = "Need a end state, not " + endState;
+            LOGGER.severe(message);
+            throw new IllegalStateException(message);
+        }
+
+        pipelineState.set(endState);
+        stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
     }
 
     public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
@@ -168,8 +184,7 @@ public class SubPlan {
         }
 
         // now do the actual state transition
-        if (pipelineState.get() == current) {
-            pipelineState.set(targetState);
+        if (pipelineState.compareAndSet(current, targetState)) {
             LOGGER.info(String.format("%s turn from state %s to %s.",
                 pipelineFullName,
                 current,
@@ -182,20 +197,57 @@ public class SubPlan {
         }
     }
 
-    public PassiveCompletableFuture<Void> cancelPipeline() {
-        CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
-            // TODO Implement cancel tasks in pipeline.
-            return null;
-        });
+    public void cancelPipeline() {
+        if (!updatePipelineState(PipelineState.CREATED, PipelineState.CANCELED) &&
+            !updatePipelineState(PipelineState.SCHEDULED, PipelineState.CANCELED)) {
+            // may be deploying, running, failed, canceling, canceled or finished
+            if (updatePipelineState(PipelineState.DEPLOYING, PipelineState.CANCELING) ||
+                updatePipelineState(PipelineState.RUNNING, PipelineState.CANCELING)) {
+                cancelRunningPipeline();
+            } else {
+                LOGGER.info(
+                    String.format("%s in a non cancellable state: %s, skip cancel", pipelineFullName, pipelineState.get()));
+            }
+        } else {
+            pipelineFuture.complete(PipelineState.CANCELED);
+        }
+    }
+
+    private void cancelRunningPipeline() {
+        List<CompletableFuture<Void>> coordinatorCancelList =
+            coordinatorVertexList.stream().map(coordinator -> cancelTask(coordinator)).filter(x -> x != null)
+                .collect(Collectors.toList());
+
+        List<CompletableFuture<Void>> taskCancelList =
+            physicalVertexList.stream().map(task -> cancelTask(task)).filter(x -> x != null)
+                .collect(Collectors.toList());
+
+        try {
+            coordinatorCancelList.addAll(taskCancelList);
+            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+                coordinatorCancelList.toArray(new CompletableFuture[coordinatorCancelList.size()]));
+            voidCompletableFuture.get();
+        } catch (Exception e) {
+            LOGGER.severe(
+                String.format("%s cancel error with exception: %s", pipelineFullName, ExceptionUtils.getMessage(e)));
+        }
+    }
 
-        cancelFuture.complete(null);
-        return new PassiveCompletableFuture<>(cancelFuture);
+    private CompletableFuture<Void> cancelTask(@NonNull PhysicalVertex task) {
+        if (!task.getExecutionState().get().isEndState() &&
+            !ExecutionState.CANCELING.equals(task.getExecutionState().get())) {
+            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+                task.cancel();
+                return null;
+            });
+            return future;
+        }
+        return null;
     }
 
     public void failedWithNoEnoughResource() {
         LOGGER.severe(String.format("%s failed with have no enough resource to run.", this.getPipelineFullName()));
-        updatePipelineState(PipelineState.SCHEDULED, PipelineState.FAILED);
-        pipelineFuture.complete(PipelineState.FAILED);
+        cancelPipeline();
     }
 
     public int getPipelineIndex() {
@@ -213,4 +265,8 @@ public class SubPlan {
     public String getPipelineFullName() {
         return pipelineFullName;
     }
+
+    public AtomicReference<PipelineState> getPipelineState() {
+        return pipelineState;
+    }
 }
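
Both PhysicalPlan.cancelRunningJob and SubPlan.cancelRunningPipeline fan out one async cancel per child and then block on CompletableFuture.allOf until every cancel request has been issued. A self-contained sketch of that fan-out/join pattern, with plain strings standing in for pipelines/tasks (illustrative class, not part of the commit):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    public class CancelFanOutSketch {
        public static void main(String[] args) throws Exception {
            List<String> children = Arrays.asList("pipeline-1", "pipeline-2", "pipeline-3");

            // One async cancel per child, mirroring SubPlan.cancelTask / PhysicalPlan.cancelRunningJob.
            List<CompletableFuture<Void>> cancels = children.stream()
                .map(name -> CompletableFuture.runAsync(() -> System.out.println("canceling " + name)))
                .collect(Collectors.toList());

            // Join them all before reporting the cancel as done.
            CompletableFuture.allOf(cancels.toArray(new CompletableFuture[0])).get();
            System.out.println("all cancel requests completed");
        }
    }
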
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3a7895389..1383797f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -134,7 +134,11 @@ public class JobMaster implements Runnable {
     }
 
     public void cleanJob() {
-        // TODO clean something
+        // TODO Add some job clean operation
+    }
+
+    public void cancelJob() {
+        this.physicalPlan.cancelJob();
     }
 
     public ResourceManager getResourceManager() {
@@ -152,4 +156,8 @@ public class JobMaster implements Runnable {
     public JobImmutableInformation getJobImmutableInformation() {
         return jobImmutableInformation;
     }
+
+    public JobStatus getJobStatus() {
+        return physicalPlan.getJobStatus();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
similarity index 53%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
index 4c48060b4..86c020cd4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
@@ -21,44 +21,23 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
-import com.hazelcast.internal.nio.IOUtil;
-import com.hazelcast.internal.serialization.Data;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import lombok.NonNull;
-
-import java.io.IOException;
-
-public class DeployTaskOperation extends AsyncOperation {
-    private Data taskImmutableInformation;
-
-    public DeployTaskOperation() {
+public class CancelJobOperation extends AbstractJobAsyncOperation {
+    public CancelJobOperation() {
+        super();
     }
 
-    public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
-        this.taskImmutableInformation = taskImmutableInformation;
+    public CancelJobOperation(long jobId) {
+        super(jobId);
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
-        SeaTunnelServer server = getService();
-        return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+        SeaTunnelServer service = getService();
+        return service.cancelJob(jobId);
     }
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
-    }
-
-    @Override
-    protected void writeInternal(ObjectDataOutput out) throws IOException {
-        super.writeInternal(out);
-        IOUtil.writeData(out, taskImmutableInformation);
-    }
-
-    @Override
-    protected void readInternal(ObjectDataInput in) throws IOException {
-        super.readInternal(in);
-        taskImmutableInformation = IOUtil.readData(in);
+        return OperationDataSerializerHook.CANCEL_JOB_OPERATOR;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index 4c48060b4..9d6d4cd14 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,48 +17,59 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
-import com.hazelcast.internal.nio.IOUtil;
-import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
-import lombok.NonNull;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class DeployTaskOperation extends AsyncOperation {
-    private Data taskImmutableInformation;
+public class GetJobStatusOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+    private long jobId;
 
-    public DeployTaskOperation() {
+    private int response;
+
+    public GetJobStatusOperation() {
     }
 
-    public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
-        this.taskImmutableInformation = taskImmutableInformation;
+    public GetJobStatusOperation(long jobId) {
+        this.jobId = jobId;
     }
 
     @Override
-    protected PassiveCompletableFuture<?> doRun() throws Exception {
-        SeaTunnelServer server = getService();
-        return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+    public final int getFactoryId() {
+        return OperationDataSerializerHook.FACTORY_ID;
     }
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
+        return OperationDataSerializerHook.GET_JOB_STATUS_OPERATOR;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        IOUtil.writeData(out, taskImmutableInformation);
+        out.writeLong(jobId);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        taskImmutableInformation = IOUtil.readData(in);
+        jobId = in.readLong();
+    }
+
+    @Override
+    public void run() {
+        SeaTunnelServer service = getService();
+        response = service.getJobStatus(jobId).ordinal();
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
new file mode 100644
index 000000000..043de6eb3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class CancelJobTask extends AbstractSeaTunnelMessageTask<Long, Void> {
+    protected CancelJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection,
+            SeaTunnelCancelJobCodec::decodeRequest,
+            x -> SeaTunnelCancelJobCodec.encodeResponse());
+    }
+
+    @Override
+    protected Operation prepareOperation() {
+        return new CancelJobOperation(parameters);
+    }
+
+    @Override
+    public String getMethodName() {
+        return "cancelJob";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStatusTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStatusTask.java
new file mode 100644
index 000000000..4202c804e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStatusTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class GetJobStatusTask extends AbstractSeaTunnelMessageTask<Long, Integer> {
+
+    protected GetJobStatusTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection,
+            SeaTunnelGetJobStatusCodec::decodeRequest,
+            SeaTunnelGetJobStatusCodec::encodeResponse);
+    }
+
+    @Override
+    protected Operation prepareOperation() {
+        return new GetJobStatusOperation(parameters);
+    }
+
+    @Override
+    public String getMethodName() {
+        return "getJobStatus";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 9876f489e..902e42cb6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
@@ -49,5 +51,9 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
             (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
         factories.put(SeaTunnelWaitForJobCompleteCodec.REQUEST_MESSAGE_TYPE,
             (clientMessage, connection) -> new WaitForJobCompleteTask(clientMessage, node, connection));
+        factories.put(SeaTunnelCancelJobCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new CancelJobTask(clientMessage, node, connection));
+        factories.put(SeaTunnelGetJobStatusCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new GetJobStatusTask(clientMessage, node, connection));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 5d608a62a..8d42737d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.engine.server.scheduler;
 
+import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.exception.JobNoEnoughResourceException;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineState;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -47,34 +50,43 @@ public class PipelineBaseScheduler implements JobScheduler {
 
     @Override
     public void startScheduling() {
-        physicalPlan.turnToRunning();
-        physicalPlan.getPipelineList().forEach(pipeline -> {
-            pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED);
-            if (applyResourceForPipeline(pipeline)) {
+        if (physicalPlan.turnToRunning()) {
+            List<CompletableFuture<Object>> collect = physicalPlan.getPipelineList().stream().map(pipeline -> {
+                if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
+                    handlePipelineStateUpdateError(pipeline, PipelineState.SCHEDULED);
+                    return null;
+                }
+                if (!applyResourceForPipeline(pipeline)) {
+                    return null;
+                }
                 // deploy pipeline
-                deployPipeline(pipeline);
-            } else {
-                pipeline.failedWithNoEnoughResource();
+                return CompletableFuture.supplyAsync(() -> {
+                    deployPipeline(pipeline);
+                    return null;
+                });
+            }).filter(x -> x != null).collect(Collectors.toList());
+            try {
+                CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+                    collect.toArray(new CompletableFuture[collect.size()]));
+                voidCompletableFuture.get();
+            } catch (Exception e) {
+                // cancel pipeline and throw an exception
+                physicalPlan.cancelJob();
+                throw new RuntimeException(e);
             }
-        });
+        } else if (!JobStatus.CANCELED.equals(physicalPlan.getJobStatus())) {
+            throw new JobException(String.format("%s turn to a unexpected state: %s", physicalPlan.getJobFullName(),
+                physicalPlan.getJobStatus()));
+        }
     }
 
     private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
         try {
             // apply resource for coordinators
-            subPlan.getCoordinatorVertexList().forEach(coordinator -> {
-                // TODO If there is no enough resources for tasks, we need add some wait profile
-                coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    coordinator.getTaskGroup().getId());
-            });
+            subPlan.getCoordinatorVertexList().forEach(coordinator -> applyResourceForTask(coordinator));
 
             // apply resource for other tasks
-            subPlan.getPhysicalVertexList().forEach(task -> {
-                task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                    task.getTaskGroup().getId());
-            });
+            subPlan.getPhysicalVertexList().forEach(task -> applyResourceForTask(task));
         } catch (JobNoEnoughResourceException e) {
             LOGGER.severe(e);
             return false;
@@ -83,39 +95,85 @@ public class PipelineBaseScheduler implements JobScheduler {
         return true;
     }
 
-    private void deployPipeline(@NonNull SubPlan pipeline) {
-        pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING);
-        List<CompletableFuture> deployCoordinatorFuture =
-            pipeline.getCoordinatorVertexList().stream().map(coordinator -> {
-                if (coordinator.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
-                    // deploy is a time-consuming operation, so we do it async
-                    return CompletableFuture.supplyAsync(() -> {
-                        coordinator.deploy(
-                            resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                                coordinator.getTaskGroup().getId()));
-                        return null;
-                    });
-                }
+    private void applyResourceForTask(PhysicalVertex task) {
+        try {
+            // TODO If there are not enough resources for the tasks, we need to add a waiting strategy
+            if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+                resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
+                    task.getTaskGroup().getId());
+            } else {
+                handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+            }
+        } catch (JobNoEnoughResourceException e) {
+            LOGGER.severe(e);
+        }
+    }
+
+    private CompletableFuture<Void> deployTask(PhysicalVertex task) {
+        if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
+            // deploy is a time-consuming operation, so we do it async
+            return CompletableFuture.supplyAsync(() -> {
+                task.deploy(
+                    resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
+                        task.getTaskGroup().getId()));
                 return null;
-            }).filter(x -> x != null).collect(Collectors.toList());
+            });
+        } else {
+            handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
+        }
+        return null;
+    }
+
+    private void deployPipeline(@NonNull SubPlan pipeline) {
+        if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
+            List<CompletableFuture> deployCoordinatorFuture =
+                pipeline.getCoordinatorVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
+                    .collect(Collectors.toList());
 
-        List<CompletableFuture> deployTaskFuture =
-            pipeline.getPhysicalVertexList().stream().map(task -> {
-                if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
-                    return CompletableFuture.supplyAsync(() -> {
-                        task.deploy(
-                            resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
-                                task.getTaskGroup().getId()));
-                        return null;
-                    });
+            List<CompletableFuture> deployTaskFuture =
+                pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
+                    .collect(Collectors.toList());
+
+            try {
+                deployCoordinatorFuture.addAll(deployTaskFuture);
+                CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+                    deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]));
+                voidCompletableFuture.get();
+                if (!pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING)) {
+                    LOGGER.info(
+                        String.format("%s turn to state %s, skip the running state.", pipeline.getPipelineFullName(),
+                            pipeline.getPipelineState().get()));
                 }
-                return null;
-            }).filter(x -> x != null).collect(Collectors.toList());
+            } catch (Exception e) {
+                // cancel pipeline and throw an exception
+                pipeline.cancelPipeline();
+                throw new RuntimeException(e);
+            }
+        } else {
+            handlePipelineStateUpdateError(pipeline, PipelineState.DEPLOYING);
+        }
+    }
 
-        deployCoordinatorFuture.addAll(deployTaskFuture);
-        CompletableFuture.allOf(deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]))
-            .whenComplete((v, t) -> {
-                pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING);
-            });
+    private void handlePipelineStateUpdateError(SubPlan pipeline, PipelineState targetState) {
+        if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
+            PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+            // the pipeline may already have been canceled
+            LOGGER.info(String.format("%s has turned to state %s, skipping %s for this pipeline.", pipeline.getPipelineFullName(),
+                pipeline.getPipelineState().get(), targetState));
+        } else {
+            throw new JobException(
+                String.format("%s turned to an unexpected state: %s, stop scheduling the job", pipeline.getPipelineFullName(),
+                    pipeline.getPipelineState().get()));
+        }
+    }
+
+    private void handleTaskStateUpdateError(PhysicalVertex task, ExecutionState targetState) {
+        if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
+            ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+            LOGGER.info(String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), targetState));
+        } else {
+            throw new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
+                task.getTaskFullName(), task.getExecutionState().get()));
+        }
     }
 }
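
The rewritten deploy path above fans each task deployment out as an async future, waits on CompletableFuture.allOf, and cancels the pipeline if any deployment fails. The following is a minimal, self-contained sketch of that fan-out pattern only; Task, tryMarkDeploying and cancelPipeline are hypothetical stand-ins for the engine's PhysicalVertex/SubPlan API, not the committed code.

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class DeployAllSketch {

    // Hypothetical stand-in for a physical vertex: try to move it to DEPLOYING, then deploy it.
    interface Task {
        boolean tryMarkDeploying();
        void deploy();
    }

    // Fan every deployment out as an async future, wait for all of them, cancel on any failure.
    static void deployAll(List<Task> tasks, Runnable cancelPipeline) {
        List<CompletableFuture<Void>> futures = tasks.stream()
            .map(task -> task.tryMarkDeploying()
                ? CompletableFuture.runAsync(task::deploy) // deploy is slow, so run it async
                : null)                                    // state transition refused: skip this task
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        try {
            // Wait for every deployment before the pipeline can be marked RUNNING.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } catch (Exception e) {
            cancelPipeline.run(); // any deployment failure cancels the whole pipeline
            throw new RuntimeException(e);
        }
    }
}
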
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 2a40f871b..4410966b1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,14 +18,16 @@
 package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
 import org.apache.seatunnel.engine.server.operation.CheckpointAckOperation;
 import org.apache.seatunnel.engine.server.operation.CheckpointFinishedOperation;
 import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
-import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 import org.apache.seatunnel.engine.server.operation.TaskCompletedOperation;
 import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
+import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -42,6 +44,7 @@ import com.hazelcast.spi.annotation.PrivateApi;
 public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int PRINT_MESSAGE_OPERATOR = 0;
     public static final int SUBMIT_OPERATOR = 1;
+
     public static final int DEPLOY_TASK_OPERATOR = 2;
 
     public static final int TASK_COMPLETED_OPERATOR = 3;
@@ -52,6 +55,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int CHECKPOINT_ACK_OPERATOR = 6;
 
     public static final int CHECKPOINT_FINISHED_OPERATOR = 7;
+    public static final int CANCEL_JOB_OPERATOR = 8;
+    public static final int GET_JOB_STATUS_OPERATOR = 9;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -85,6 +90,10 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
                     return new WaitForJobCompleteOperation();
                 case CHECKPOINT_TRIGGER_OPERATOR:
                     return new CheckpointTriggerOperation();
+                case CANCEL_JOB_OPERATOR:
+                    return new CancelJobOperation();
+                case GET_JOB_STATUS_OPERATOR:
+                    return new GetJobStatusOperation();
                 case CHECKPOINT_ACK_OPERATOR:
                     return new CheckpointAckOperation();
                 case CHECKPOINT_FINISHED_OPERATOR:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 059325a36..71c8f2c99 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConsta
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.Progress;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
 import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
@@ -56,6 +58,10 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int CLOSE_REQUEST_TYPE = 10;
 
+    public static final int DEPLOY_TASK_OPERATOR = 11;
+
+    public static final int CANCEL_TASK_OPERATOR = 12;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
             SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
             SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -96,6 +102,10 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                     return new Progress();
                 case  CLOSE_REQUEST_TYPE:
                     return new CloseRequestOperation();
+                case DEPLOY_TASK_OPERATOR:
+                    return new DeployTaskOperation();
+                case CANCEL_TASK_OPERATOR:
+                    return new CancelTaskOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
similarity index 67%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
index ee3783352..37f45c17b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.operation.source;
+package org.apache.seatunnel.engine.server.task.operation;
 
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
@@ -29,24 +27,35 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
 
-public class CloseRequestOperation extends Operation implements IdentifiedDataSerializable {
+/**
+ * This operation only notifies the {@link org.apache.seatunnel.engine.server.TaskExecutionService} to cancel the
+ * task. After the final task is cancelled, the {@link org.apache.seatunnel.engine.server.TaskExecutionService} will
+ * notify the JobMaster.
+ */
+public class CancelTaskOperation extends Operation implements IdentifiedDataSerializable {
+    private long taskGroupId;
 
-    private TaskLocation readerLocation;
+    public CancelTaskOperation() {
+    }
 
-    public CloseRequestOperation() {
+    public CancelTaskOperation(long taskGroupId) {
+        this.taskGroupId = taskGroupId;
     }
 
-    public CloseRequestOperation(TaskLocation readerLocation) {
-        this.readerLocation = readerLocation;
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.CANCEL_TASK_OPERATOR;
     }
 
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SourceSeaTunnelTask<?, ?> task =
-                server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
-                        .getTaskGroup().getTask(readerLocation.getTaskID());
-        task.close();
+        server.getTaskExecutionService().cancelTaskGroup(taskGroupId);
     }
 
     @Override
@@ -57,22 +66,12 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        readerLocation.writeData(out);
+        out.writeLong(taskGroupId);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        readerLocation.readData(in);
-    }
-
-    @Override
-    public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
-    }
-
-    @Override
-    public int getClassId() {
-        return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
+        taskGroupId = in.readLong();
     }
 }
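
The operation above only carries a taskGroupId; on the receiving member its run() hands that id to TaskExecutionService.cancelTaskGroup. The caller side is not part of this hunk, so the following is only a hedged sketch of how such an operation could be submitted through Hazelcast's operation service; the serviceName parameter, the target-address lookup, and the Hazelcast 4.x/5.x package names are assumptions for illustration, not the engine's API.

import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;

import com.hazelcast.cluster.Address;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.InvocationBuilder;

public class CancelTaskInvocationSketch {

    // Hypothetical helper: ask the member at `target` to cancel the given task group.
    // serviceName stands in for the engine's registered Hazelcast service name, and the caller
    // is assumed to already know which cluster member owns the task group.
    static void cancelTaskGroup(NodeEngine nodeEngine, String serviceName, Address target, long taskGroupId) {
        InvocationBuilder invocation = nodeEngine.getOperationService()
            .createInvocationBuilder(serviceName, new CancelTaskOperation(taskGroupId), target);
        // Block until the remote TaskExecutionService has accepted the cancel request.
        invocation.invoke().join();
    }
}
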
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
similarity index 88%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 4c48060b4..1667caf4d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.operation;
+package org.apache.seatunnel.engine.server.task.operation;
 
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+import org.apache.seatunnel.engine.server.operation.AsyncOperation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
 import com.hazelcast.internal.nio.IOUtil;
 import com.hazelcast.internal.serialization.Data;
@@ -47,7 +48,7 @@ public class DeployTaskOperation extends AsyncOperation {
 
     @Override
     public int getClassId() {
-        return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
+        return TaskDataSerializerHook.DEPLOY_TASK_OPERATOR;
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 47a7e01d9..8fba46f6f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -47,9 +49,14 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SourceSeaTunnelTask<?, SplitT> task =
-                server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupID()).getTaskGroup().getTask(taskID.getTaskID());
-        task.receivedSourceSplit(splits);
+        RetryUtils.retryWithException(() -> {
+            SourceSeaTunnelTask<?, SplitT> task =
+                server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupID()).getTaskGroup()
+                    .getTask(taskID.getTaskID());
+            task.receivedSourceSplit(splits);
+            return null;
+        }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
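
The guard above (and the matching changes to CloseRequestOperation, RequestSplitOperation and SourceNoMoreElementOperation below) retries while a NullPointerException is thrown, which appears to cover the window in which an operation reaches a member before the task group has been registered with the TaskExecutionService. Below is a generic sketch of the same retry shape; it uses a hypothetical retry helper rather than the exact RetryUtils.retryWithException / RetryMaterial signatures shown in this diff.

import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetrySketch {

    // Hypothetical equivalent of a retry-with-exception helper: keep retrying the call while
    // the thrown exception matches the predicate, sleeping between attempts.
    static <T> T retry(Callable<T> call, int maxAttempts, long sleepMillis,
                       Predicate<Exception> retryOn) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (!retryOn.test(e)) {
                    throw e; // not a retryable failure
                }
                last = e;
                Thread.sleep(sleepMillis);
            }
        }
        throw last != null ? last : new IllegalStateException("maxAttempts must be positive");
    }

    public static void main(String[] args) throws Exception {
        // Retry up to 5 times, 100 ms apart, while the lookup still throws a NullPointerException.
        String value = retry(() -> System.getProperty("some.key", "fallback"),
            5, 100L, e -> e instanceof NullPointerException);
        System.out.println(value);
    }
}
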
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index ee3783352..18537e1f4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.task.operation.source;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -43,10 +45,14 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SourceSeaTunnelTask<?, ?> task =
+        RetryUtils.retryWithException(() -> {
+            SourceSeaTunnelTask<?, ?> task =
                 server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
-                        .getTaskGroup().getTask(readerLocation.getTaskID());
-        task.close();
+                    .getTaskGroup().getTask(readerLocation.getTaskID());
+            task.close();
+            return null;
+        }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 67272c6d9..1d6edefd5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.task.operation.source;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -46,10 +48,15 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SourceSplitEnumeratorTask<?> task =
+
+        RetryUtils.retryWithException(() -> {
+            SourceSplitEnumeratorTask<?> task =
                 server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
-                        .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
-        task.requestSplit(taskID.getTaskID());
+                    .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+            task.requestSplit(taskID.getTaskID());
+            return null;
+        }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index b8cabcc16..626ff1911 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.task.operation.source;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -45,10 +47,14 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        SourceSplitEnumeratorTask<?> task =
+        RetryUtils.retryWithException(() -> {
+            SourceSplitEnumeratorTask<?> task =
                 server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
-                        .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
-        task.readerFinished(currentTaskID.getTaskID());
+                    .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+            task.readerFinished(currentTaskID.getTaskID());
+            return null;
+        }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 5c51a3365..a36bc4871 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -77,7 +77,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
 
         JobConfig config = new JobConfig();
         config.setName("test");
-        config.setMode(JobMode.BATCH);
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
             nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
@@ -115,7 +114,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
 
         JobConfig config = new JobConfig();
         config.setName("test");
-        config.setMode(JobMode.BATCH);
 
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
             nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());