You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/02 10:34:40 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add Submit Job From Client To Server (#2301)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 3ce56e465 [ST-Engine] Add Submit Job From Client To Server (#2301)
3ce56e465 is described below

commit 3ce56e4657006f43905272acbe252cdb3ff6178d
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Aug 2 18:34:35 2022 +0800

    [ST-Engine] Add Submit Job From Client To Server (#2301)
    
    * Add logicaldag generator
    
    * merge from upstream
    
    * fix UT error
    
    * Add Job Submit To Server
    
    * Add license header
    
    add plugin jars url to JobInformation
    
    fix style
    
    reset AbstractPluginDiscovery to upstream
    
    fix stye
    
    * Fix problems in code review
    
    * Fix problems in code review
---
 LICENSE                                            |   4 +-
 .../engine/client/ConnectorInstanceLoader.java     |  27 +++-
 ...SeaTunnelClientInstance.java => JobClient.java} |  24 +--
 .../seatunnel/engine/client/JobConfigParser.java   | 178 +++++++++++++++------
 .../engine/client/JobExecutionEnvironment.java     |  38 ++++-
 .../apache/seatunnel/engine/client/JobProxy.java   |  52 ++++++
 .../seatunnel/engine/client/SeaTunnelClient.java   |  58 +------
 .../engine/client/SeaTunnelClientConfig.java       |   5 +-
 .../engine/client/SeaTunnelClientInstance.java     |  12 +-
 ...elClient.java => SeaTunnelHazelcastClient.java} |  71 ++++----
 .../engine/client/JobConfigParserTest.java         |   9 +-
 .../engine/client/LogicalDagGeneratorTest.java     |  17 +-
 .../engine/client/SeaTunnelClientTest.java         |  26 ++-
 .../apache/seatunnel/engine/common/Constant.java   |   4 +
 .../serializeable/ConfigDataSerializerHook.java    |   3 +-
 .../engine/common/utils/ExceptionUtil.java         |   4 +-
 .../engine/core/dag/actions/AbstractAction.java    |  14 +-
 .../seatunnel/engine/core/dag/actions/Action.java  |   3 +
 .../core/dag/actions/PartitionTransformAction.java |  11 +-
 .../engine/core/dag/actions/SinkAction.java        |  11 +-
 .../engine/core/dag/actions/SourceAction.java      |   7 +-
 .../engine/core/dag/actions/TransformAction.java   |  11 +-
 .../org/apache/seatunnel/engine/core/job/Job.java} |   8 +-
 .../engine/core/job/JobImmutableInformation.java   | 104 ++++++++++++
 .../protocol/codec/SeaTunnelPrintMessageCodec.java |  11 +-
 ...sageCodec.java => SeaTunnelSubmitJobCodec.java} |  48 +++---
 .../core/serializable/JobDataSerializerHook.java   |   8 +
 .../SeaTunnelEngine.yaml                           |  15 ++
 .../seatunnel/engine/server/SeaTunnelServer.java   |  35 +++-
 .../engine/server/SeaTunnelServerStarter.java      |   3 +
 .../engine/server/operation/AsyncOperation.java    | 108 +++++++++++++
 .../server/operation/SubmitJobOperation.java       |  64 ++++++++
 ...Task.java => AbstractSeaTunnelMessageTask.java} |  62 ++++---
 .../server/protocol/task/PrintMessageTask.java     |  51 +-----
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |   2 +
 ...TaskFactoryProvider.java => SubmitJobTask.java} |  37 +++--
 .../serializable/OperationDataSerializerHook.java  |   4 +
 .../plugin/discovery/AbstractPluginDiscovery.java  |  23 +--
 38 files changed, 844 insertions(+), 328 deletions(-)

diff --git a/LICENSE b/LICENSE
index 09e15e69a..7d4d93ac3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -217,4 +217,6 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
 seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java     from https://github.com/lightbend/config
 seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java     from https://github.com/lightbend/config
 generate_client_protocol.sh                                                                                                      from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java               from https://github.com/hazelcast/hazelcast
