You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/19 03:53:24 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add wait for job complete feture (#2413)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 8d32d1799 [ST-Engine] Add wait for job complete feture (#2413)
8d32d1799 is described below

commit 8d32d179981cc8de87af981ade28558cd44aeca5
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Aug 19 11:53:17 2022 +0800

    [ST-Engine] Add wait for job complete feture (#2413)
    
    * Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
    
    * add source file to licenserc.yaml
    
    * fix checkstyle
    
    * fix error
    
    * tmp
    
    * tmp
    
    * Update PhysicalPlan to support scheduler by pipeline
    
    * remove init in run
    
    * tmp
    
    * complete scheduler
    
    * optimize scheduler
    
    * fix checkstyle
    
    * fix checkstyle
    
    * Add wait for job complete in JobProxy
    
    * Add wait for job complete in JobProxy
    
    * add license header
    
    fix style
    
    optimize code
    
    * fir merge error
    
    * fix test error
    
    * fix ci error
    
    optimize ci
    
    * fix code conflicts
    
    * fix review error
---
 .github/workflows/backend.yml                      |   2 +
 .github/workflows/engine_backend.yml               |   2 +
 .licenserc.yaml                                    |   1 +
 LICENSE                                            |   2 +
 .../core/starter/seatunnel/SeaTunnelStarter.java   |  10 +-
 .../seatunnel/engine/client/SeaTunnelClient.java   |   2 +
 .../engine/client/SeaTunnelClientInstance.java     |   2 +
 .../engine/client/SeaTunnelHazelcastClient.java    |  51 ++++-
 .../engine/client/{ => job}/JobClient.java         |   3 +-
 .../engine/client/{ => job}/JobConfigParser.java   |   6 +-
 .../client/{ => job}/JobExecutionEnvironment.java  |  26 +--
 .../engine/client/{ => job}/JobProxy.java          |  39 +++-
 .../engine/client/JobConfigParserTest.java         |   1 +
 .../engine/client/LogicalDagGeneratorTest.java     |   4 +-
 .../engine/client/SeaTunnelClientTest.java         |  24 ++-
 ...leFuture.java => PassiveCompletableFuture.java} |  19 +-
 .../org/apache/seatunnel/engine/core/job/Job.java  |   2 +
 .../protocol/codec/SeaTunnelPrintMessageCodec.java |   3 -
 .../protocol/codec/SeaTunnelSubmitJobCodec.java    |  32 +++-
 ....java => SeaTunnelWaitForJobCompleteCodec.java} |  54 +++---
 .../SeaTunnelEngine.yaml                           |  26 +++
 .../seatunnel/engine/server/SeaTunnelServer.java   |  36 +++-
 .../engine/server/TaskExecutionService.java        |   9 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  18 +-
 .../server/dag/physical/PhysicalPlanGenerator.java | 205 +++++++++++----------
 .../engine/server/dag/physical/PhysicalVertex.java |  66 ++++---
 .../engine/server/dag/physical/SubPlan.java        |  10 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  35 ++--
 ...eration.java => AbstractJobAsyncOperation.java} |  31 +---
 .../engine/server/operation/AsyncOperation.java    |   6 +-
 .../operation/CheckpointTriggerOperation.java      |   4 +-
 .../server/operation/DeployTaskOperation.java      |  10 +-
 .../server/operation/SubmitJobOperation.java       |  12 +-
 .../operation/WaitForJobCompleteOperation.java}    |  29 +--
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |   9 +-
 .../engine/server/protocol/task/SubmitJobTask.java |   5 +-
 ...mitJobTask.java => WaitForJobCompleteTask.java} |  21 +--
 .../serializable/OperationDataSerializerHook.java  |   6 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |   7 +-
 39 files changed, 501 insertions(+), 329 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 0f681028c..0bc67c0a0 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -24,6 +24,8 @@ on:
       - '**/*.md'
       - 'seatunnel-ui/**'
       - 'seatunnel-engine/**'
+      - 'seatunnel-engine/seatunnel-engine-e2e/**'
+      - 'seatunnel-examples/seatunnel-engine-examples/**'
 
 concurrency:
   group: backend-${{ github.event.pull_request.number || github.ref }}
diff --git a/.github/workflows/engine_backend.yml b/.github/workflows/engine_backend.yml
index 7bc593944..5dcca74ad 100644
--- a/.github/workflows/engine_backend.yml
+++ b/.github/workflows/engine_backend.yml
@@ -21,6 +21,8 @@ on:
   pull_request:
     paths:
       - 'seatunnel-engine/**'
+      - 'seatunnel-engine/seatunnel-engine-e2e/**'
+      - 'seatunnel-examples/seatunnel-engine-examples/**'
 
 concurrency:
   group: backend-${{ github.event.pull_request.number || github.ref }}
diff --git a/.licenserc.yaml b/.licenserc.yaml
index a105be8bc..fd6bd426a 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -46,5 +46,6 @@ header:
     - 'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java'
     - 'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java'
     - 'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java'
+    - 'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java'
 
   comment: on-failure