\ No newline at end of file
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java               from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java               from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java               from https://github.com/hazelcast/hazelcast
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
index 7791cbec4..0a72fde2d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
@@ -31,16 +31,28 @@ import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginD
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.net.URL;
+import java.util.List;
+
 import scala.Serializable;
 
 public class ConnectorInstanceLoader {
-    public static SeaTunnelSource loadSourceInstance(Config sourceConfig) {
+    private ConnectorInstanceLoader() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static ImmutablePair<SeaTunnelSource, List<URL>> loadSourceInstance(Config sourceConfig) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
             CollectionConstants.SOURCE_PLUGIN,
             sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
 
+        List<URL> pluginJarPaths = sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+
         SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelSource.prepare(sourceConfig);
         seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
@@ -49,30 +61,33 @@ public class ConnectorInstanceLoader {
             throw new UnsupportedOperationException(
                 String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
         }
-        return seaTunnelSource;
+        return new ImmutablePair<>(seaTunnelSource, pluginJarPaths);
     }
 
-    public static SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> loadSinkInstance(
+    public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, List<URL>> loadSinkInstance(
         Config sinkConfig) {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
             CollectionConstants.SINK_PLUGIN,
             sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
+        List<URL> pluginJarPaths = sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
         SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
             sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
         seaTunnelSink.prepare(sinkConfig);
         seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
-        return seaTunnelSink;
+        return new ImmutablePair<>(seaTunnelSink, pluginJarPaths);
     }
 
-    public static SeaTunnelTransform loadTransformInstance(Config transformConfig) {
+    public static ImmutablePair<SeaTunnelTransform, List<URL>> loadTransformInstance(Config transformConfig) {
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
         PluginIdentifier pluginIdentifier = PluginIdentifier.of(
             CollectionConstants.SEATUNNEL_PLUGIN,
             CollectionConstants.TRANSFORM_PLUGIN,
             transformConfig.getString(CollectionConstants.PLUGIN_NAME));
+
+        List<URL> pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
         SeaTunnelTransform seaTunnelTransform = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
-        return seaTunnelTransform;
+        return new ImmutablePair<>(seaTunnelTransform, pluginJarPaths);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
index 1e0db46dd..5eb391b31 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
@@ -17,19 +17,23 @@
 
 package org.apache.seatunnel.engine.client;
 
-import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
-import com.hazelcast.core.HazelcastInstance;
 import lombok.NonNull;
 
-public interface SeaTunnelClientInstance {
+public class JobClient {
+    private SeaTunnelHazelcastClient hazelcastClient;
 
-    /**
-     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
-     * be a client, depending on the type of this
-     */
-    @NonNull
-    HazelcastInstance getHazelcastInstance();
+    public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
+        this.hazelcastClient = hazelcastClient;
+    }
 
-    JobExecutionEnvironment createExecutionContext(String filePath, JobConfig config);
+    public long getNewJobId() {
+        return hazelcastClient.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+    }
+
+    public JobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
+        return new JobProxy(hazelcastClient, jobImmutableInformation);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index 7ddedd792..42d563225 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -40,12 +40,16 @@ import com.hazelcast.logging.Logger;
 import lombok.Data;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 
+import java.net.URL;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import scala.Serializable;
@@ -63,22 +67,32 @@ public class JobConfigParser {
 
     private Map<String, List<Config>> sourceResultTableNameMap = new HashMap<>();
 
+    private List<Action> actions = new ArrayList<>();
+    private Set<URL> jarUrlsSet = new HashSet<>();
+
     protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator) {
         this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
     }
 
-    public List<Action> parse() {
+    public ImmutablePair<List<Action>, Set<URL>> parse() {
         Config seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
         List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
         List<? extends Config> transformConfigs = seaTunnelJobConfig.getConfigList("transform");
         List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
 
-        if (sinkConfigs.size() == 1 && sourceConfigs.size() == 1 & transformConfigs.size() <= 1) {
-            return sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+        if (CollectionUtils.isEmpty(sinkConfigs) || CollectionUtils.isEmpty(sourceConfigs)) {
+            throw new JobDefineCheckExceptionSeaTunnel("Source And Sink can not be null");
+        }
+
+        if (sinkConfigs.size() == 1
+            && sourceConfigs.size() == 1
+            && (CollectionUtils.isEmpty(transformConfigs) || transformConfigs.size() == 1)) {
+            sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
         } else {
-            return complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+            complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
         }
+        return new ImmutablePair<>(actions, jarUrlsSet);
     }
 
     /**
@@ -88,18 +102,20 @@ public class JobConfigParser {
      * result_table_name and source_table_name are necessary for Transform Connector.
      * By the end, source_table_name is necessary for Sink Connector.
      */
-    private List<Action> complexAnalyze(List<? extends Config> sourceConfigs,
-                                        List<? extends Config> transformConfigs,
-                                        List<? extends Config> sinkConfigs) {
+    private void complexAnalyze(List<? extends Config> sourceConfigs,
+                                List<? extends Config> transformConfigs,
+                                List<? extends Config> sinkConfigs) {
         initRelationMap(sourceConfigs, transformConfigs);
 
-        List<Action> actions = new ArrayList<>();
         for (Config config : sinkConfigs) {
-            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+            ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, List<URL>>
+                sinkListImmutablePair =
                 ConnectorInstanceLoader.loadSinkInstance(config);
 
             SinkAction sinkAction =
-                new SinkAction(idGenerator.getNextId(), seaTunnelSink.getPluginName(), seaTunnelSink);
+                createSinkAction(idGenerator.getNextId(), sinkListImmutablePair.getLeft().getPluginName(),
+                    sinkListImmutablePair.getLeft(), sinkListImmutablePair.getRight());
+
             actions.add(sinkAction);
             if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
                 throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
@@ -118,7 +134,6 @@ public class JobConfigParser {
             }
 
         }
-        return actions;
     }
 
     private void sourceAnalyze(String sourceTableName, Action action) {
@@ -132,10 +147,15 @@ public class JobConfigParser {
         // of its upstream action.
         AtomicInteger totalParallelism = new AtomicInteger();
         sourceConfigList.stream().forEach(sourceConfig -> {
-            SourceAction sourceAction =
-                new SourceAction(idGenerator.getNextId(),
-                    sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
-                    ConnectorInstanceLoader.loadSourceInstance(sourceConfig));
+            ImmutablePair<SeaTunnelSource, List<URL>> seaTunnelSourceListImmutablePair =
+                ConnectorInstanceLoader.loadSourceInstance(sourceConfig);
+
+            SourceAction sourceAction = createSourceAction(
+                idGenerator.getNextId(),
+                sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+                seaTunnelSourceListImmutablePair.getLeft(),
+                seaTunnelSourceListImmutablePair.getRight());
+
             int sourceParallelism = getSourceParallelism(sourceConfig);
             sourceAction.setParallelism(sourceParallelism);
             totalParallelism.set(totalParallelism.get() + sourceParallelism);
@@ -152,10 +172,15 @@ public class JobConfigParser {
         } else {
             AtomicInteger totalParallelism = new AtomicInteger();
             transformConfigList.stream().forEach(config -> {
-                SeaTunnelTransform seaTunnelTransform = ConnectorInstanceLoader.loadTransformInstance(config);
-                TransformAction transformAction =
-                    new TransformAction(idGenerator.getNextId(), seaTunnelTransform.getPluginName(),
-                        seaTunnelTransform);
+                ImmutablePair<SeaTunnelTransform, List<URL>> transformListImmutablePair =
+                    ConnectorInstanceLoader.loadTransformInstance(config);
+
+                TransformAction transformAction = createTransformAction(
+                    idGenerator.getNextId(),
+                    transformListImmutablePair.getLeft().getPluginName(),
+                    transformListImmutablePair.getLeft(),
+                    transformListImmutablePair.getRight());
+
                 action.addUpstream(transformAction);
                 transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME), transformAction);
                 totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
@@ -211,44 +236,47 @@ public class JobConfigParser {
      * |
      * Sink
      */
-    private List<Action> sampleAnalyze(List<? extends Config> sourceConfigs,
-                                       List<? extends Config> transformConfigs,
-                                       List<? extends Config> sinkConfigs) {
-        SeaTunnelSource seaTunnelSource = ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
+    private void sampleAnalyze(List<? extends Config> sourceConfigs,
+                               List<? extends Config> transformConfigs,
+                               List<? extends Config> sinkConfigs) {
+        ImmutablePair<SeaTunnelSource, List<URL>> pair =
+            ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
         SourceAction sourceAction =
-            new SourceAction(idGenerator.getNextId(), seaTunnelSource.getPluginName(), seaTunnelSource);
+            createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(),
+                pair.getRight());
         sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
-        if (transformConfigs.size() != 0) {
-            SeaTunnelTransform seaTunnelTransform =
+
+        ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, List<URL>>
+            sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+
+        Action sinkUpstreamAction = sourceAction;
+
+        if (!CollectionUtils.isEmpty(transformConfigs)) {
+            ImmutablePair<SeaTunnelTransform, List<URL>> transformListImmutablePair =
                 ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
-            TransformAction transformAction = new TransformAction(
+
+            TransformAction transformAction = createTransformAction(
                 idGenerator.getNextId(),
-                seaTunnelTransform.getPluginName(),
+                transformListImmutablePair.getLeft().getPluginName(),
                 Lists.newArrayList(sourceAction),
-                seaTunnelTransform);
+                transformListImmutablePair.getLeft(),
+                transformListImmutablePair.getRight());
 
-            initTransformParallelism(transformConfigs, sourceAction, seaTunnelTransform, transformAction);
+            initTransformParallelism(transformConfigs, sourceAction, transformListImmutablePair.getLeft(),
+                transformAction);
 
-            SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
-            SinkAction sinkAction = new SinkAction(
-                idGenerator.getNextId(),
-                seaTunnelSink.getPluginName(),
-                Lists.newArrayList(transformAction),
-                seaTunnelSink
-            );
-            sinkAction.setParallelism(transformAction.getParallelism());
-            return Lists.newArrayList(sinkAction);
-        } else {
-            SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
-            SinkAction sinkAction = new SinkAction(
-                idGenerator.getNextId(),
-                seaTunnelSink.getPluginName(),
-                Lists.newArrayList(sourceAction),
-                seaTunnelSink
-            );
-            sinkAction.setParallelism(sourceAction.getParallelism());
-            return Lists.newArrayList(sinkAction);
+            sinkUpstreamAction = transformAction;
         }
+
+        SinkAction sinkAction = createSinkAction(
+            idGenerator.getNextId(),
+            sinkListImmutablePair.getLeft().getPluginName(),
+            Lists.newArrayList(sinkUpstreamAction),
+            sinkListImmutablePair.getLeft(),
+            sinkListImmutablePair.getRight()
+        );
+        sinkAction.setParallelism(sinkUpstreamAction.getParallelism());
+        actions.add(sinkAction);
     }
 
     private void initTransformParallelism(List<? extends Config> transformConfigs, Action upstreamAction,
@@ -271,4 +299,56 @@ public class JobConfigParser {
         }
         return 1;
     }
+
+    private SourceAction createSourceAction(int id,
+                                            @NonNull String name,
+                                            @NonNull SeaTunnelSource source,
+                                            List<URL> jarUrls) {
+        if (!CollectionUtils.isEmpty(jarUrls)) {
+            jarUrlsSet.addAll(jarUrls);
+        }
+        return new SourceAction(id, name, source, jarUrls);
+    }
+
+    private TransformAction createTransformAction(int id,
+                                                  @NonNull String name,
+                                                  @NonNull List<Action> upstreams,
+                                                  @NonNull SeaTunnelTransform transformation,
+                                                  List<URL> jarUrls) {
+        if (!CollectionUtils.isEmpty(jarUrls)) {
+            jarUrlsSet.addAll(jarUrls);
+        }
+        return new TransformAction(id, name, upstreams, transformation, jarUrls);
+    }
+
+    private SinkAction createSinkAction(int id,
+                                        @NonNull String name,
+                                        @NonNull List<Action> upstreams,
+                                        @NonNull SeaTunnelSink sink,
+                                        List<URL> jarUrls) {
+        if (!CollectionUtils.isEmpty(jarUrls)) {
+            jarUrlsSet.addAll(jarUrls);
+        }
+        return new SinkAction(id, name, upstreams, sink, jarUrls);
+    }
+
+    private TransformAction createTransformAction(int id,
+                                                  @NonNull String name,
+                                                  @NonNull SeaTunnelTransform transformation,
+                                                  List<URL> jarUrls) {
+        if (!CollectionUtils.isEmpty(jarUrls)) {
+            jarUrlsSet.addAll(jarUrls);
+        }
+        return new TransformAction(id, name, transformation, jarUrls);
+    }
+
+    private SinkAction createSinkAction(int id,
+                                        @NonNull String name,
+                                        @NonNull SeaTunnelSink sink,
+                                        List<URL> jarUrls) {
+        if (!CollectionUtils.isEmpty(jarUrls)) {
+            jarUrlsSet.addAll(jarUrls);
+        }
+        return new SinkAction(id, name, sink, jarUrls);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index b00bbcc27..79d60f19a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -21,10 +21,16 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public class JobExecutionEnvironment {
 
@@ -36,17 +42,23 @@ public class JobExecutionEnvironment {
 
     private List<Action> actions = new ArrayList<>();
 
+    private List<URL> jarUrls = new ArrayList<>();
+
     private String jobFilePath;
 
     private IdGenerator idGenerator;
 
-    public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath) {
+    private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+
+    public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
+                                   SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
         this.jobConfig = jobConfig;
         this.jobFilePath = jobFilePath;
         this.idGenerator = new IdGenerator();
+        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
     }
 
-    public JobConfigParser getJobConfigParser() {
+    private JobConfigParser getJobConfigParser() {
         return new JobConfigParser(jobFilePath, idGenerator);
     }
 
@@ -54,12 +66,32 @@ public class JobExecutionEnvironment {
         this.actions.addAll(actions);
     }
 
-    public LogicalDagGenerator getLogicalDagGenerator() {
+    private LogicalDagGenerator getLogicalDagGenerator() {
         return new LogicalDagGenerator(actions, jobConfig, idGenerator);
     }
 
     public List<Action> getActions() {
         return actions;
     }
+
+    public JobProxy execute() {
+        JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
+        JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
+            jobClient.getNewJobId(),
+            seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
+            jobConfig,
+            jarUrls);
+
+        JobProxy jobProxy = jobClient.createJobProxy(jobImmutableInformation);
+        jobProxy.submitJob();
+        return jobProxy;
+    }
+
+    public LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        jarUrls.addAll(immutablePair.getRight());
+        return getLogicalDagGenerator().generate();
+    }
 }
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
new file mode 100644
index 000000000..ab45b5cfa
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.core.job.Job;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import lombok.NonNull;
+
+import java.util.concurrent.CompletableFuture;
+
+public class JobProxy implements Job {
+    private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+    private JobImmutableInformation jobImmutableInformation;
+
+    public JobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
+                    @NonNull JobImmutableInformation jobImmutableInformation) {
+        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
+        this.jobImmutableInformation = jobImmutableInformation;
+    }
+
+    @Override
+    public long getJobId() {
+        return jobImmutableInformation.getJobId();
+    }
+
+    @Override
+    public void submitJob() {
+        ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(
+            seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
+        CompletableFuture<Void> voidCompletableFuture =
+            seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
+        voidCompletableFuture.join();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 4221ed873..5aa194ce7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -18,76 +18,34 @@
 package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 
-import com.hazelcast.client.HazelcastClient;
-import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
-import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
-import com.hazelcast.client.impl.protocol.ClientMessage;
-import com.hazelcast.client.impl.spi.impl.ClientInvocation;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.internal.serialization.SerializationService;
-import com.hazelcast.internal.util.Preconditions;
 import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
-import java.util.UUID;
-import java.util.function.Function;
-
 public class SeaTunnelClient implements SeaTunnelClientInstance {
-    private final HazelcastClientInstanceImpl hazelcastClient;
-    private final SerializationService serializationService;
+    private SeaTunnelHazelcastClient hazelcastClient;
 
     public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
-        Preconditions.checkNotNull(seaTunnelClientConfig, "config");
-        this.hazelcastClient =
-            ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
-        this.serializationService = hazelcastClient.getSerializationService();
-        ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
+        this.hazelcastClient = new SeaTunnelHazelcastClient(seaTunnelClientConfig);
     }
 
-    @NonNull
     @Override
-    public HazelcastInstance getHazelcastInstance() {
-        return hazelcastClient;
+    public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
+        return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
     }
 
     @Override
-    public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
-        JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
-        jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
-        return jobExecutionEnv;
+    public JobClient createJobClient() {
+        return new JobClient(hazelcastClient);
     }
 
     public ILogger getLogger() {
-        return hazelcastClient.getLoggingService().getLogger(getClass());
-    }
-
-    private <S> S invokeRequestOnMasterAndDecodeResponse(ClientMessage request,
-                                                         Function<ClientMessage, Object> decoder) {
-        UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
-        return invokeRequestAndDecodeResponse(masterUuid, request, decoder);
-    }
-
-    private <S> S invokeRequestOnAnyMemberAndDecodeResponse(ClientMessage request,
-                                                            Function<ClientMessage, Object> decoder) {
-        return invokeRequestAndDecodeResponse(null, request, decoder);
-    }
-
-    private <S> S invokeRequestAndDecodeResponse(UUID uuid, ClientMessage request,
-                                                 Function<ClientMessage, Object> decoder) {
-        ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
-        try {
-            ClientMessage response = invocation.invoke().get();
-            return serializationService.toObject(decoder.apply(response));
-        } catch (Throwable t) {
-            throw ExceptionUtil.rethrow(t);
-        }
+        return hazelcastClient.getLogger(getClass());
     }
 
     public String printMessageToMaster(@NonNull String msg) {
-        return invokeRequestOnMasterAndDecodeResponse(
+        return hazelcastClient.requestOnMasterAndDecodeResponse(
             SeaTunnelPrintMessageCodec.encodeRequest(msg),
             response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
         );
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
index aa15c138a..c6c6972a3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.engine.common.Constant;
+
 import com.hazelcast.client.config.ClientConfig;
 
 public class SeaTunnelClientConfig extends ClientConfig {
@@ -25,9 +27,8 @@ public class SeaTunnelClientConfig extends ClientConfig {
      * Creates a new config instance with default group name for SeaTunnel Engine
      */
     public SeaTunnelClientConfig() {
-        // TODO we should get cluster name from server config instead of return a constant name.
         super();
-        setClusterName("SeaTunnel Engine");
+        setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
     }
 }
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 1e0db46dd..62590cbef 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -19,17 +19,9 @@ package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.common.config.JobConfig;
 
-import com.hazelcast.core.HazelcastInstance;
-import lombok.NonNull;
-
 public interface SeaTunnelClientInstance {
 
-    /**
-     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
-     * be a client, depending on the type of this
-     */
-    @NonNull
-    HazelcastInstance getHazelcastInstance();
-
     JobExecutionEnvironment createExecutionContext(String filePath, JobConfig config);
+
+    JobClient createJobClient();
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 4221ed873..6a696d628 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -17,11 +17,8 @@
 
 package org.apache.seatunnel.engine.client;
 
-import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 
-import com.hazelcast.client.HazelcastClient;
 import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
 import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
 import com.hazelcast.client.impl.protocol.ClientMessage;
@@ -33,63 +30,79 @@ import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
-public class SeaTunnelClient implements SeaTunnelClientInstance {
+public class SeaTunnelHazelcastClient {
     private final HazelcastClientInstanceImpl hazelcastClient;
     private final SerializationService serializationService;
 
-    public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
+    public SeaTunnelHazelcastClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
         Preconditions.checkNotNull(seaTunnelClientConfig, "config");
         this.hazelcastClient =
-            ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
+            ((HazelcastClientProxy) com.hazelcast.client.HazelcastClient.newHazelcastClient(
+                seaTunnelClientConfig)).client;
         this.serializationService = hazelcastClient.getSerializationService();
         ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
     }
 
+    public SerializationService getSerializationService() {
+        return serializationService;
+    }
+
+    /**
+     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
+     * be a client, depending on the type of this
+     */
     @NonNull
-    @Override
     public HazelcastInstance getHazelcastInstance() {
         return hazelcastClient;
     }
 
-    @Override
-    public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
-        JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
-        jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
-        return jobExecutionEnv;
-    }
-
-    public ILogger getLogger() {
-        return hazelcastClient.getLoggingService().getLogger(getClass());
+    public ILogger getLogger(Class<?> clazz) {
+        return hazelcastClient.getLoggingService().getLogger(clazz);
     }
 
-    private <S> S invokeRequestOnMasterAndDecodeResponse(ClientMessage request,
-                                                         Function<ClientMessage, Object> decoder) {
+    public <S> S requestOnMasterAndDecodeResponse(@NonNull ClientMessage request,
+                                                  @NonNull Function<ClientMessage, Object> decoder) {
         UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
-        return invokeRequestAndDecodeResponse(masterUuid, request, decoder);
+        return requestAndDecodeResponse(masterUuid, request, decoder);
     }
 
-    private <S> S invokeRequestOnAnyMemberAndDecodeResponse(ClientMessage request,
-                                                            Function<ClientMessage, Object> decoder) {
-        return invokeRequestAndDecodeResponse(null, request, decoder);
+    public <S> S requestOnAnyMemberAndDecodeResponse(@NonNull ClientMessage request,
+                                                     @NonNull Function<ClientMessage, Object> decoder) {
+        return requestAndDecodeResponse(null, request, decoder);
     }
 
-    private <S> S invokeRequestAndDecodeResponse(UUID uuid, ClientMessage request,
-                                                 Function<ClientMessage, Object> decoder) {
+    public <S> S requestAndDecodeResponse(UUID uuid, ClientMessage request,
+                                          Function<ClientMessage, Object> decoder) {
         ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
         try {
             ClientMessage response = invocation.invoke().get();
             return serializationService.toObject(decoder.apply(response));
+        } catch (InterruptedException i) {
+            Thread.currentThread().interrupt();
+            return null;
+        } catch (Throwable t) {
+            throw ExceptionUtil.rethrow(t);
+        }
+    }
+
+    public CompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
+        ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
+        try {
+            return invocation.invoke().thenApply(c -> null);
         } catch (Throwable t) {
             throw ExceptionUtil.rethrow(t);
         }
     }
 
-    public String printMessageToMaster(@NonNull String msg) {
-        return invokeRequestOnMasterAndDecodeResponse(
-            SeaTunnelPrintMessageCodec.encodeRequest(msg),
-            response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
-        );
+    public CompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+        UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
+        return requestAndGetCompletableFuture(masterUuid, request);
+    }
+
+    public CompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
+        return requestAndGetCompletableFuture(null, request);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index f8fd9c873..f9da29cf0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -22,12 +22,15 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.net.URL;
 import java.util.List;
+import java.util.Set;
 
 @RunWith(JUnit4.class)
 public class JobConfigParserTest {
@@ -38,7 +41,8 @@ public class JobConfigParserTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/fakesource_to_file.conf");
         JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator());
-        List<Action> actions = jobConfigParser.parse();
+        ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+        List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
 
         Assert.assertEquals("LocalFile", actions.get(0).getName());
@@ -55,7 +59,8 @@ public class JobConfigParserTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
         JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator());
-        List<Action> actions = jobConfigParser.parse();
+        ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+        List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
 
         Assert.assertEquals("LocalFile", actions.get(0).getName());
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 856c88aec..19bcc5eb1 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -21,28 +21,37 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
 
 import com.hazelcast.internal.json.JsonObject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.net.URL;
+import java.util.List;
+import java.util.Set;
+
 @RunWith(JUnit4.class)
 public class LogicalDagGeneratorTest {
     @Test
     public void testLogicalGenerator() {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath  = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setBoundedness(Boundedness.BOUNDED);
         jobConfig.setName("fake_to_file");
-        JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
-        jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
 
-        LogicalDagGenerator logicalDagGenerator = jobExecutionEnv.getLogicalDagGenerator();
+        IdGenerator idGenerator = new IdGenerator();
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = new JobConfigParser(filePath, idGenerator).parse();
+
+        LogicalDagGenerator logicalDagGenerator =
+            new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 0870fa08c..51fd02134 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
 import com.google.common.collect.Lists;
@@ -37,14 +42,15 @@ public class SeaTunnelClientTest {
         Config config = new Config();
         config.getSecurityConfig().setEnabled(false);
         config.getJetConfig().setEnabled(false);
+        config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
         config.getNetworkConfig().setPort(50001);
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
+        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(),
+            new SeaTunnelNodeContext());
     }
 
     @Test
     public void testSayHello() {
         SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
-        seaTunnelClientConfig.setClusterName("dev");
         seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
         SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
 
@@ -52,4 +58,20 @@ public class SeaTunnelClientTest {
         String s = engineClient.printMessageToMaster(msg);
         Assert.assertEquals(msg, s);
     }
+
+    @Test
+    public void testExecuteJob() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setBoundedness(Boundedness.BOUNDED);
+        jobConfig.setName("fake_to_file");
+
+        SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+        seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+        JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
+
+        JobProxy jobProxy = jobExecutionEnv.execute();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index cb0795ee4..615b6ca48 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -19,4 +19,8 @@ package org.apache.seatunnel.engine.common;
 
 public class Constant {
     public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";
+
+    public static final String SEATUNNEL_ID_GENERATOR_NAME = "SeaTunnelIdGenerator";
+
+    public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "SeaTunnel";
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
index a5758a515..120b9b037 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
@@ -17,9 +17,10 @@
 
 package org.apache.seatunnel.engine.common.serializeable;
 
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
-import com.hazelcast.jet.config.JobConfig;
 import com.hazelcast.nio.serialization.DataSerializableFactory;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index af9b504b7..7e1ed660e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.common.utils;
 
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
 import org.apache.seatunnel.engine.common.exception.JobNotFoundExceptionSeaTunnel;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 
@@ -36,7 +37,8 @@ public final class ExceptionUtil {
 
     private static final List<ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory>> EXCEPTIONS = Arrays.asList(
         new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START, SeaTunnelEngineException.class, SeaTunnelEngineException::new),
-        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new)
+        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new),
+        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 2, JobDefineCheckExceptionSeaTunnel.class, JobDefineCheckExceptionSeaTunnel::new)
     );
 
     private ExceptionUtil() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index d85b2873f..0ede9cc6d 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.core.dag.actions;
 
 import lombok.NonNull;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,15 +31,19 @@ public abstract class AbstractAction implements Action {
 
     private int parallelism = 1;
 
-    protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams) {
+    private List<URL> jarUrls;
+
+    protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull List<URL> jarUrls) {
         this.id = id;
         this.name = name;
         this.upstreams = upstreams;
+        this.jarUrls = jarUrls;
     }
 
-    protected AbstractAction(int id, @NonNull String name) {
+    protected AbstractAction(int id, @NonNull String name, @NonNull List<URL> jarUrls) {
         this.id = id;
         this.name = name;
+        this.jarUrls = jarUrls;
     }
 
     @NonNull
@@ -77,4 +82,9 @@ public abstract class AbstractAction implements Action {
     public int getId() {
         return id;
     }
+
+    @Override
+    public List<URL> getJarUrls() {
+        return jarUrls;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index 86e39c62d..6abe84bf3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.core.dag.actions;
 import lombok.NonNull;
 
 import java.io.Serializable;
+import java.net.URL;
 import java.util.List;
 
 public interface Action extends Serializable {
@@ -38,4 +39,6 @@ public interface Action extends Serializable {
     void setParallelism(int parallelism);
 
     int getId();
+
+    List<URL> getJarUrls();
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 25377c4bd..8806978a7 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
 
 import lombok.NonNull;
 
+import java.net.URL;
 import java.util.List;
 
 public class PartitionTransformAction extends AbstractAction {
@@ -29,15 +30,17 @@ public class PartitionTransformAction extends AbstractAction {
     public PartitionTransformAction(int id,
                                     @NonNull String name,
                                     @NonNull List<Action> upstreams,
-                                    @NonNull PartitionSeaTunnelTransform partitionTransformation) {
-        super(id, name, upstreams);
+                                    @NonNull PartitionSeaTunnelTransform partitionTransformation,
+                                    @NonNull List<URL> jarUrls) {
+        super(id, name, upstreams, jarUrls);
         this.partitionTransformation = partitionTransformation;
     }
 
     public PartitionTransformAction(int id,
                                     @NonNull String name,
-                                    @NonNull PartitionSeaTunnelTransform partitionTransformation) {
-        super(id, name);
+                                    @NonNull PartitionSeaTunnelTransform partitionTransformation,
+                                    @NonNull List<URL> jarUrls) {
+        super(id, name, jarUrls);
         this.partitionTransformation = partitionTransformation;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index 8dac82faf..47c387dcd 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 
 import lombok.NonNull;
 
+import java.net.URL;
 import java.util.List;
 
 @SuppressWarnings("checkstyle:ClassTypeParameterName")
@@ -30,15 +31,17 @@ public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends
     public SinkAction(int id,
                       @NonNull String name,
                       @NonNull List<Action> upstreams,
-                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
-        super(id, name, upstreams);
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
+                      @NonNull List<URL> jarUrls) {
+        super(id, name, upstreams, jarUrls);
         this.sink = sink;
     }
 
     public SinkAction(int id,
                       @NonNull String name,
-                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
-        super(id, name);
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
+                      @NonNull List<URL> jarUrls) {
+        super(id, name, jarUrls);
         this.sink = sink;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index 5599b0e5d..a5f834fe3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -24,14 +24,17 @@ import com.google.common.collect.Lists;
 import lombok.NonNull;
 
 import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
 
 public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
     private SeaTunnelSource<T, SplitT, StateT> source;
 
     public SourceAction(int id,
                         @NonNull String name,
-                        @NonNull SeaTunnelSource<T, SplitT, StateT> source) {
-        super(id, name, Lists.newArrayList());
+                        @NonNull SeaTunnelSource<T, SplitT, StateT> source,
+                        @NonNull List<URL> jarUrls) {
+        super(id, name, Lists.newArrayList(), jarUrls);
         this.source = source;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index afea5986d..c4385a064 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
 import lombok.NonNull;
 
+import java.net.URL;
 import java.util.List;
 
 public class TransformAction extends AbstractAction {
@@ -29,15 +30,17 @@ public class TransformAction extends AbstractAction {
     public TransformAction(int id,
                            @NonNull String name,
                            @NonNull List<Action> upstreams,
-                           @NonNull SeaTunnelTransform transformation) {
-        super(id, name, upstreams);
+                           @NonNull SeaTunnelTransform transformation,
+                           @NonNull List<URL> jarUrls) {
+        super(id, name, upstreams, jarUrls);
         this.transformation = transformation;
     }
 
     public TransformAction(int id,
                            @NonNull String name,
-                           @NonNull SeaTunnelTransform transformation) {
-        super(id, name);
+                           @NonNull SeaTunnelTransform transformation,
+                           @NonNull List<URL> jarUrls) {
+        super(id, name, jarUrls);
         this.transformation = transformation;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
similarity index 84%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index cb0795ee4..26b2e76ff 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -15,8 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.common;
+package org.apache.seatunnel.engine.core.job;
 
-public class Constant {
-    public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";
+public interface Job {
+    long getJobId();
+
+    void submitJob();
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
new file mode 100644
index 000000000..577d52e0c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+
+public class JobImmutableInformation implements IdentifiedDataSerializable {
+    private long jobId;
+
+    private long createTime;
+
+    private Data logicalDag;
+
+    private JobConfig jobConfig;
+
+    private List<URL> pluginJarsUrls;
+
+    public JobImmutableInformation() {
+    }
+
+    public JobImmutableInformation(long jobId, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
+        this.createTime = System.currentTimeMillis();
+        this.jobId = jobId;
+        this.logicalDag = logicalDag;
+        this.jobConfig = jobConfig;
+        this.pluginJarsUrls = pluginJarsUrls;
+    }
+
+    public long getJobId() {
+        return jobId;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+
+    public Data getLogicalDag() {
+        return logicalDag;
+    }
+
+    public JobConfig getJobConfig() {
+        return jobConfig;
+    }
+
+    public List<URL> getPluginJarsUrls() {
+        return pluginJarsUrls;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return JobDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return JobDataSerializerHook.JOB_IMMUTABLE_INFORMATION;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeLong(jobId);
+        out.writeLong(createTime);
+        IOUtil.writeData(out, logicalDag);
+        out.writeObject(jobConfig);
+        out.writeObject(pluginJarsUrls);
+
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        jobId = in.readLong();
+        createTime = in.readLong();
+        logicalDag = IOUtil.readData(in);
+        jobConfig = in.readObject();
+        pluginJarsUrls = in.readObject();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
index 75060fc90..ec10e3d8a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
@@ -29,13 +29,14 @@ import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.client.impl.protocol.Generated;
 import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
 
-/**
+/*
  * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
  * To change this file, edit the templates or the protocol
  * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
  * and regenerate it.
  */
-@Generated("4d1cbb254e8eaad3c45fe22c57f29492")
+
+@Generated("c0a6d0c9d7eb912e8b10861931a0a695")
 public final class SeaTunnelPrintMessageCodec {
     //hex: 0xDE0100
     public static final int REQUEST_MESSAGE_TYPE = 14549248;
@@ -51,7 +52,8 @@ public final class SeaTunnelPrintMessageCodec {
         ClientMessage clientMessage = ClientMessage.createForEncode();
         clientMessage.setRetryable(false);
         clientMessage.setOperationName("SeaTunnel.PrintMessage");
-        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        ClientMessage.Frame initialFrame =
+            new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
         encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
         clientMessage.add(initialFrame);
@@ -71,7 +73,8 @@ public final class SeaTunnelPrintMessageCodec {
 
     public static ClientMessage encodeResponse(java.lang.String response) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
-        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        ClientMessage.Frame initialFrame =
+            new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
         clientMessage.add(initialFrame);
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
similarity index 67%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
index 75060fc90..36ec48de4 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
@@ -27,65 +27,57 @@ import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCod
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.client.impl.protocol.Generated;
-import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec;
 
-/**
+/*
  * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
  * To change this file, edit the templates or the protocol
  * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
  * and regenerate it.
  */
-@Generated("4d1cbb254e8eaad3c45fe22c57f29492")
-public final class SeaTunnelPrintMessageCodec {
-    //hex: 0xDE0100
-    public static final int REQUEST_MESSAGE_TYPE = 14549248;
-    //hex: 0xDE0101
-    public static final int RESPONSE_MESSAGE_TYPE = 14549249;
+
+@Generated("f0fdf747fe01901765dedbe5a527bb0f")
+public final class SeaTunnelSubmitJobCodec {
+    //hex: 0xDE0200
+    public static final int REQUEST_MESSAGE_TYPE = 14549504;
+    //hex: 0xDE0201
+    public static final int RESPONSE_MESSAGE_TYPE = 14549505;
     private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
     private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
 
-    private SeaTunnelPrintMessageCodec() {
+    private SeaTunnelSubmitJobCodec() {
     }
 
-    public static ClientMessage encodeRequest(java.lang.String message) {
+    public static ClientMessage encodeRequest(com.hazelcast.internal.serialization.Data jobImmutableInformation) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
         clientMessage.setRetryable(false);
-        clientMessage.setOperationName("SeaTunnel.PrintMessage");
-        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        clientMessage.setOperationName("SeaTunnel.SubmitJob");
+        ClientMessage.Frame initialFrame =
+            new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
         encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
         clientMessage.add(initialFrame);
-        StringCodec.encode(clientMessage, message);
+        DataCodec.encode(clientMessage, jobImmutableInformation);
         return clientMessage;
     }
 
     /**
      *
      */
-    public static java.lang.String decodeRequest(ClientMessage clientMessage) {
+    public static com.hazelcast.internal.serialization.Data decodeRequest(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
         //empty initial frame
         iterator.next();
-        return StringCodec.decode(iterator);
+        return DataCodec.decode(iterator);
     }
 
-    public static ClientMessage encodeResponse(java.lang.String response) {
+    public static ClientMessage encodeResponse() {
         ClientMessage clientMessage = ClientMessage.createForEncode();
-        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        ClientMessage.Frame initialFrame =
+            new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
         clientMessage.add(initialFrame);
 
-        StringCodec.encode(clientMessage, response);
         return clientMessage;
     }
-
-    /**
-     *
-     */
-    public static java.lang.String decodeResponse(ClientMessage clientMessage) {
-        ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
-        //empty initial frame
-        iterator.next();
-        return StringCodec.decode(iterator);
-    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index 2a98be5f0..b6da4a3df 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConsta
 import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -51,6 +52,11 @@ public final class JobDataSerializerHook implements DataSerializerHook {
      */
     public static final int LOGICAL_EDGE = 2;
 
+    /**
+     * Serialization ID of the {@link org.apache.seatunnel.engine.core.job.JobImmutableInformation} class.
+     */
+    public static final int JOB_IMMUTABLE_INFORMATION = 3;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID
@@ -77,6 +83,8 @@ public final class JobDataSerializerHook implements DataSerializerHook {
                     return new LogicalVertex();
                 case LOGICAL_EDGE:
                     return new LogicalEdge();
+                case JOB_IMMUTABLE_INFORMATION:
+                    return new JobImmutableInformation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index a16ea482f..fb4b29e2b 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -39,3 +39,18 @@ methods:
           nullable: false
           since: 2.0
           doc: ''
+
+  - id: 2
+    name: submitJob
+    since: 2.0
+    doc: ''
+    request:
+      retryable: false
+      partitionIdentifier: -1
+      params:
+        - name: jobImmutableInformation
+          type: Data
+          nullable: false
+          since: 2.0
+          doc: ''
+    response: {}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 51a36f373..42c3cb157 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,10 +17,14 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+
 import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.internal.services.ManagedService;
 import com.hazelcast.internal.services.MembershipAwareService;
 import com.hazelcast.internal.services.MembershipServiceEvent;
+import com.hazelcast.jet.impl.LiveOperationRegistry;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -28,17 +32,20 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 
 public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
     public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
 
     private NodeEngineImpl nodeEngine;
     private final ILogger logger;
+    private final LiveOperationRegistry liveOperationRegistry;
 
     private TaskExecutionService taskExecutionService;
 
     public SeaTunnelServer(Node node) {
         this.logger = node.getLogger(getClass());
+        this.liveOperationRegistry = new LiveOperationRegistry();
         logger.info("SeaTunnel server start...");
     }
 
@@ -82,9 +89,35 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
     /**
      * Used for debugging on call
      */
-    public String printMessage(String message){
+    public String printMessage(String message) {
         this.logger.info(nodeEngine.getThisAddress() + ":" + message);
         return message;
     }
 
+    public LiveOperationRegistry getLiveOperationRegistry() {
+        return liveOperationRegistry;
+    }
+
+    /**
+     * call by client to submit job
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public CompletableFuture<Void> submitJob(Data jobImmutableInformation) {
+        // TODO Here we need new a JobMaster and run it.
+        JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
+        logger.info("Job [" + jobInformation.getJobId() + "] submit");
+        logger.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
+        CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(2000);
+                logger.info("I am sleep 2000 ms");
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                voidCompletableFuture.complete(null);
+            }
+        }).start();
+        return voidCompletableFuture;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index e40e5d727..49db3f60c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.engine.common.Constant;
+
 import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 
@@ -26,6 +28,7 @@ public class SeaTunnelServerStarter {
         Config config = new Config();
         config.getSecurityConfig().setEnabled(false);
         config.getJetConfig().setEnabled(false);
+        config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
         HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
new file mode 100644
index 000000000..21d6dc8b8
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import static com.hazelcast.jet.impl.util.ExceptionUtil.isRestartableException;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.peel;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.stackTraceToString;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static com.hazelcast.spi.impl.operationservice.ExceptionAction.THROW_EXCEPTION;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.jet.JetException;
+import com.hazelcast.nio.serialization.HazelcastSerializationException;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.ExceptionAction;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for async operations. Handles registration/deregistration of
+ * operations from live registry, exception handling and peeling and
+ * logging of exceptions
+ */
+public abstract class AsyncOperation extends Operation implements IdentifiedDataSerializable {
+
+    @Override
+    public void beforeRun() {
+        SeaTunnelServer service = getService();
+        service.getLiveOperationRegistry().register(this);
+    }
+
+    @Override
+    public final void run() {
+        CompletableFuture<?> future;
+        try {
+            future = doRun();
+        } catch (Exception e) {
+            logError(e);
+            doSendResponse(e);
+            return;
+        }
+        future.whenComplete(withTryCatch(getLogger(), (r, f) -> doSendResponse(f != null ? peel(f) : r)));
+    }
+
+    protected abstract CompletableFuture<?> doRun() throws Exception;
+
+    @Override
+    public final boolean returnsResponse() {
+        return false;
+    }
+
+    @Override
+    public final Object getResponse() {
+        throw new UnsupportedOperationException();
+    }
+
+    private void doSendResponse(Object value) {
+        try {
+            final SeaTunnelServer service = getService();
+            service.getLiveOperationRegistry().deregister(this);
+        } finally {
+            try {
+                sendResponse(value);
+            } catch (Exception e) {
+                Throwable ex = peel(e);
+                if (value instanceof Throwable && ex instanceof HazelcastSerializationException) {
+                    // Sometimes exceptions are not serializable, for example on
+                    // https://github.com/hazelcast/hazelcast-jet/issues/1995.
+                    // When sending exception as a response and the serialization fails,
+                    // the response will not be sent and the operation will hang.
+                    // To prevent this from happening, replace the exception with
+                    // another exception that can be serialized.
+                    sendResponse(new JetException(stackTraceToString(ex)));
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Override
+    public ExceptionAction onInvocationException(Throwable throwable) {
+        return isRestartableException(throwable) ? THROW_EXCEPTION : super.onInvocationException(throwable);
+    }
+
+    @Override
+    public final int getFactoryId() {
+        return OperationDataSerializerHook.FACTORY_ID;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
new file mode 100644
index 000000000..689f697be
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class SubmitJobOperation extends AsyncOperation {
+    private Data jobImmutableInformation;
+
+    public SubmitJobOperation() {
+    }
+
+    public SubmitJobOperation(Data jobImmutableInformation) {
+        this.jobImmutableInformation = jobImmutableInformation;
+    }
+
+    @Override
+    public int getClassId() {
+        return OperationDataSerializerHook.SUBMIT_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        IOUtil.writeData(out, jobImmutableInformation);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        jobImmutableInformation = IOUtil.readData(in);
+    }
+
+    @Override
+    protected CompletableFuture<?> doRun() throws Exception {
+        SeaTunnelServer seaTunnelServer = getService();
+        CompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
+        return voidCompletableFuture;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
index 78dee775e..33c829f75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
@@ -17,58 +17,46 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
-import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
-import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 
-import com.google.common.base.Function;
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.spi.exception.RetryableHazelcastException;
 import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.security.Permission;
+import java.util.function.Function;
 
-public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
+abstract class AbstractSeaTunnelMessageTask<P, R> extends AbstractInvocationMessageTask<P> {
+    private final Function<ClientMessage, P> decoder;
+    private final Function<R, ClientMessage> encoder;
 
-    protected PrintMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
+    protected AbstractSeaTunnelMessageTask(ClientMessage clientMessage, Node node, Connection connection,
+                                           Function<ClientMessage, P> decoder, Function<R, ClientMessage> encoder) {
         super(clientMessage, node, connection);
-    }
 
-    @Override
-    protected InvocationBuilder getInvocationBuilder(Operation op) {
-        Address masterAddress = nodeEngine.getMasterAddress();
-        if (masterAddress == null) {
-            throw new RetryableHazelcastException("master not yet known");
-        }
-        return nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-            op, masterAddress);
-    }
-
-    @Override
-    protected Operation prepareOperation() {
-        return new PrintMessageOperation(parameters);
+        this.decoder = decoder;
+        this.encoder = encoder;
     }
 
     @Override
-    protected String decodeClientMessage(ClientMessage clientMessage) {
-        Function<ClientMessage, String> decodeRequest = SeaTunnelPrintMessageCodec::decodeRequest;
-        return decodeRequest.apply(clientMessage);
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
     }
 
     @Override
-    protected ClientMessage encodeResponse(Object response) {
-        Function<String, ClientMessage> encodeResponse = SeaTunnelPrintMessageCodec::encodeResponse;
-        return encodeResponse.apply((String) response);
+    protected final P decodeClientMessage(ClientMessage clientMessage) {
+        return decoder.apply(clientMessage);
     }
 
     @Override
-    public String getServiceName() {
-        return Constant.SEATUNNEL_SERVICE_NAME;
+    protected ClientMessage encodeResponse(Object o) {
+        return encoder.apply((R) o);
     }
 
     @Override
@@ -81,13 +69,21 @@ public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
         return null;
     }
 
-    @Override
-    public String getMethodName() {
-        return "printMessage";
+    protected <V> Data toData(V v) {
+        return nodeEngine.getSerializationService().toData(v);
     }
 
     @Override
-    public Object[] getParameters() {
-        return new Object[0];
+    protected InvocationBuilder getInvocationBuilder(Operation operation) {
+        Address masterAddress = nodeEngine.getMasterAddress();
+        if (masterAddress == null) {
+            throw new RetryableHazelcastException("master not yet known");
+        }
+        return nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME,
+            operation, masterAddress);
+    }
+
+    protected SeaTunnelServer getSeaTunnelService() {
+        return getService(SeaTunnelServer.SERVICE_NAME);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
index 78dee775e..a39dfeeeb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
@@ -17,36 +17,20 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
-import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 
-import com.google.common.base.Function;
 import com.hazelcast.client.impl.protocol.ClientMessage;
-import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
-import com.hazelcast.cluster.Address;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
-import com.hazelcast.spi.exception.RetryableHazelcastException;
-import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-import java.security.Permission;
-
-public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
+public class PrintMessageTask extends AbstractSeaTunnelMessageTask<String, String> {
 
     protected PrintMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
-        super(clientMessage, node, connection);
-    }
-
-    @Override
-    protected InvocationBuilder getInvocationBuilder(Operation op) {
-        Address masterAddress = nodeEngine.getMasterAddress();
-        if (masterAddress == null) {
-            throw new RetryableHazelcastException("master not yet known");
-        }
-        return nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-            op, masterAddress);
+        super(clientMessage, node, connection,
+            SeaTunnelPrintMessageCodec::decodeRequest,
+            SeaTunnelPrintMessageCodec::encodeResponse);
     }
 
     @Override
@@ -54,33 +38,6 @@ public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
         return new PrintMessageOperation(parameters);
     }
 
-    @Override
-    protected String decodeClientMessage(ClientMessage clientMessage) {
-        Function<ClientMessage, String> decodeRequest = SeaTunnelPrintMessageCodec::decodeRequest;
-        return decodeRequest.apply(clientMessage);
-    }
-
-    @Override
-    protected ClientMessage encodeResponse(Object response) {
-        Function<String, ClientMessage> encodeResponse = SeaTunnelPrintMessageCodec::encodeResponse;
-        return encodeResponse.apply((String) response);
-    }
-
-    @Override
-    public String getServiceName() {
-        return Constant.SEATUNNEL_SERVICE_NAME;
-    }
-
-    @Override
-    public Permission getRequiredPermission() {
-        return null;
-    }
-
-    @Override
-    public String getDistributedObjectName() {
-        return null;
-    }
-
     @Override
     public String getMethodName() {
         return "printMessage";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 9b25ecd32..2b6c78488 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.protocol.task;
 
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
 
 import com.hazelcast.client.impl.protocol.MessageTaskFactory;
 import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
@@ -42,5 +43,6 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
 
     private void initFactories() {
         factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+        factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
index 9b25ecd32..5cb228b3e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
@@ -17,30 +17,35 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 
-import com.hazelcast.client.impl.protocol.MessageTaskFactory;
-import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
+import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.instance.impl.Node;
-import com.hazelcast.internal.util.collection.Int2ObjectHashMap;
-import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryProvider {
-    private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);
-    public final Node node;
+public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
 
-    public SeaTunnelMessageTaskFactoryProvider(NodeEngine nodeEngine) {
-        this.node = ((NodeEngineImpl) nodeEngine).getNode();
-        initFactories();
+    protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection,
+            SeaTunnelSubmitJobCodec::decodeRequest,
+            x -> SeaTunnelSubmitJobCodec.encodeResponse());
     }
 
     @Override
-    public Int2ObjectHashMap<MessageTaskFactory> getFactories() {
-        return this.factories;
+    protected Operation prepareOperation() {
+        return new SubmitJobOperation(parameters);
     }
 
-    private void initFactories() {
-        factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+    @Override
+    public String getMethodName() {
+        return "submitJob";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[]{};
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index f56a68353..3153413d4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -34,6 +35,7 @@ import com.hazelcast.spi.annotation.PrivateApi;
 @PrivateApi
 public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int PRINT_MESSAGE_OPERATOR = 0;
+    public static final int SUBMIT_OPERATOR = 1;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -57,6 +59,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
             switch (typeId) {
                 case PRINT_MESSAGE_OPERATOR:
                     return new PrintMessageOperation();
+                case SUBMIT_OPERATOR:
+                    return new SubmitJobOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 529049455..baacf1636 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -64,7 +64,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     };
 
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
-            new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
     public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
         this.pluginDir = Common.connectorJarDir(pluginSubDir);
@@ -80,10 +80,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     @Override
     public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
         return pluginIdentifiers.stream()
-                .map(this::getPluginJarPath)
-                .filter(Optional::isPresent)
-                .map(Optional::get).distinct()
-                .collect(Collectors.toList());
+            .map(this::getPluginJarPath)
+            .filter(Optional::isPresent)
+            .map(Optional::get).distinct()
+            .collect(Collectors.toList());
     }
 
     @Override
@@ -109,13 +109,14 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
                 this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
             } catch (Exception e) {
                 LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
-                        " message: " + e.getMessage());
-                classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+                    " message: " + e.getMessage());
+                classLoader =
+                    new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
             }
             pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
             if (pluginInstance != null) {
                 LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
-                        pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
+                    pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
                 return pluginInstance;
             }
         }
@@ -135,7 +136,8 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
             } else if (t instanceof PluginIdentifierInterface) {
                 // new api
                 PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
-                if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+                if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(),
+                    pluginIdentifier.getPluginName())) {
                     return (T) pluginIdentifierInstance;
                 }
             } else {
@@ -193,7 +195,8 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
         File[] targetPluginFiles = pluginDir.toFile().listFiles(new FileFilter() {
             @Override
             public boolean accept(File pathname) {
-                return pathname.getName().endsWith(".jar") && StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
+                return pathname.getName().endsWith(".jar") &&
+                    StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
             }
         });
         if (ArrayUtils.isEmpty(targetPluginFiles)) {