diff --git a/LICENSE b/LICENSE
index eb58900c3..f7731139d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,10 +220,12 @@ generate_client_protocol.sh
 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java                          from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java   from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java                     from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java          from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java                        from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java               from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java            from https://github.com/hazelcast/hazelcast
 seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java                                    from https://github.com/apache/flink
 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java                     from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java               from https://github.com/hazelcast/hazelcast
 
 
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
index 7d2238421..90b12b48e 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.apache.seatunnel.engine.client.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.JobProxy;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 
@@ -45,12 +45,12 @@ public class SeaTunnelStarter {
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
 
+        JobProxy jobProxy;
         try {
-            JobProxy jobProxy = jobExecutionEnv.execute();
+            jobProxy = jobExecutionEnv.execute();
+            jobProxy.waitForJobComplete();
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
         }
-
-        // TODO wait for job complete and then exit
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index e182c527e..d89b24570 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.engine.client.job.JobClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 62590cbef..ccb828119 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.engine.client.job.JobClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 
 public interface SeaTunnelClientInstance {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index c22faeb1c..3700378aa 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -18,9 +18,11 @@
 package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 
 import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.impl.ClientDelegatingFuture;
+import com.hazelcast.client.impl.clientside.ClientMessageDecoder;
 import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
 import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
 import com.hazelcast.client.impl.protocol.ClientMessage;
@@ -32,7 +34,6 @@ import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 public class SeaTunnelHazelcastClient {
@@ -76,8 +77,9 @@ public class SeaTunnelHazelcastClient {
         return requestAndDecodeResponse(null, request, decoder);
     }
 
-    public <S> S requestAndDecodeResponse(UUID uuid, ClientMessage request,
-                                          Function<ClientMessage, Object> decoder) {
+    public <S> S requestAndDecodeResponse(@NonNull UUID uuid,
+                                          @NonNull ClientMessage request,
+                                          @NonNull Function<ClientMessage, Object> decoder) {
         ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
         try {
             ClientMessage response = invocation.invoke().get();
@@ -90,21 +92,52 @@ public class SeaTunnelHazelcastClient {
         }
     }
 
-    public NonCompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
+    public <T> PassiveCompletableFuture<T> requestAndGetCompletableFuture(@NonNull UUID uuid,
+                                                                          @NonNull ClientMessage request,
+                                                                          @NonNull
+                                                                          ClientMessageDecoder clientMessageDecoder) {
         ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
         try {
-            return new NonCompletableFuture<>(invocation.invoke().thenApply(c -> null));
+
+            return new PassiveCompletableFuture<T>(new ClientDelegatingFuture<>(
+                invocation.invoke(),
+                serializationService,
+                clientMessageDecoder
+            ));
         } catch (Throwable t) {
             throw ExceptionUtil.rethrow(t);
         }
     }
 
-    public NonCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+    public <T> PassiveCompletableFuture<T> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request,
+                                                                                  @NonNull
+                                                                                  ClientMessageDecoder clientMessageDecoder) {
         UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
-        return requestAndGetCompletableFuture(masterUuid, request);
+        return requestAndGetCompletableFuture(masterUuid, request, clientMessageDecoder);
+    }
+
+    public <T> PassiveCompletableFuture<T> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request,
+                                                                                     @NonNull
+                                                                                     ClientMessageDecoder clientMessageDecoder) {
+        return requestAndGetCompletableFuture(null, request, clientMessageDecoder);
     }
 
-    public CompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
+    public PassiveCompletableFuture<Void> requestAndGetCompletableFuture(@NonNull UUID uuid,
+                                                                         @NonNull ClientMessage request) {
+        ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
+        try {
+            return new PassiveCompletableFuture(invocation.invoke().thenApply(r -> null));
+        } catch (Throwable t) {
+            throw ExceptionUtil.rethrow(t);
+        }
+    }
+
+    public PassiveCompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
         return requestAndGetCompletableFuture(null, request);
     }
+
+    public PassiveCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+        UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
+        return requestAndGetCompletableFuture(masterUuid, request);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
similarity index 92%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 5eb391b31..964a7891e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
 
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
similarity index 98%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 8f37bf96f..4d1850bcf 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.engine.client.ConnectorInstanceLoader;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -75,7 +76,8 @@ public class JobConfigParser {
 
     private JobConfig jobConfig;
 
-    protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator,
+    public JobConfigParser(@NonNull String jobDefineFilePath,
+                              @NonNull IdGenerator idGenerator,
                               @NonNull JobConfig jobConfig) {
         this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
similarity index 79%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 45bf325fb..5c83874e7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -80,10 +80,10 @@ public class JobExecutionEnvironment {
         JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
         initSeaTunnelContext();
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
-                jobClient.getNewJobId(),
-                seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
-                jobConfig,
-                jarUrls);
+            jobClient.getNewJobId(),
+            seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
+            jobConfig,
+            jarUrls);
 
         JobProxy jobProxy = jobClient.createJobProxy(jobImmutableInformation);
         jobProxy.submitJob();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
similarity index 57%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
index 84b80770d..dcd0235e2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.Job;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.logging.ILogger;
@@ -47,10 +51,37 @@ public class JobProxy implements Job {
 
     @Override
     public void submitJob() throws ExecutionException, InterruptedException {
-        ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(
+        ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(),
             seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
-        NonCompletableFuture<Void> submitJobFuture =
+        PassiveCompletableFuture<Void> submitJobFuture =
             seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
         submitJobFuture.get();
     }
+
+    @Override
+    public void waitForJobComplete() {
+        PassiveCompletableFuture<JobStatus> jobFuture =
+            seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+                SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
+                response -> {
+                    return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
+                });
+
+        jobFuture.whenComplete((v, t) -> {
+            if (null != t) {
+                LOGGER.info(String.format("Job %s (%s) end with state %s, and throw Exception: %s",
+                    jobImmutableInformation.getJobId(),
+                    jobImmutableInformation.getJobConfig().getName(),
+                    v,
+                    ExceptionUtils.getMessage(t)));
+            } else {
+                LOGGER.info(String.format("Job %s (%s) end with state %s",
+                    jobImmutableInformation.getJobId(),
+                    jobImmutableInformation.getJobConfig().getName(),
+                    v));
+            }
+        });
+
+        jobFuture.join();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 4534c0a7b..919b492d5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.job.JobConfigParser;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index a73579fd4..e44328148 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.client.job.JobConfigParser;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -48,7 +49,8 @@ public class LogicalDagGeneratorTest {
         jobConfig.setName("fake_to_file");
 
         IdGenerator idGenerator = new IdGenerator();
-        ImmutablePair<List<Action>, Set<URL>> immutablePair = new JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
+        ImmutablePair<List<Action>, Set<URL>> immutablePair =
+            new JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
 
         LogicalDagGenerator logicalDagGenerator =
             new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f641a5bac..6a1c992a5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
@@ -37,6 +39,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.util.concurrent.ExecutionException;
+
 @SuppressWarnings("checkstyle:MagicNumber")
 @RunWith(JUnit4.class)
 public class SeaTunnelClientTest {
@@ -47,8 +51,8 @@ public class SeaTunnelClientTest {
     public void beforeClass() throws Exception {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         instance = HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
-                Thread.currentThread().getName(),
-                new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+            Thread.currentThread().getName(),
+            new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
     }
 
     @Test
@@ -74,13 +78,15 @@ public class SeaTunnelClientTest {
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
 
-        //        JobProxy jobProxy;
-        //        try {
-        //            jobProxy = jobExecutionEnv.execute();
-        //            Assert.assertNotNull(jobProxy);
-        //        } catch (ExecutionException | InterruptedException e) {
-        //            throw new RuntimeException(e);
-        //        }
+        JobProxy jobProxy = null;
+        try {
+            jobProxy = jobExecutionEnv.execute();
+            jobProxy.waitForJobComplete();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+            // TODO throw exception after fix sink.setTypeInfo in ConnectorInstanceLoader
+            //            throw new RuntimeException(e);
+        }
     }
 
     @After
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
similarity index 73%
rename from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index a3c6d2380..0a6c519d7 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,12 +21,12 @@ import java.util.concurrent.CompletableFuture;
 /**
  * A future which prevents completion by outside caller
  */
-public class NonCompletableFuture<T> extends CompletableFuture<T> {
+public class PassiveCompletableFuture<T> extends CompletableFuture<T> {
 
-    public NonCompletableFuture() {
+    public PassiveCompletableFuture() {
     }
 
-    public NonCompletableFuture(CompletableFuture<T> chainedFuture) {
+    public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
         chainedFuture.whenComplete((r, t) -> {
             if (t != null) {
                 internalCompleteExceptionally(t);
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 90b74350d..5ad043ed0 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -23,4 +23,6 @@ public interface Job {
     long getJobId();
 
     void submitJob() throws ExecutionException, InterruptedException;
+
+    void waitForJobComplete();
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
index ec10e3d8a..3d3f0b225 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
@@ -61,9 +61,6 @@ public final class SeaTunnelPrintMessageCodec {
         return clientMessage;
     }
 
-    /**
-     *
-     */
     public static java.lang.String decodeRequest(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
         //empty initial frame
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
index 36ec48de4..3762d21ca 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
@@ -23,7 +23,10 @@ import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET
 import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.client.impl.protocol.Generated;
@@ -36,19 +39,28 @@ import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec;
  * and regenerate it.
  */
 
-@Generated("f0fdf747fe01901765dedbe5a527bb0f")
+@Generated("ebea440b36898863958c102f47603fee")
 public final class SeaTunnelSubmitJobCodec {
     //hex: 0xDE0200
     public static final int REQUEST_MESSAGE_TYPE = 14549504;
     //hex: 0xDE0201
     public static final int RESPONSE_MESSAGE_TYPE = 14549505;
-    private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
     private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
 
     private SeaTunnelSubmitJobCodec() {
     }
 
-    public static ClientMessage encodeRequest(com.hazelcast.internal.serialization.Data jobImmutableInformation) {
+    public static class RequestParameters {
+
+        public long jobId;
+
+        public com.hazelcast.internal.serialization.Data jobImmutableInformation;
+    }
+
+    public static ClientMessage encodeRequest(long jobId,
+                                              com.hazelcast.internal.serialization.Data jobImmutableInformation) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
         clientMessage.setRetryable(false);
         clientMessage.setOperationName("SeaTunnel.SubmitJob");
@@ -56,19 +68,19 @@ public final class SeaTunnelSubmitJobCodec {
             new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
         encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
         clientMessage.add(initialFrame);
         DataCodec.encode(clientMessage, jobImmutableInformation);
         return clientMessage;
     }
 
-    /**
-     *
-     */
-    public static com.hazelcast.internal.serialization.Data decodeRequest(ClientMessage clientMessage) {
+    public static SeaTunnelSubmitJobCodec.RequestParameters decodeRequest(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
-        //empty initial frame
-        iterator.next();
-        return DataCodec.decode(iterator);
+        RequestParameters request = new RequestParameters();
+        ClientMessage.Frame initialFrame = iterator.next();
+        request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+        request.jobImmutableInformation = DataCodec.decode(iterator);
+        return request;
     }
 
     public static ClientMessage encodeResponse() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
index ec10e3d8a..eee977992 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
@@ -23,11 +23,14 @@ import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET
 import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.client.impl.protocol.Generated;
-import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
 
 /*
  * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
@@ -36,59 +39,56 @@ import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
  * and regenerate it.
  */
 
-@Generated("c0a6d0c9d7eb912e8b10861931a0a695")
-public final class SeaTunnelPrintMessageCodec {
-    //hex: 0xDE0100
-    public static final int REQUEST_MESSAGE_TYPE = 14549248;
-    //hex: 0xDE0101
-    public static final int RESPONSE_MESSAGE_TYPE = 14549249;
-    private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
-    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+@Generated("45a79cdd8ea874bd3c99b414d5f7639f")
+public final class SeaTunnelWaitForJobCompleteCodec {
+    //hex: 0xDE0300
+    public static final int REQUEST_MESSAGE_TYPE = 14549760;
+    //hex: 0xDE0301
+    public static final int RESPONSE_MESSAGE_TYPE = 14549761;
+    private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+    private static final int RESPONSE_JOB_STATUS_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_JOB_STATUS_FIELD_OFFSET + INT_SIZE_IN_BYTES;
 
-    private SeaTunnelPrintMessageCodec() {
+    private SeaTunnelWaitForJobCompleteCodec() {
     }
 
-    public static ClientMessage encodeRequest(java.lang.String message) {
+    public static ClientMessage encodeRequest(long jobId) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
-        clientMessage.setRetryable(false);
-        clientMessage.setOperationName("SeaTunnel.PrintMessage");
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.WaitForJobComplete");
         ClientMessage.Frame initialFrame =
             new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
         encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
         clientMessage.add(initialFrame);
-        StringCodec.encode(clientMessage, message);
         return clientMessage;
     }
 
-    /**
-     *
-     */
-    public static java.lang.String decodeRequest(ClientMessage clientMessage) {
+    public static long decodeRequest(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
-        //empty initial frame
-        iterator.next();
-        return StringCodec.decode(iterator);
+        ClientMessage.Frame initialFrame = iterator.next();
+        return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
     }
 
-    public static ClientMessage encodeResponse(java.lang.String response) {
+    public static ClientMessage encodeResponse(int jobStatus) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
         ClientMessage.Frame initialFrame =
             new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET, jobStatus);
         clientMessage.add(initialFrame);
 
-        StringCodec.encode(clientMessage, response);
         return clientMessage;
     }
 
     /**
      *
      */
-    public static java.lang.String decodeResponse(ClientMessage clientMessage) {
+    public static int decodeResponse(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
-        //empty initial frame
-        iterator.next();
-        return StringCodec.decode(iterator);
+        ClientMessage.Frame initialFrame = iterator.next();
+        return decodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index fb4b29e2b..19f616d29 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -48,9 +48,35 @@ methods:
       retryable: false
       partitionIdentifier: -1
       params:
+        - name: jobId
+          type: long
+          nullable: false
+          since: 2.0
+          doc: ''
         - name: jobImmutableInformation
           type: Data
           nullable: false
           since: 2.0
           doc: ''
     response: {}
+
+  - id: 3
+    name: waitForJobComplete
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params:
+        - name: jobId
+          type: long
+          nullable: false
+          since: 2.0
+          doc: ''
+    response:
+      params:
+        - name: jobStatus
+          type: int
+          nullable: false
+          since: 2.0
+          doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 09a1454e9..3b0cf8dd6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.instance.impl.Node;
@@ -35,8 +37,10 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 import lombok.NonNull;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -54,6 +58,8 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
 
     private final SeaTunnelConfig seaTunnelConfig;
 
+    private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
+
     public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
         this.logger = node.getLogger(getClass());
         this.liveOperationRegistry = new LiveOperationRegistry();
@@ -116,23 +122,39 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     /**
      * call by client to submit job
      */
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public NonCompletableFuture<Void> submitJob(Data jobImmutableInformation) {
+    public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
         CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
         JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
         executorService.submit(() -> {
             try {
                 jobMaster.init();
-                jobMaster.run();
+                runningJobMasterMap.put(jobId, jobMaster);
             } catch (Throwable e) {
-                LOGGER.severe("submit job error: " + e.getMessage());
+                LOGGER.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
                 voidCompletableFuture.completeExceptionally(e);
             } finally {
                 // We specify that when init is complete, the submitJob is complete
                 voidCompletableFuture.complete(null);
             }
-            //jobMaster.run();
+
+            try {
+                jobMaster.run();
+            } finally {
+                runningJobMasterMap.remove(jobId);
+            }
         });
-        return new NonCompletableFuture<>(voidCompletableFuture);
+        return new PassiveCompletableFuture(voidCompletableFuture);
+    }
+
+    public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
+        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+        if (runningJobMaster == null) {
+            // TODO Get Job Status from JobHistoryStorage
+            CompletableFuture<JobStatus> future = new CompletableFuture<>();
+            future.complete(JobStatus.FINISHED);
+            return new PassiveCompletableFuture<>(future);
+        } else {
+            return runningJobMaster.getJobMasterCompleteFuture();
+        }
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 2f54e7f41..1b1bcbb21 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -25,7 +25,7 @@ import static java.util.stream.Collectors.partitioningBy;
 import static java.util.stream.Collectors.toList;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.Task;
@@ -117,7 +117,7 @@ public class TaskExecutionService {
         uncheckRun(startedLatch::await);
     }
 
-    public NonCompletableFuture<TaskExecutionState> deployTask(
+    public PassiveCompletableFuture<TaskExecutionState> deployTask(
         @NonNull Data taskImmutableInformation
     ) {
         CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
@@ -134,7 +134,8 @@ public class TaskExecutionService {
 
             // TODO We need add a method to cancel task
             CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
-            TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup, resultFuture);
+            TaskGroupExecutionTracker executionTracker =
+                new TaskGroupExecutionTracker(cancellationFuture, taskGroup, resultFuture);
             ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
             final Map<Boolean, List<Task>> byCooperation =
                 tasks.stream()
@@ -152,7 +153,7 @@ public class TaskExecutionService {
             logger.severe(ExceptionUtils.getMessage(t));
             resultFuture.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
         }
-        return new NonCompletableFuture<>(resultFuture);
+        return new PassiveCompletableFuture<>(resultFuture);
     }
 
     private final class BlockingWorker implements Runnable {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 42464bc5d..00df7ebef 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineState;
@@ -62,13 +62,13 @@ public class PhysicalPlan {
      * in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
      */
     private final CompletableFuture<JobStatus> jobEndFuture;
-    private final NonCompletableFuture<JobStatus> nonCompletableFuture;
+    private final PassiveCompletableFuture<JobStatus> passiveCompletableFuture;
 
     /**
      * This future only can completion by the {@link SubPlan } subPlanFuture.
      * When subPlanFuture completed, this NonCompletableFuture's whenComplete method will be called.
      */
-    private final NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan;
+    private final PassiveCompletableFuture<PipelineState>[] waitForCompleteBySubPlan;
 
     private final ExecutorService executorService;
 
@@ -76,7 +76,7 @@ public class PhysicalPlan {
                         @NonNull ExecutorService executorService,
                         @NonNull JobImmutableInformation jobImmutableInformation,
                         long initializationTimestamp,
-                        @NonNull NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan) {
+                        @NonNull PassiveCompletableFuture<PipelineState>[] waitForCompleteBySubPlan) {
         this.executorService = executorService;
         this.jobImmutableInformation = jobImmutableInformation;
         stateTimestamps = new long[JobStatus.values().length];
@@ -84,7 +84,7 @@ public class PhysicalPlan {
         this.jobStatus.set(JobStatus.CREATED);
         this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
         this.jobEndFuture = new CompletableFuture<>();
-        this.nonCompletableFuture = new NonCompletableFuture<>(jobEndFuture);
+        this.passiveCompletableFuture = new PassiveCompletableFuture<>(jobEndFuture);
         this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
         this.pipelineList = pipelineList;
         if (pipelineList.isEmpty()) {
@@ -124,14 +124,14 @@ public class PhysicalPlan {
         });
     }
 
-    public NonCompletableFuture<Void> cancelJob() {
+    public PassiveCompletableFuture<Void> cancelJob() {
         CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
             // TODO Implement cancel pipeline in job.
             return null;
         });
 
         cancelFuture.complete(null);
-        return new NonCompletableFuture<>(cancelFuture);
+        return new PassiveCompletableFuture<>(cancelFuture);
     }
 
     public List<SubPlan> getPipelineList() {
@@ -173,8 +173,8 @@ public class PhysicalPlan {
         }
     }
 
-    public NonCompletableFuture<JobStatus> getJobEndCompletableFuture() {
-        return this.nonCompletableFuture;
+    public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
+        return this.passiveCompletableFuture;
     }
 
     public JobImmutableInformation getJobImmutableInformation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index d2b2e1950..a2b71303d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -114,7 +114,7 @@ public class PhysicalPlanGenerator {
     public PhysicalPlan generate() {
 
         // TODO Determine which tasks do not need to be restored according to state
-        CopyOnWriteArrayList<NonCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
+        CopyOnWriteArrayList<PassiveCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
             new CopyOnWriteArrayList<>();
 
         final int totalPipelineNum = pipelines.size();
@@ -122,14 +122,14 @@ public class PhysicalPlanGenerator {
             final int pipelineId = pipeline.getId();
             final List<ExecutionEdge> edges = pipeline.getEdges();
 
-            CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
+            CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
                 new CopyOnWriteArrayList<>();
             List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
 
             List<PhysicalVertex> coordinatorVertexList =
                 getEnumeratorTask(sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
             coordinatorVertexList.addAll(
-                    getCommitterTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
+                getCommitterTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
 
             List<PhysicalVertex> physicalVertexList =
                 getSourceTask(edges, sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
@@ -138,24 +138,24 @@ public class PhysicalPlanGenerator {
                 getPartitionTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
 
             CompletableFuture<PipelineState> pipelineFuture = new CompletableFuture<>();
-            waitForCompleteBySubPlanList.add(new NonCompletableFuture<>(pipelineFuture));
+            waitForCompleteBySubPlanList.add(new PassiveCompletableFuture<>(pipelineFuture));
 
             return new SubPlan(pipelineId,
-                    totalPipelineNum,
-                    initializationTimestamp,
-                    physicalVertexList,
-                    coordinatorVertexList,
-                    pipelineFuture,
-                    waitForCompleteByPhysicalVertexList.toArray(
-                            new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
-                    jobImmutableInformation);
+                totalPipelineNum,
+                initializationTimestamp,
+                physicalVertexList,
+                coordinatorVertexList,
+                pipelineFuture,
+                waitForCompleteByPhysicalVertexList.toArray(
+                    new PassiveCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
+                jobImmutableInformation);
         });
 
         return new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
-                executorService,
-                jobImmutableInformation,
-                initializationTimestamp,
-                waitForCompleteBySubPlanList.toArray(new NonCompletableFuture[waitForCompleteBySubPlanList.size()]));
+            executorService,
+            jobImmutableInformation,
+            initializationTimestamp,
+            waitForCompleteBySubPlanList.toArray(new PassiveCompletableFuture[waitForCompleteBySubPlanList.size()]));
     }
 
     private List<SourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
@@ -167,7 +167,7 @@ public class PhysicalPlanGenerator {
     private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges,
                                                   int pipelineIndex,
                                                   int totalPipelineNum,
-                                                  CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+                                                  CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
         AtomicInteger atomicInteger = new AtomicInteger(-1);
         List<ExecutionEdge> collect = edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
             .collect(Collectors.toList());
@@ -184,27 +184,27 @@ public class PhysicalPlanGenerator {
                 if (sinkAggregatedCommitter.isPresent()) {
                     long taskGroupID = idGenerator.getNextId();
                     SinkAggregatedCommitterTask<?> t =
-                            new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
-                                    new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s,
-                                    sinkAggregatedCommitter.get());
+                        new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
+                            new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s,
+                            sinkAggregatedCommitter.get());
                     committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
                     CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
-                    waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+                    waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
 
                     return new PhysicalVertex(idGenerator.getNextId(),
-                            atomicInteger.incrementAndGet(),
-                            executorService,
-                            collect.size(),
-                            new TaskGroupDefaultImpl(taskGroupID, "SinkAggregatedCommitterTask",
-                                    Lists.newArrayList(t)),
-                            taskFuture,
-                            flakeIdGenerator,
-                            pipelineIndex,
-                            totalPipelineNum,
-                            null,
-                            jobImmutableInformation,
-                            initializationTimestamp,
-                            nodeEngine);
+                        atomicInteger.incrementAndGet(),
+                        executorService,
+                        collect.size(),
+                        new TaskGroupDefaultImpl(taskGroupID, "SinkAggregatedCommitterTask",
+                            Lists.newArrayList(t)),
+                        taskFuture,
+                        flakeIdGenerator,
+                        pipelineIndex,
+                        totalPipelineNum,
+                        null,
+                        jobImmutableInformation,
+                        initializationTimestamp,
+                        nodeEngine);
                 } else {
                     return null;
                 }
@@ -214,7 +214,7 @@ public class PhysicalPlanGenerator {
     private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
                                                   int pipelineIndex,
                                                   int totalPipelineNum,
-                                                  CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+                                                  CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
         return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PartitionTransformAction)
             .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
             .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
@@ -225,17 +225,17 @@ public class PhysicalPlanGenerator {
                     long taskGroupID = idGenerator.getNextId();
                     setFlowConfig(flow, i);
                     SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                            new TaskLocation(taskGroupID, convertToTaskID(taskIDPrefix, i)), i, flow);
+                        new TaskLocation(taskGroupID, convertToTaskID(taskIDPrefix, i)), i, flow);
 
                     CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
-                    waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+                    waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
 
                     t.add(new PhysicalVertex(idGenerator.getNextId(),
                         i,
                         executorService,
                         flow.getAction().getParallelism(),
                         new TaskGroupDefaultImpl(taskGroupID, "PartitionTransformTask",
-                                Lists.newArrayList(seaTunnelTask)),
+                            Lists.newArrayList(seaTunnelTask)),
                         taskFuture,
                         flakeIdGenerator,
                         pipelineIndex,
@@ -252,16 +252,16 @@ public class PhysicalPlanGenerator {
     private List<PhysicalVertex> getEnumeratorTask(List<SourceAction<?, ?, ?>> sources,
                                                    int pipelineIndex,
                                                    int totalPipelineNum,
-                                                   CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+                                                   CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
         AtomicInteger atomicInteger = new AtomicInteger(-1);
 
         return sources.stream().map(s -> {
             long taskGroupID = idGenerator.getNextId();
             SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
-                    new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s);
+                new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s);
             enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
             CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
-            waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+            waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
 
             return new PhysicalVertex(idGenerator.getNextId(),
                 atomicInteger.incrementAndGet(),
@@ -283,7 +283,7 @@ public class PhysicalPlanGenerator {
                                                List<SourceAction<?, ?, ?>> sources,
                                                int pipelineIndex,
                                                int totalPipelineNum,
-                                               CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+                                               CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
         return sources.stream()
             .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
             .flatMap(flow -> {
@@ -297,59 +297,60 @@ public class PhysicalPlanGenerator {
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     int finalParallelismIndex = i;
                     List<SeaTunnelTask> taskList =
-                            flows.stream().map(f -> {
-                                setFlowConfig(f, finalParallelismIndex);
-                                long taskIDPrefix = flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
-                                if (f instanceof PhysicalExecutionFlow) {
-                                    return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
-                                            new TaskLocation(taskGroupID,
-                                                    convertToTaskID(taskIDPrefix, finalParallelismIndex)),
-                                            finalParallelismIndex, f);
-                                } else {
-                                    return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                                            new TaskLocation(taskGroupID,
-                                                    convertToTaskID(taskIDPrefix, finalParallelismIndex)),
-                                            finalParallelismIndex, f);
-                                }
-                            }).collect(Collectors.toList());
+                        flows.stream().map(f -> {
+                            setFlowConfig(f, finalParallelismIndex);
+                            long taskIDPrefix =
+                                flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
+                            if (f instanceof PhysicalExecutionFlow) {
+                                return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
+                                    new TaskLocation(taskGroupID,
+                                        convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+                                    finalParallelismIndex, f);
+                            } else {
+                                return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
+                                    new TaskLocation(taskGroupID,
+                                        convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+                                    finalParallelismIndex, f);
+                            }
+                        }).collect(Collectors.toList());
                     Set<URL> jars =
-                            taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
+                        taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
 
                     CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
-                    waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+                    waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
 
                     // TODO We need give every task a appropriate name
                     if (taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
                         // contains IntermediateExecutionFlow in task group
                         t.add(new PhysicalVertex(idGenerator.getNextId(),
-                                i,
-                                executorService,
-                                flow.getAction().getParallelism(),
-                                new TaskGroupWithIntermediateQueue(taskGroupID, "SourceTask",
-                                        taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
-                                taskFuture,
-                                flakeIdGenerator,
-                                pipelineIndex,
-                                totalPipelineNum,
-                                jars,
-                                jobImmutableInformation,
-                                initializationTimestamp,
-                                nodeEngine));
+                            i,
+                            executorService,
+                            flow.getAction().getParallelism(),
+                            new TaskGroupWithIntermediateQueue(taskGroupID, "SourceTask",
+                                taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
+                            taskFuture,
+                            flakeIdGenerator,
+                            pipelineIndex,
+                            totalPipelineNum,
+                            jars,
+                            jobImmutableInformation,
+                            initializationTimestamp,
+                            nodeEngine));
                     } else {
                         t.add(new PhysicalVertex(idGenerator.getNextId(),
-                                i,
-                                executorService,
-                                flow.getAction().getParallelism(),
-                                new TaskGroupDefaultImpl(taskGroupID, "SourceTask",
-                                        taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
-                                taskFuture,
-                                flakeIdGenerator,
-                                pipelineIndex,
-                                totalPipelineNum,
-                                jars,
-                                jobImmutableInformation,
-                                initializationTimestamp,
-                                nodeEngine));
+                            i,
+                            executorService,
+                            flow.getAction().getParallelism(),
+                            new TaskGroupDefaultImpl(taskGroupID, "SourceTask",
+                                taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
+                            taskFuture,
+                            flakeIdGenerator,
+                            pipelineIndex,
+                            totalPipelineNum,
+                            jars,
+                            jobImmutableInformation,
+                            initializationTimestamp,
+                            nodeEngine));
                     }
                 }
                 return t.stream();
@@ -380,14 +381,15 @@ public class PhysicalPlanGenerator {
                 flow.setConfig(config);
             } else if (flow.getAction() instanceof PartitionTransformAction) {
                 PartitionConfig config =
-                        new PartitionConfig(((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getPartitionCount(),
-                                ((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getTargetCount(),
-                                parallelismIndex);
+                    new PartitionConfig(
+                        ((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getPartitionCount(),
+                        ((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getTargetCount(),
+                        parallelismIndex);
                 flow.setConfig(config);
             }
         } else if (f instanceof IntermediateExecutionFlow) {
             ((IntermediateExecutionFlow<IntermediateQueueConfig>) f)
-                    .setConfig(new IntermediateQueueConfig(((IntermediateExecutionFlow<?>) f).getQueue().getId()));
+                .setConfig(new IntermediateQueueConfig(((IntermediateExecutionFlow<?>) f).getQueue().getId()));
         } else {
             throw new UnknownFlowException(f);
         }
@@ -406,13 +408,14 @@ public class PhysicalPlanGenerator {
      */
     private static List<Flow> splitSinkFromFlow(Flow flow) {
         List<PhysicalExecutionFlow<?, ?>> sinkFlows =
-                flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow<?, ?>) f)
-                        .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
+            flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow)
+                .map(f -> (PhysicalExecutionFlow<?, ?>) f)
+                .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
         List<Flow> allFlows = new ArrayList<>();
         flow.getNext().removeAll(sinkFlows);
         sinkFlows.forEach(s -> {
             IntermediateQueue queue = new IntermediateQueue(s.getAction().getId(),
-                    s.getAction().getName() + "-Queue", s.getAction().getParallelism());
+                s.getAction().getName() + "-Queue", s.getAction().getParallelism());
             IntermediateExecutionFlow<?> intermediateFlow = new IntermediateExecutionFlow<>(queue);
             flow.getNext().add(intermediateFlow);
             IntermediateExecutionFlow<?> intermediateFlowQuote = new IntermediateExecutionFlow<>(queue);
@@ -422,15 +425,15 @@ public class PhysicalPlanGenerator {
 
         if (flow.getNext().size() > sinkFlows.size()) {
             allFlows.addAll(flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream())
-                    .collect(Collectors.toList()));
+                .collect(Collectors.toList()));
         }
         return allFlows;
     }
 
     private static boolean sourceWithSink(PhysicalExecutionFlow<?, ?> flow) {
         return flow.getAction() instanceof SinkAction ||
-                flow.getNext().stream().map(f -> (PhysicalExecutionFlow<?, ?>) f).map(PhysicalPlanGenerator::sourceWithSink)
-                        .collect(Collectors.toList()).contains(true);
+            flow.getNext().stream().map(f -> (PhysicalExecutionFlow<?, ?>) f).map(PhysicalPlanGenerator::sourceWithSink)
+                .collect(Collectors.toList()).contains(true);
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -440,13 +443,13 @@ public class PhysicalPlanGenerator {
 
     private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
         List<Action> actions = edges.stream().filter(e -> e.getLeftVertex().getAction().equals(start))
-                .map(e -> e.getRightVertex().getAction()).collect(Collectors.toList());
+            .map(e -> e.getRightVertex().getAction()).collect(Collectors.toList());
         List<Flow> wrappers = actions.stream()
-                .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
-                .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+            .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
+            .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
         wrappers.addAll(actions.stream()
-                .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
-                .map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
+            .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
+            .map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
         return wrappers;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index ed823a74a..f11019fa0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -18,15 +18,12 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
-import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
-import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -92,7 +89,7 @@ public class PhysicalVertex {
     /**
      * This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
      */
-    private NonCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
+    private PassiveCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
 
     private final JobImmutableInformation jobImmutableInformation;
 
@@ -145,26 +142,35 @@ public class PhysicalVertex {
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
     public void deploy(@NonNull Address address) {
-
-        TaskGroupImmutableInformation taskGroupImmutableInformation =
-                new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
-                        nodeEngine.getSerializationService().toData(this.taskGroup),
-                        this.pluginJarsUrls);
-
-        try {
-            waitForCompleteByExecutionService = new NonCompletableFuture<>(
-                    nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                                    new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                                    address)
-                            .invoke());
-        } catch (Throwable th) {
-            LOGGER.severe(String.format("%s deploy error with Exception: %s",
-                    this.taskFullName,
-                    ExceptionUtils.getMessage(th)));
-            updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
-            taskFuture.complete(
-                    new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
-        }
+        /**
+         TaskGroupImmutableInformation taskGroupImmutableInformation =
+         new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+         nodeEngine.getSerializationService().toData(this.taskGroup),
+         this.pluginJarsUrls);
+
+         try {
+         waitForCompleteByExecutionService = new NonCompletableFuture<>(
+         nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+         new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+         address)
+         .invoke());
+         } catch (Throwable th) {
+         LOGGER.severe(String.format("%s deploy error with Exception: %s",
+         this.taskFullName,
+         ExceptionUtils.getMessage(th)));
+         updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+         taskFuture.complete(
+         new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
+         }*/
+
+        waitForCompleteByExecutionService = new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
+            try {
+                Thread.sleep(2000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return new TaskExecutionState(flakeIdGenerator.newId(), ExecutionState.FINISHED, null);
+        }));
 
         updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
         waitForCompleteByExecutionService.whenComplete((v, t) -> {
@@ -176,13 +182,13 @@ public class PhysicalVertex {
                     updateTaskState(executionState.get(), v.getExecutionState());
                     if (v.getThrowable() != null) {
                         LOGGER.severe(String.format("%s end with state %s and Exception: %s",
-                                this.taskFullName,
-                                v.getExecutionState(),
-                                ExceptionUtils.getMessage(v.getThrowable())));
+                            this.taskFullName,
+                            v.getExecutionState(),
+                            ExceptionUtils.getMessage(v.getThrowable())));
                     } else {
                         LOGGER.severe(String.format("%s end with state %s",
-                                this.taskFullName,
-                                v.getExecutionState()));
+                            this.taskFullName,
+                            v.getExecutionState()));
                     }
                     taskFuture.complete(v);
                 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index a044cc7b8..2a7c06689 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineState;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -74,7 +74,7 @@ public class SubPlan {
      * This future only can completion by the {@link PhysicalVertex } taskFuture.
      * When the taskFuture in {@link PhysicalVertex} completed, The NonCompletableFuture's whenComplete method will be called
      */
-    private final NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex;
+    private final PassiveCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex;
 
     public SubPlan(int pipelineIndex,
                    int totalPipelineNum,
@@ -82,7 +82,7 @@ public class SubPlan {
                    @NonNull List<PhysicalVertex> physicalVertexList,
                    @NonNull List<PhysicalVertex> coordinatorVertexList,
                    @NonNull CompletableFuture<PipelineState> pipelineFuture,
-                   @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex,
+                   @NonNull PassiveCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex,
                    @NonNull JobImmutableInformation jobImmutableInformation) {
         this.pipelineIndex = pipelineIndex;
         this.pipelineFuture = pipelineFuture;
@@ -184,14 +184,14 @@ public class SubPlan {
         }
     }
 
-    public NonCompletableFuture<Void> cancelPipeline() {
+    public PassiveCompletableFuture<Void> cancelPipeline() {
         CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
             // TODO Implement cancel tasks in pipeline.
             return null;
         });
 
         cancelFuture.complete(null);
-        return new NonCompletableFuture<>(cancelFuture);
+        return new PassiveCompletableFuture<>(cancelFuture);
     }
 
     public void failedWithNoEnoughResource() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index cfb3352bc..1cdc0b5f7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -45,7 +45,7 @@ public class JobMaster implements Runnable {
 
     private LogicalDag logicalDag;
     private PhysicalPlan physicalPlan;
-    private final Data jobImmutableInformation;
+    private final Data jobImmutableInformationData;
 
     private final NodeEngine nodeEngine;
 
@@ -57,10 +57,12 @@ public class JobMaster implements Runnable {
 
     private CompletableFuture<JobStatus> jobMasterCompleteFuture = new CompletableFuture<>();
 
-    public JobMaster(@NonNull Data jobImmutableInformation,
+    private JobImmutableInformation jobImmutableInformation;
+
+    public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService) {
-        this.jobImmutableInformation = jobImmutableInformation;
+        this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
         flakeIdGenerator =
@@ -70,15 +72,17 @@ public class JobMaster implements Runnable {
     }
 
     public void init() throws Exception {
-        JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
-        LOGGER.info("Job [" + jobInformation.getJobId() + "] submit");
-        LOGGER.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
+        jobImmutableInformation = nodeEngine.getSerializationService().toObject(
+            jobImmutableInformationData);
+        LOGGER.info("Job [" + jobImmutableInformation.getJobId() + "] submit");
+        LOGGER.info(
+            "Job [" + jobImmutableInformation.getJobId() + "] jar urls " + jobImmutableInformation.getPluginJarsUrls());
 
         // TODO Use classloader load the connector jars and deserialize logicalDag
-        this.logicalDag = nodeEngine.getSerializationService().toObject(jobInformation.getLogicalDag());
+        this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
         physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
             nodeEngine,
-            jobInformation,
+            jobImmutableInformation,
             System.currentTimeMillis(),
             executorService,
             flakeIdGenerator);
@@ -88,9 +92,10 @@ public class JobMaster implements Runnable {
     @Override
     public void run() {
         try {
-            NonCompletableFuture<JobStatus> jobStatusNonCompletableFuture = physicalPlan.getJobEndCompletableFuture();
+            PassiveCompletableFuture<JobStatus> jobStatusPassiveCompletableFuture =
+                physicalPlan.getJobEndCompletableFuture();
 
-            jobStatusNonCompletableFuture.whenComplete((v, t) -> {
+            jobStatusPassiveCompletableFuture.whenComplete((v, t) -> {
                 // We need not handle t, Because we will not return t from physicalPlan
                 if (JobStatus.FAILING.equals(v)) {
                     cleanJob();
@@ -120,4 +125,12 @@ public class JobMaster implements Runnable {
     public ResourceManager getResourceManager() {
         return resourceManager;
     }
+
+    public PassiveCompletableFuture<JobStatus> getJobMasterCompleteFuture() {
+        return new PassiveCompletableFuture<>(jobMasterCompleteFuture);
+    }
+
+    public JobImmutableInformation getJobImmutableInformation() {
+        return jobImmutableInformation;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java
index d66a5d66c..d27b62a28 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java
@@ -17,43 +17,30 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
-import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
-
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 
 import java.io.IOException;
 
-public class CheckpointTriggerOperation extends AsyncOperation {
-    private CheckpointBarrier checkpointBarrier;
-
-    public CheckpointTriggerOperation() {
-    }
+public abstract class AbstractJobAsyncOperation extends AsyncOperation {
+    protected long jobId;
 
-    public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
-        this.checkpointBarrier = checkpointBarrier;
+    public AbstractJobAsyncOperation() {
     }
 
-    @Override
-    public int getClassId() {
-        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+    public AbstractJobAsyncOperation(long jobId) {
+        this.jobId = jobId;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeObject(checkpointBarrier);
+        super.writeInternal(out);
+        out.writeLong(jobId);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        checkpointBarrier = in.readObject(CheckpointBarrier.class);
-    }
-
-    @Override
-    protected NonCompletableFuture<?> doRun() throws Exception {
-        // TODO: All source Vertexes executed
-        return null;
+        super.readInternal(in);
+        jobId = in.readLong();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 543197559..56701e88b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -23,7 +23,7 @@ import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 import static com.hazelcast.spi.impl.operationservice.ExceptionAction.THROW_EXCEPTION;
 
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
@@ -47,7 +47,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
 
     @Override
     public final void run() {
-        NonCompletableFuture<?> future;
+        PassiveCompletableFuture<?> future;
         try {
             future = doRun();
         } catch (Exception e) {
@@ -58,7 +58,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
         future.whenComplete(withTryCatch(getLogger(), (r, f) -> doSendResponse(f != null ? peel(f) : r)));
     }
 
-    protected abstract NonCompletableFuture<?> doRun() throws Exception;
+    protected abstract PassiveCompletableFuture<?> doRun() throws Exception;
 
     @Override
     public final boolean returnsResponse() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
index d66a5d66c..6de6b3f75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
@@ -52,7 +52,7 @@ public class CheckpointTriggerOperation extends AsyncOperation {
     }
 
     @Override
-    protected NonCompletableFuture<?> doRun() throws Exception {
+    protected PassiveCompletableFuture<?> doRun() throws Exception {
         // TODO: All source Vertexes executed
         return null;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
index d658bd398..8a36d9ea4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
 import com.hazelcast.internal.nio.IOUtil;
@@ -40,9 +40,9 @@ public class DeployTaskOperation extends AsyncOperation {
     }
 
     @Override
-    protected NonCompletableFuture<?> doRun() throws Exception {
-        SeaTunnelServer server = getService();
-        return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+    protected PassiveCompletableFuture<?> doRun() throws Exception {
+        TaskExecutionService taskExecutionService = getService();
+        return taskExecutionService.deployTask(taskImmutableInformation);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
index 419712111..5c1e5b0fc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
@@ -29,13 +29,14 @@ import lombok.NonNull;
 
 import java.io.IOException;
 
-public class SubmitJobOperation extends AsyncOperation {
+public class SubmitJobOperation extends AbstractJobAsyncOperation {
     private Data jobImmutableInformation;
 
     public SubmitJobOperation() {
     }
 
-    public SubmitJobOperation(@NonNull Data jobImmutableInformation) {
+    public SubmitJobOperation(long jobId, @NonNull Data jobImmutableInformation) {
+        super(jobId);
         this.jobImmutableInformation = jobImmutableInformation;
     }
 
@@ -57,9 +58,8 @@ public class SubmitJobOperation extends AsyncOperation {
     }
 
     @Override
-    protected NonCompletableFuture<?> doRun() throws Exception {
+    protected PassiveCompletableFuture<?> doRun() throws Exception {
         SeaTunnelServer seaTunnelServer = getService();
-        NonCompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
-        return voidCompletableFuture;
+        return seaTunnelServer.submitJob(jobId, jobImmutableInformation);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
similarity index 51%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
index 5eb391b31..65724c3c0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
@@ -15,25 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.server.operation;
 
-import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
 
-import lombok.NonNull;
+public class WaitForJobCompleteOperation extends AbstractJobAsyncOperation {
 
-public class JobClient {
-    private SeaTunnelHazelcastClient hazelcastClient;
+    public WaitForJobCompleteOperation() {
+        super();
+    }
 
-    public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
-        this.hazelcastClient = hazelcastClient;
+    public WaitForJobCompleteOperation(long jobId) {
+        super(jobId);
     }
 
-    public long getNewJobId() {
-        return hazelcastClient.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+    @Override
+    protected PassiveCompletableFuture<?> doRun() throws Exception {
+        SeaTunnelServer service = getService();
+        return service.waitForJobComplete(jobId);
     }
 
-    public JobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
-        return new JobProxy(hazelcastClient, jobImmutableInformation);
+    @Override
+    public int getClassId() {
+        return OperationDataSerializerHook.WAIT_FORM_JOB_COMPLETE_OPERATOR;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 2b6c78488..9876f489e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.protocol.task;
 
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
 
 import com.hazelcast.client.impl.protocol.MessageTaskFactory;
 import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
@@ -42,7 +43,11 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
     }
 
     private void initFactories() {
-        factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
-        factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
+        factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+        factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
+        factories.put(SeaTunnelWaitForJobCompleteCodec.REQUEST_MESSAGE_TYPE,
+            (clientMessage, connection) -> new WaitForJobCompleteTask(clientMessage, node, connection));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
index 5cb228b3e..68ec6b69c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
@@ -23,10 +23,9 @@ import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
-import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
+public class SubmitJobTask extends AbstractSeaTunnelMessageTask<SeaTunnelSubmitJobCodec.RequestParameters, Void> {
 
     protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection connection) {
         super(clientMessage, node, connection,
@@ -36,7 +35,7 @@ public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
 
     @Override
     protected Operation prepareOperation() {
-        return new SubmitJobOperation(parameters);
+        return new SubmitJobOperation(parameters.jobId, parameters.jobImmutableInformation);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
similarity index 68%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
index 5cb228b3e..813063b19 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
@@ -17,35 +17,34 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
-import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
+import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
-import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
-
-    protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+public class WaitForJobCompleteTask extends AbstractSeaTunnelMessageTask<Long, JobStatus> {
+    protected WaitForJobCompleteTask(ClientMessage clientMessage, Node node, Connection connection) {
         super(clientMessage, node, connection,
-            SeaTunnelSubmitJobCodec::decodeRequest,
-            x -> SeaTunnelSubmitJobCodec.encodeResponse());
+            SeaTunnelWaitForJobCompleteCodec::decodeRequest,
+            x -> SeaTunnelWaitForJobCompleteCodec.encodeResponse(x.ordinal()));
     }
 
     @Override
     protected Operation prepareOperation() {
-        return new SubmitJobOperation(parameters);
+        return new WaitForJobCompleteOperation(parameters);
     }
 
     @Override
     public String getMethodName() {
-        return "submitJob";
+        return "waitForJobComplete";
     }
 
     @Override
     public Object[] getParameters() {
-        return new Object[]{};
+        return new Object[0];
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 575b4bfa1..d5ade4d59 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
 import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -39,8 +40,9 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int PRINT_MESSAGE_OPERATOR = 0;
     public static final int SUBMIT_OPERATOR = 1;
     public static final int DEPLOY_TASK_OPERATOR = 2;
+    public static final int WAIT_FORM_JOB_COMPLETE_OPERATOR = 3;
 
-    public static final int CHECKPOINT_TRIGGER_OPERATOR = 3;
+    public static final int CHECKPOINT_TRIGGER_OPERATOR = 4;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -68,6 +70,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
                     return new SubmitJobOperation();
                 case DEPLOY_TASK_OPERATOR:
                     return new DeployTaskOperation();
+                case WAIT_FORM_JOB_COMPLETE_OPERATOR:
+                    return new WaitForJobCompleteOperation();
                 case CHECKPOINT_TRIGGER_OPERATOR:
                     return new CheckpointTriggerOperation();
                 default:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 17c004ff2..f01dcc218 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -114,7 +115,11 @@ public class TaskTest {
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
                 nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
 
-        service.submitJob(nodeEngine.getSerializationService().toData(jobImmutableInformation));
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+            service.submitJob(jobImmutableInformation.getJobId(),
+                nodeEngine.getSerializationService().toData(jobImmutableInformation));
+
+        Assert.assertNotNull(voidPassiveCompletableFuture);
     }
 
     @Test