You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/29 05:22:11 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add LogicalDag Generator (#2284)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new d9d01a3df [ST-Engine] Add LogicalDag Generator (#2284)
d9d01a3df is described below

commit d9d01a3df907fc88294700dcf87766d712f57912
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Jul 29 13:22:07 2022 +0800

    [ST-Engine] Add LogicalDag Generator (#2284)
    
    * Add logicaldag generator
    
    * merge from upstream
    
    * fix UT error
---
 .../api/transform/PartitionSeaTunnelTransform.java |   4 +-
 .../api/transform/SeaTunnelTransform.java          |  10 +-
 .../common/constants/CollectionConstants.java      |  16 ++
 seatunnel-engine/seatunnel-engine-client/pom.xml   |  17 ++
 .../engine/client/ConnectorInstanceLoader.java     |  78 ++++++
 .../seatunnel/engine/client/JobConfigParser.java   | 274 +++++++++++++++++++++
 .../engine/client/JobExecutionEnvironment.java     |  44 ++--
 .../seatunnel/engine/client/SeaTunnelClient.java   |  11 +-
 .../engine/client/SeaTunnelClientInstance.java     |   4 +-
 .../engine/client/JobConfigParserTest.java         |  70 ++++++
 .../engine/client/LogicalDagGeneratorTest.java     |  52 ++++
 .../apache/seatunnel/engine/client/TestUtils.java  |   9 +-
 .../src/test/resources/fakesource_to_file.conf     |  65 +++++
 .../test/resources/fakesource_to_file_complex.conf |  71 ++++++
 seatunnel-engine/seatunnel-engine-common/pom.xml   |   5 +
 .../seatunnel/engine/common/config/JobConfig.java  |  35 ++-
 .../JobDefineCheckExceptionSeaTunnel.java          |  12 +-
 .../serializeable/ConfigDataSerializerHook.java}   |  20 +-
 .../serializeable/SeaTunnelFactoryIdConstant.java} |  23 +-
 .../engine/common/utils/IdGenerator.java}          |  20 +-
 .../services/com.hazelcast.DataSerializerHook      |  18 ++
 .../engine/core/dag/actions/AbstractAction.java    |  80 ++++++
 .../seatunnel/engine/core/dag/actions/Action.java} |  26 +-
 .../core/dag/actions/PartitionTransformAction.java |  43 ++++
 .../engine/core/dag/actions/SinkAction.java        |  44 ++++
 .../engine/core/dag/actions/SourceAction.java}     |  24 +-
 .../engine/core/dag/actions/TransformAction.java}  |  30 ++-
 .../engine/core/dag/logicaldag/LogicalDag.java     | 158 ++++++++++++
 .../core/dag/logicaldag/LogicalDagGenerator.java   |  94 +++++++
 .../engine/core/dag/logicaldag/LogicalEdge.java    |  81 ++++++
 .../engine/core/dag/logicaldag/LogicalVertex.java  |  64 +++++
 .../core/serializable/JobDataSerializerHook.java}  |  44 +++-
 .../services/com.hazelcast.DataSerializerHook      |  18 ++
 .../serializable/OperationDataSerializerHook.java  |  14 +-
 .../SeaTunnelTransformPluginDiscovery.java         |  22 +-
 35 files changed, 1487 insertions(+), 113 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/PartitionSeaTunnelTransform.java
similarity index 87%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/PartitionSeaTunnelTransform.java
index d6cd39dc6..cccfbd716 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/PartitionSeaTunnelTransform.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.common.config;
+package org.apache.seatunnel.api.transform;
 
-public class JobConfig {
+public interface PartitionSeaTunnelTransform extends SeaTunnelTransform {
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
similarity index 66%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
index 28f5dc4d7..4b3eb065f 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java
@@ -15,9 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.api.transform;
 
-public class CollectionConstants {
+import org.apache.seatunnel.api.common.PluginIdentifierInterface;
+import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
+import org.apache.seatunnel.api.source.SeaTunnelContextAware;
 
-    public static final int MAP_SIZE = 6;
+import java.io.Serializable;
+
+public interface SeaTunnelTransform extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
index 28f5dc4d7..fb387b605 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
@@ -20,4 +20,20 @@ package org.apache.seatunnel.common.constants;
 public class CollectionConstants {
 
     public static final int MAP_SIZE = 6;
+
+    public static final String PLUGIN_NAME = "plugin_name";
+
+    public static final String SEATUNNEL_PLUGIN = "seatunnel";
+
+    public static final String FLINK_PLUGIN = "flink";
+
+    public static final String SPARK_PLUGIN = "spark";
+
+    public static final String SOURCE_PLUGIN = "source";
+
+    public static final String TRANSFORM_PLUGIN = "transform";
+
+    public static final String SINK_PLUGIN = "sink";
+
+    public static final String PARALLELISM = "parallelism";
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 26c502f36..c82c6b56d 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -39,6 +39,11 @@
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-core-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-engine-core</artifactId>
@@ -50,6 +55,18 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.scala-lang</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
new file mode 100644
index 000000000..7791cbec4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import scala.Serializable;
+
+public class ConnectorInstanceLoader {
+    public static SeaTunnelSource loadSourceInstance(Config sourceConfig) {
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+            CollectionConstants.SEATUNNEL_PLUGIN,
+            CollectionConstants.SOURCE_PLUGIN,
+            sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
+
+        SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+        seaTunnelSource.prepare(sourceConfig);
+        seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
+        if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+            && seaTunnelSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+            throw new UnsupportedOperationException(
+                String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
+        }
+        return seaTunnelSource;
+    }
+
+    public static SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> loadSinkInstance(
+        Config sinkConfig) {
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+            CollectionConstants.SEATUNNEL_PLUGIN,
+            CollectionConstants.SINK_PLUGIN,
+            sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
+        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+            sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+        seaTunnelSink.prepare(sinkConfig);
+        seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+        return seaTunnelSink;
+    }
+
+    public static SeaTunnelTransform loadTransformInstance(Config transformConfig) {
+        SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
+        PluginIdentifier pluginIdentifier = PluginIdentifier.of(
+            CollectionConstants.SEATUNNEL_PLUGIN,
+            CollectionConstants.TRANSFORM_PLUGIN,
+            transformConfig.getString(CollectionConstants.PLUGIN_NAME));
+        SeaTunnelTransform seaTunnelTransform = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+        return seaTunnelTransform;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
new file mode 100644
index 000000000..7ddedd792
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.Data;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Serializable;
+
+@Data
+public class JobConfigParser {
+    private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class);
+    private String jobDefineFilePath;
+    private IdGenerator idGenerator;
+
+    private Map<Action, String> alreadyTransformActionMap = new HashMap<>();
+
+    private Map<String, List<Config>> transformResultTableNameMap = new HashMap<>();
+    private Map<String, List<Config>> transformSourceTableNameMap = new HashMap<>();
+
+    private Map<String, List<Config>> sourceResultTableNameMap = new HashMap<>();
+
+    protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator) {
+        this.jobDefineFilePath = jobDefineFilePath;
+        this.idGenerator = idGenerator;
+    }
+
+    public List<Action> parse() {
+        Config seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
+        List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
+        List<? extends Config> transformConfigs = seaTunnelJobConfig.getConfigList("transform");
+        List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
+
+        if (sinkConfigs.size() == 1 && sourceConfigs.size() == 1 & transformConfigs.size() <= 1) {
+            return sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+        } else {
+            return complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+        }
+    }
+
+    /**
+     * If there are multiple sources, multiple transforms, or multiple sinks, we rely on
+     * source_table_name and result_table_name to build the action pipeline.
+     * In this case result_table_name is required for every Source Connector, and both
+     * result_table_name and source_table_name are required for every Transform Connector.
+     * Finally, source_table_name is required for every Sink Connector.
+     */
+    private List<Action> complexAnalyze(List<? extends Config> sourceConfigs,
+                                        List<? extends Config> transformConfigs,
+                                        List<? extends Config> sinkConfigs) {
+        initRelationMap(sourceConfigs, transformConfigs);
+
+        List<Action> actions = new ArrayList<>();
+        for (Config config : sinkConfigs) {
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+                ConnectorInstanceLoader.loadSinkInstance(config);
+
+            SinkAction sinkAction =
+                new SinkAction(idGenerator.getNextId(), seaTunnelSink.getPluginName(), seaTunnelSink);
+            actions.add(sinkAction);
+            if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
+                throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+                    + " must be set in the sink plugin config when the job have complex dependencies");
+            }
+            String sourceTableName = config.getString(Plugin.SOURCE_TABLE_NAME);
+            List<Config> transformConfigList = transformResultTableNameMap.get(sourceTableName);
+            if (CollectionUtils.isEmpty(transformConfigList)) {
+                sourceAnalyze(sourceTableName, sinkAction);
+            } else if (transformConfigList.size() > 1) {
+                throw new JobDefineCheckExceptionSeaTunnel("Only UnionTransform can have more than one upstream, "
+                    + sinkAction.getName()
+                    + " is not UnionTransform Connector");
+            } else {
+                transformAnalyze(sourceTableName, sinkAction);
+            }
+
+        }
+        return actions;
+    }
+
+    private void sourceAnalyze(String sourceTableName, Action action) {
+        List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
+        if (CollectionUtils.isEmpty(sourceConfigList)) {
+            throw new JobDefineCheckExceptionSeaTunnel(action.getName()
+                + " source table name [" + sourceTableName + "] can not be found");
+        }
+
+        // If a transform has more than one upstream action, the parallelism of this transform is the sum of the
+        // parallelism of its upstream actions.
+        AtomicInteger totalParallelism = new AtomicInteger();
+        sourceConfigList.stream().forEach(sourceConfig -> {
+            SourceAction sourceAction =
+                new SourceAction(idGenerator.getNextId(),
+                    sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+                    ConnectorInstanceLoader.loadSourceInstance(sourceConfig));
+            int sourceParallelism = getSourceParallelism(sourceConfig);
+            sourceAction.setParallelism(sourceParallelism);
+            totalParallelism.set(totalParallelism.get() + sourceParallelism);
+            action.addUpstream(sourceAction);
+            action.setParallelism(totalParallelism.get());
+        });
+    }
+
+    private void transformAnalyze(String sourceTableName, Action action) {
+        // find upstream transform node
+        List<Config> transformConfigList = transformResultTableNameMap.get(sourceTableName);
+        if (CollectionUtils.isEmpty(transformConfigList)) {
+            sourceAnalyze(sourceTableName, action);
+        } else {
+            AtomicInteger totalParallelism = new AtomicInteger();
+            transformConfigList.stream().forEach(config -> {
+                SeaTunnelTransform seaTunnelTransform = ConnectorInstanceLoader.loadTransformInstance(config);
+                TransformAction transformAction =
+                    new TransformAction(idGenerator.getNextId(), seaTunnelTransform.getPluginName(),
+                        seaTunnelTransform);
+                action.addUpstream(transformAction);
+                transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME), transformAction);
+                totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
+                action.setParallelism(totalParallelism.get());
+            });
+        }
+    }
+
+    private void initRelationMap(List<? extends Config> sourceConfigs, List<? extends Config> transformConfigs) {
+        for (Config config : sourceConfigs) {
+            if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
+                throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+                    + " must be set in the source plugin config when the job have complex dependencies");
+            }
+            String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
+            if (sourceResultTableNameMap.get(resultTableName) == null) {
+                sourceResultTableNameMap.put(resultTableName, new ArrayList<>());
+            }
+            sourceResultTableNameMap.get(resultTableName).add(config);
+        }
+
+        for (Config config : transformConfigs) {
+            if (!config.hasPath(Plugin.RESULT_TABLE_NAME)) {
+                throw new JobDefineCheckExceptionSeaTunnel(Plugin.RESULT_TABLE_NAME
+                    + " must be set in the transform plugin config when the job have complex dependencies");
+            }
+
+            if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
+                throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
+                    + " must be set in the transform plugin config when the job have complex dependencies");
+            }
+            String resultTableName = config.getString(Plugin.RESULT_TABLE_NAME);
+            String sourceTableName = config.getString(Plugin.SOURCE_TABLE_NAME);
+
+            if (transformResultTableNameMap.get(resultTableName) == null) {
+                transformResultTableNameMap.put(resultTableName, new ArrayList<>());
+            }
+            transformResultTableNameMap.get(resultTableName).add(config);
+
+            if (transformSourceTableNameMap.get(sourceTableName) == null) {
+                transformSourceTableNameMap.put(sourceTableName, new ArrayList<>());
+            }
+            transformSourceTableNameMap.get(sourceTableName).add(config);
+
+        }
+    }
+
+    /**
+     * If there is only one Source, one Sink, and at most one Transform, we simply build the action pipeline in the following order:
+     * Source
+     * |
+     * Transform (if present)
+     * |
+     * Sink
+     */
+    private List<Action> sampleAnalyze(List<? extends Config> sourceConfigs,
+                                       List<? extends Config> transformConfigs,
+                                       List<? extends Config> sinkConfigs) {
+        SeaTunnelSource seaTunnelSource = ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
+        SourceAction sourceAction =
+            new SourceAction(idGenerator.getNextId(), seaTunnelSource.getPluginName(), seaTunnelSource);
+        sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
+        if (transformConfigs.size() != 0) {
+            SeaTunnelTransform seaTunnelTransform =
+                ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
+            TransformAction transformAction = new TransformAction(
+                idGenerator.getNextId(),
+                seaTunnelTransform.getPluginName(),
+                Lists.newArrayList(sourceAction),
+                seaTunnelTransform);
+
+            initTransformParallelism(transformConfigs, sourceAction, seaTunnelTransform, transformAction);
+
+            SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+            SinkAction sinkAction = new SinkAction(
+                idGenerator.getNextId(),
+                seaTunnelSink.getPluginName(),
+                Lists.newArrayList(transformAction),
+                seaTunnelSink
+            );
+            sinkAction.setParallelism(transformAction.getParallelism());
+            return Lists.newArrayList(sinkAction);
+        } else {
+            SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+            SinkAction sinkAction = new SinkAction(
+                idGenerator.getNextId(),
+                seaTunnelSink.getPluginName(),
+                Lists.newArrayList(sourceAction),
+                seaTunnelSink
+            );
+            sinkAction.setParallelism(sourceAction.getParallelism());
+            return Lists.newArrayList(sinkAction);
+        }
+    }
+
+    private void initTransformParallelism(List<? extends Config> transformConfigs, Action upstreamAction,
+                                          SeaTunnelTransform seaTunnelTransform, TransformAction transformAction) {
+        if ((seaTunnelTransform instanceof PartitionSeaTunnelTransform)
+            && transformConfigs.get(0).hasPath(CollectionConstants.PARALLELISM)) {
+            transformAction.setParallelism(transformConfigs
+                .get(0)
+                .getInt(CollectionConstants.PARALLELISM));
+        } else {
+            // If the transform is not a PartitionSeaTunnelTransform, or sets no parallelism, use the parallelism of its upstream action.
+            transformAction.setParallelism(upstreamAction.getParallelism());
+        }
+    }
+
+    private int getSourceParallelism(Config sourceConfig) {
+        if (sourceConfig.hasPath(CollectionConstants.PARALLELISM)) {
+            int sourceParallelism = sourceConfig.getInt(CollectionConstants.PARALLELISM);
+            return sourceParallelism < 1 ? 1 : sourceParallelism;
+        }
+        return 1;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index 9cc0bfb61..b00bbcc27 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -18,48 +18,48 @@
 
 package org.apache.seatunnel.engine.client;
 
-import org.apache.seatunnel.api.transform.Transformation;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class JobExecutionEnvironment {
 
-    private List<Transformation> transformations;
-
     private static String DEFAULT_JOB_NAME = "test_st_job";
 
-    private SeaTunnelClientConfig configuration;
-
-    private String jobName;
+    private JobConfig jobConfig;
 
     private int maxParallelism = 1;
 
-    public JobExecutionEnvironment(SeaTunnelClientConfig configuration) {
-        this.configuration = configuration;
-    }
+    private List<Action> actions = new ArrayList<>();
+
+    private String jobFilePath;
+
+    private IdGenerator idGenerator;
 
-    public void addTransformation(Transformation transformation) {
-        if (transformations == null) {
-            transformations = new ArrayList<>();
-        }
-        this.transformations.add(transformation);
+    public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath) {
+        this.jobConfig = jobConfig;
+        this.jobFilePath = jobFilePath;
+        this.idGenerator = new IdGenerator();
     }
 
-    public List<Transformation> getTransformations() {
-        return transformations;
+    public JobConfigParser getJobConfigParser() {
+        return new JobConfigParser(jobFilePath, idGenerator);
     }
 
-    public void setTransformations(List<Transformation> transformations) {
-        this.transformations = transformations;
+    public void addAction(List<Action> actions) {
+        this.actions.addAll(actions);
     }
 
-    public void setJobName(String jobName) {
-        this.jobName = jobName;
+    public LogicalDagGenerator getLogicalDagGenerator() {
+        return new LogicalDagGenerator(actions, jobConfig, idGenerator);
     }
 
-    public void setMaxParallelism(int maxParallelism) {
-        this.maxParallelism = maxParallelism;
+    public List<Action> getActions() {
+        return actions;
     }
 }
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 652883a7d..4221ed873 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 
@@ -40,7 +41,8 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
 
     public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
         Preconditions.checkNotNull(seaTunnelClientConfig, "config");
-        this.hazelcastClient = ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
+        this.hazelcastClient =
+            ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
         this.serializationService = hazelcastClient.getSerializationService();
         ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
     }
@@ -52,9 +54,10 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
     }
 
     @Override
-    public JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config) {
-        // TODO analyze job config file and create LocalExecutionContext
-        return null;
+    public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
+        JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
+        jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
+        return jobExecutionEnv;
     }
 
     public ILogger getLogger() {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 34ca898d9..1e0db46dd 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
 import com.hazelcast.core.HazelcastInstance;
 import lombok.NonNull;
 
@@ -29,5 +31,12 @@ public interface SeaTunnelClientInstance {
     @NonNull
     HazelcastInstance getHazelcastInstance();
 
-    JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+    /**
+     * Creates the execution environment for a job.
+     *
+     * @param filePath path of the job config file
+     * @param config   job-level settings for the job
+     * @return the execution environment for the job
+     */
+    JobExecutionEnvironment createExecutionContext(String filePath, JobConfig config);
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
new file mode 100644
index 000000000..f8fd9c873
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+
+/**
+ * Checks the action chain that {@link JobConfigParser#parse()} builds from the sample job configs under
+ * {@code src/test/resources}.
+ */
+@RunWith(JUnit4.class)
+public class JobConfigParserTest {
+
+    /**
+     * One FakeSource (parallelism 3) feeding one LocalFile sink: expects a single LocalFile action with one
+     * FakeSource upstream, both reporting parallelism 3.
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Test
+    public void testSimpleJobParse() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String confPath = TestUtils.getResource("/fakesource_to_file.conf");
+        JobConfigParser parser = new JobConfigParser(confPath, new IdGenerator());
+        List<Action> parsed = parser.parse();
+        Assert.assertEquals(1, parsed.size());
+
+        Action sink = parsed.get(0);
+        List<Action> sources = sink.getUpstream();
+        Assert.assertEquals("LocalFile", sink.getName());
+        Assert.assertEquals(1, sources.size());
+        Assert.assertEquals("FakeSource", sources.get(0).getName());
+
+        Assert.assertEquals(3, sources.get(0).getParallelism());
+        Assert.assertEquals(3, sink.getParallelism());
+    }
+
+    /**
+     * Two FakeSources (parallelism 3 each) feeding one LocalFile sink: expects a single LocalFile action with
+     * both FakeSources upstream and a sink parallelism of 6.
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Test
+    public void testComplexJobParse() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String confPath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        JobConfigParser parser = new JobConfigParser(confPath, new IdGenerator());
+        List<Action> parsed = parser.parse();
+        Assert.assertEquals(1, parsed.size());
+
+        Action sink = parsed.get(0);
+        List<Action> sources = sink.getUpstream();
+        Assert.assertEquals("LocalFile", sink.getName());
+        Assert.assertEquals(2, sources.size());
+        Assert.assertEquals("FakeSource", sources.get(0).getName());
+        Assert.assertEquals("FakeSource", sources.get(1).getName());
+
+        Assert.assertEquals(3, sources.get(0).getParallelism());
+        Assert.assertEquals(3, sources.get(1).getParallelism());
+        Assert.assertEquals(6, sink.getParallelism());
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
new file mode 100644
index 000000000..856c88aec
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+
+import com.hazelcast.internal.json.JsonObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Builds a logical DAG from {@code fakesource_to_file_complex.conf} and compares its JSON form with a fixed
+ * expected string.
+ */
+@RunWith(JUnit4.class)
+public class LogicalDagGeneratorTest {
+    @Test
+    public void testLogicalGenerator() {
+        Common.setDeployMode(DeployMode.CLIENT);
+
+        JobConfig config = new JobConfig();
+        config.setBoundedness(Boundedness.BOUNDED);
+        config.setName("fake_to_file");
+
+        String confPath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+        JobExecutionEnvironment env = new JobExecutionEnvironment(config, confPath);
+        env.addAction(env.getJobConfigParser().parse());
+
+        LogicalDagGenerator dagGenerator = env.getLogicalDagGenerator();
+        LogicalDag dag = dagGenerator.generate();
+        JsonObject dagJson = dag.getLogicalDagAsJson();
+
+        // Two FakeSource vertices (ids 2 and 3), one LocalFile vertex (id 1), and one edge per source.
+        String expected =
+            "{\"vertices\":[{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6}],\"edges\":[{\"leftVertex\":\"FakeSource\",\"rightVertex\":\"LocalFile\"},{\"leftVertex\":\"FakeSource\",\"rightVertex\":\"LocalFile\"}]}";
+        Assert.assertEquals(expected, dagJson.toString());
+    }
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
similarity index 79%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
index 28f5dc4d7..c392fdaf5 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
@@ -15,9 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.engine.client;
 
-public class CollectionConstants {
-
-    public static final int MAP_SIZE = 6;
+/**
+ * Helpers shared by the client module's tests.
+ */
+public class TestUtils {
+    /**
+     * Builds the path of a test resource by appending {@code confFile} to
+     * {@code ${user.dir}/src/test/resources}.
+     *
+     * @param confFile resource name; the tests in this module pass it with a leading {@code /}
+     * @return the concatenated path string (not checked for existence)
+     */
+    public static String getResource(String confFile) {
+        String resourceRoot = System.getProperty("user.dir") + "/src/test/resources";
+        return resourceRoot + confFile;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
new file mode 100644
index 000000000..c6f0f7bbf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  execution.checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age",
+      parallelism = 3
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  LocalFile {
+    path="file:///tmp/hive/warehouse/test2"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
new file mode 100644
index 000000000..dc73c2f09
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  execution.checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age",
+      parallelism = 3
+    }
+
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age",
+      parallelism = 3
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  LocalFile {
+    path="file:///tmp/hive/warehouse/test2"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error",
+    source_table_name="fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
index c62993d55..afad57006 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -42,5 +42,10 @@
             <artifactId>scala-library</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index d6cd39dc6..a33814677 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -17,5 +17,51 @@
 
 package org.apache.seatunnel.engine.common.config;
 
-public class JobConfig {
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.Data;
+
+import java.io.IOException;
+
+/**
+ * Job-level settings (name and boundedness), serializable through Hazelcast's
+ * {@link IdentifiedDataSerializable} mechanism.
+ */
+@Data
+public class JobConfig implements IdentifiedDataSerializable {
+    private String name;
+    private Boundedness boundedness;
+
+    @Override
+    public int getFactoryId() {
+        return ConfigDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ConfigDataSerializerHook.JOB_CONFIG;
+    }
+
+    /**
+     * Writes every field, in the order {@link #readData(ObjectDataInput)} reads them back.
+     */
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeString(name);
+        // boundedness must travel with the config; otherwise it is null after deserialization
+        out.writeObject(boundedness);
+    }
+
+    /**
+     * Restores the fields written by {@link #writeData(ObjectDataOutput)}, in the same order.
+     */
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        this.name = in.readString();
+        this.boundedness = in.readObject();
+    }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
similarity index 70%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
index 28f5dc4d7..2d0e10fe6 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobDefineCheckExceptionSeaTunnel.java
@@ -15,9 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.engine.common.exception;
 
-public class CollectionConstants {
+/**
+ * A {@link SeaTunnelEngineException} subtype for job-definition checks.
+ * NOTE(review): presumably thrown when a job definition fails validation; no throw site is visible here - confirm.
+ */
+public class JobDefineCheckExceptionSeaTunnel extends SeaTunnelEngineException {
 
-    public static final int MAP_SIZE = 6;
+    /**
+     * @param message detail message, passed to the superclass constructor
+     */
+    public JobDefineCheckExceptionSeaTunnel(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message detail message, passed to the superclass constructor
+     * @param cause   underlying cause, passed to the superclass constructor
+     */
+    public JobDefineCheckExceptionSeaTunnel(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
similarity index 75%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
index 807602605..a5758a515 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
@@ -15,21 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.serializable;
-
-import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+package org.apache.seatunnel.engine.common.serializeable;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
 import com.hazelcast.nio.serialization.DataSerializableFactory;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 
-public class OperationDataSerializerHook implements DataSerializerHook {
-    public static final int PRINT_MESSAGE_OPERATOR = 0;
+/**
+ * Hazelcast {@link DataSerializerHook} for SeaTunnel Engine config classes. Its factory creates the SeaTunnel
+ * {@link JobConfig} (not Hazelcast Jet's class of the same simple name) for type ID {@link #JOB_CONFIG}.
+ */
+public class ConfigDataSerializerHook implements DataSerializerHook {
+    /**
+     * Serialization ID of the {@link org.apache.seatunnel.engine.common.config.JobConfig} class.
+     */
+    public static final int JOB_CONFIG = 0;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
-        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
-        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+        SeaTunnelFactoryIdConstant.SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY,
+        SeaTunnelFactoryIdConstant.SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY_ID
     );
 
     @Override
@@ -47,8 +54,8 @@ public class OperationDataSerializerHook implements DataSerializerHook {
         @Override
         public IdentifiedDataSerializable create(int typeId) {
             switch (typeId) {
-                case PRINT_MESSAGE_OPERATOR:
-                    return new PrintMessageOperation();
+                case JOB_CONFIG:
+                    return new JobConfig();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
index e062bf67d..9cfcc54f8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
@@ -15,15 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.serializable;
+package org.apache.seatunnel.engine.common.serializeable;
 
 /**
  * Constants used for Hazelcast's {@link com.hazelcast.nio.serialization.IdentifiedDataSerializable}
  * mechanism.
  */
-public class OperationFactoryIdConstant {
-    /** Name of the system property that specifies SeaTunnelEngine's data serialization factory ID. */
-    public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY = "hazelcast.serialization.ds.seatunnel.engine.operation";
-    /** Default ID of SeaTunnelEngine's data serialization factory. */
+public final class SeaTunnelFactoryIdConstant {
+    /**
+     * Name of the system property that specifies SeaTunnelEngine's data serialization factory ID.
+     */
+    public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY =
+        "hazelcast.serialization.ds.seatunnel.engine.operation";
+    /**
+     * Default ID of SeaTunnelEngine's data serialization factory.
+     */
     public static final int SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID = -30001;
+
+    /**
+     * System property name for the ID of the job data serialization factory.
+     */
+    public static final String SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY =
+        "hazelcast.serialization.ds.seatunnel.engine.job";
+    /**
+     * Default ID of the job data serialization factory.
+     */
+    public static final int SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID = -30002;
+
+    /**
+     * System property name for the ID of the config data serialization factory.
+     */
+    public static final String SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY =
+        "hazelcast.serialization.ds.seatunnel.engine.config";
+    /**
+     * Default ID of the config data serialization factory.
+     */
+    public static final int SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY_ID = -30003;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
similarity index 57%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
index e062bf67d..e05cbfa4d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
@@ -15,15 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.serializable;
+package org.apache.seatunnel.engine.common.utils;
+
+import java.io.Serializable;
 
 /**
- * Constants used for Hazelcast's {@link com.hazelcast.nio.serialization.IdentifiedDataSerializable}
- * mechanism.
+ * Hands out sequential integer IDs for the vertices of a DAG. IDs only have to be unique within a single DAG, so
+ * each generator instance simply counts upwards, starting at 1. The counter is a plain {@code int} with no
+ * synchronization.
  */
-public class OperationFactoryIdConstant {
-    /** Name of the system property that specifies SeaTunnelEngine's data serialization factory ID. */
-    public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY = "hazelcast.serialization.ds.seatunnel.engine.operation";
-    /** Default ID of SeaTunnelEngine's data serialization factory. */
-    public static final int SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID = -30001;
+public class IdGenerator implements Serializable {
+    private int id = 0;
+
+    /**
+     * Advances the counter and returns its new value.
+     *
+     * @return the next ID; the first call on a new instance returns 1
+     */
+    public int getNextId() {
+        return ++id;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-common/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
new file mode 100644
index 000000000..db10e7aa3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
new file mode 100644
index 000000000..d85b2873f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import lombok.NonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base implementation of {@link Action} that keeps the ID, name, parallelism and upstream list in plain fields.
+ */
+public abstract class AbstractAction implements Action {
+    // Unique ID assigned to this action by whoever creates it
+    private int id;
+    private String name;
+    private int parallelism = 1;
+    private List<Action> upstreams;
+
+    /**
+     * @param id        ID of this action
+     * @param name      name of this action
+     * @param upstreams upstream actions; the list is stored without copying
+     */
+    protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams) {
+        this.id = id;
+        this.name = name;
+        this.upstreams = upstreams;
+    }
+
+    /**
+     * Same as the three-argument constructor, starting from a new empty upstream list.
+     */
+    protected AbstractAction(int id, @NonNull String name) {
+        this(id, name, new ArrayList<>());
+    }
+
+    @NonNull
+    @Override
+    public String getName() {
+        return this.name;
+    }
+
+    @Override
+    public void setName(@NonNull String name) {
+        this.name = name;
+    }
+
+    @NonNull
+    @Override
+    public List<Action> getUpstream() {
+        return this.upstreams;
+    }
+
+    @Override
+    public void addUpstream(@NonNull Action action) {
+        upstreams.add(action);
+    }
+
+    @Override
+    public int getParallelism() {
+        return this.parallelism;
+    }
+
+    @Override
+    public void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public int getId() {
+        return this.id;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
similarity index 65%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index 34ca898d9..86e39c62d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -15,19 +15,52 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.core.dag.actions;
 
-import com.hazelcast.core.HazelcastInstance;
 import lombok.NonNull;
 
-public interface SeaTunnelClientInstance {
+import java.io.Serializable;
+import java.util.List;
 
-    /**
-     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
-     * be a client, depending on the type of this
-     */
+/**
+ * A serializable node in a job's DAG of actions. Each action has an ID, a name, a parallelism and a list of the
+ * actions directly upstream of it.
+ */
+public interface Action extends Serializable {
+    /**
+     * @return the name of this action
+     */
     @NonNull
-    HazelcastInstance getHazelcastInstance();
+    String getName();
 
-    JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+    /**
+     * @param name the new name of this action
+     */
+    void setName(@NonNull String name);
+
+    /**
+     * @return the actions directly upstream of this one
+     */
+    @NonNull
+    List<Action> getUpstream();
+
+    /**
+     * @param action an action to register as upstream of this one
+     */
+    void addUpstream(@NonNull Action action);
+
+    /**
+     * @return the parallelism of this action
+     */
+    int getParallelism();
+
+    /**
+     * @param parallelism the new parallelism of this action
+     */
+    void setParallelism(int parallelism);
+
+    /**
+     * @return the ID of this action
+     */
+    int getId();
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
new file mode 100644
index 000000000..25377c4bd
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+/**
+ * An {@link Action} that wraps a {@link PartitionSeaTunnelTransform}.
+ */
+public class PartitionTransformAction extends AbstractAction {
+    /** The wrapped partition transform; assigned once at construction. */
+    private final PartitionSeaTunnelTransform partitionTransformation;
+
+    /**
+     * Creates the action with an explicit list of upstream actions.
+     *
+     * @param id                      id of this action, handed to {@link AbstractAction}
+     * @param name                    name of this action, handed to {@link AbstractAction}
+     * @param upstreams               upstream actions, handed to {@link AbstractAction}
+     * @param partitionTransformation the partition transform wrapped by this action
+     */
+    public PartitionTransformAction(int id,
+                                    @NonNull String name,
+                                    @NonNull List<Action> upstreams,
+                                    @NonNull PartitionSeaTunnelTransform partitionTransformation) {
+        super(id, name, upstreams);
+        this.partitionTransformation = partitionTransformation;
+    }
+
+    /**
+     * Creates the action without passing any upstream list to {@link AbstractAction}.
+     *
+     * @param id                      id of this action, handed to {@link AbstractAction}
+     * @param name                    name of this action, handed to {@link AbstractAction}
+     * @param partitionTransformation the partition transform wrapped by this action
+     */
+    public PartitionTransformAction(int id,
+                                    @NonNull String name,
+                                    @NonNull PartitionSeaTunnelTransform partitionTransformation) {
+        super(id, name);
+        this.partitionTransformation = partitionTransformation;
+    }
+
+    /**
+     * Returns the partition transform wrapped by this action.
+     * Without this accessor the stored transform could never be read back.
+     */
+    public PartitionSeaTunnelTransform getPartitionTransformation() {
+        return partitionTransformation;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
new file mode 100644
index 000000000..8dac82faf
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+/**
+ * An {@link Action} that wraps a {@link SeaTunnelSink}.
+ *
+ * @param <IN>                    first type argument of the wrapped {@link SeaTunnelSink}
+ * @param <StateT>                second type argument of the wrapped {@link SeaTunnelSink}
+ * @param <CommitInfoT>           third type argument of the wrapped {@link SeaTunnelSink}
+ * @param <AggregatedCommitInfoT> fourth type argument of the wrapped {@link SeaTunnelSink}
+ */
+@SuppressWarnings("checkstyle:ClassTypeParameterName")
+public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends AbstractAction {
+    /** The wrapped sink; assigned once at construction. */
+    private final SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
+
+    /**
+     * Creates the action with an explicit list of upstream actions.
+     *
+     * @param id        id of this action, handed to {@link AbstractAction}
+     * @param name      name of this action, handed to {@link AbstractAction}
+     * @param upstreams upstream actions, handed to {@link AbstractAction}
+     * @param sink      the sink wrapped by this action
+     */
+    public SinkAction(int id,
+                      @NonNull String name,
+                      @NonNull List<Action> upstreams,
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
+        super(id, name, upstreams);
+        this.sink = sink;
+    }
+
+    /**
+     * Creates the action without passing any upstream list to {@link AbstractAction}.
+     *
+     * @param id   id of this action, handed to {@link AbstractAction}
+     * @param name name of this action, handed to {@link AbstractAction}
+     * @param sink the sink wrapped by this action
+     */
+    public SinkAction(int id,
+                      @NonNull String name,
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
+        super(id, name);
+        this.sink = sink;
+    }
+
+    /**
+     * Returns the sink wrapped by this action.
+     * Without this accessor the stored sink could never be read back.
+     */
+    public SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> getSink() {
+        return sink;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index 34ca898d9..5599b0e5d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.core.dag.actions;
 
-import com.hazelcast.core.HazelcastInstance;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import com.google.common.collect.Lists;
 import lombok.NonNull;
 
-public interface SeaTunnelClientInstance {
+import java.io.Serializable;
 
-    /**
-     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
-     * be a client, depending on the type of this
-     */
-    @NonNull
-    HazelcastInstance getHazelcastInstance();
+/**
+ * An {@link Action} that wraps a {@link SeaTunnelSource}.
+ * <p>
+ * The constructor always hands a fresh, empty upstream list to {@link AbstractAction}.
+ *
+ * @param <T>      first type argument of the wrapped {@link SeaTunnelSource}
+ * @param <SplitT> split type of the wrapped {@link SeaTunnelSource}
+ * @param <StateT> state type of the wrapped {@link SeaTunnelSource}
+ */
+public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
+    /** The wrapped source; assigned once at construction. */
+    private final SeaTunnelSource<T, SplitT, StateT> source;
 
-    JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+    /**
+     * Creates the action with an empty upstream list.
+     *
+     * @param id     id of this action, handed to {@link AbstractAction}
+     * @param name   name of this action, handed to {@link AbstractAction}
+     * @param source the source wrapped by this action
+     */
+    public SourceAction(int id,
+                        @NonNull String name,
+                        @NonNull SeaTunnelSource<T, SplitT, StateT> source) {
+        super(id, name, Lists.newArrayList());
+        this.source = source;
+    }
+
+    /**
+     * Returns the source wrapped by this action.
+     * Without this accessor the stored source could never be read back.
+     */
+    public SeaTunnelSource<T, SplitT, StateT> getSource() {
+        return source;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index 34ca898d9..afea5986d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -15,19 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
-import com.hazelcast.core.HazelcastInstance;
 import lombok.NonNull;
 
-public interface SeaTunnelClientInstance {
+import java.util.List;
+
+/**
+ * An {@link Action} that wraps a {@link SeaTunnelTransform}.
+ */
+public class TransformAction extends AbstractAction {
+    /** The wrapped transform; assigned once at construction. */
+    private final SeaTunnelTransform transformation;
 
-    /**
-     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
-     * be a client, depending on the type of this
-     */
-    @NonNull
-    HazelcastInstance getHazelcastInstance();
+    /**
+     * Creates the action with an explicit list of upstream actions.
+     *
+     * @param id             id of this action, handed to {@link AbstractAction}
+     * @param name           name of this action, handed to {@link AbstractAction}
+     * @param upstreams      upstream actions, handed to {@link AbstractAction}
+     * @param transformation the transform wrapped by this action
+     */
+    public TransformAction(int id,
+                           @NonNull String name,
+                           @NonNull List<Action> upstreams,
+                           @NonNull SeaTunnelTransform transformation) {
+        super(id, name, upstreams);
+        this.transformation = transformation;
+    }
 
-    JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+    /**
+     * Creates the action without passing any upstream list to {@link AbstractAction}.
+     *
+     * @param id             id of this action, handed to {@link AbstractAction}
+     * @param name           name of this action, handed to {@link AbstractAction}
+     * @param transformation the transform wrapped by this action
+     */
+    public TransformAction(int id,
+                           @NonNull String name,
+                           @NonNull SeaTunnelTransform transformation) {
+        super(id, name);
+        this.transformation = transformation;
+    }
+
+    /**
+     * Returns the transform wrapped by this action.
+     * Without this accessor the stored transform could never be read back.
+     */
+    public SeaTunnelTransform getTransformation() {
+        return transformation;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
new file mode 100644
index 000000000..ed4f3f2fc
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.internal.json.JsonArray;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A LogicalDag describes the logical plan run by SeaTunnel Engine.
+ * A {@link LogicalVertex} defines an operator, and a {@link LogicalEdge} defines the
+ * relationship between two operators.
+ * <p>
+ * A {@link LogicalVertex} is not a final executable object. It will be optimized when
+ * the PhysicalDag is generated in JobMaster.
+ * <p>
+ * There are three basic kinds of vertices:
+ * <ol><li>
+ *     <em>SeaTunnelSource</em> with just outbound edges;
+ * </li><li>
+ *     <em>SeaTunnelTransform</em> with both inbound and outbound edges;
+ * </li><li>
+ *     <em>SeaTunnelSink</em> with just inbound edges.
+ * </li></ol>
+ * Data travels from sources to sinks and is transformed and reshaped
+ * as it passes through the processors.
+ * <p>
+ * Vertices and edges are kept in insertion order, which is also the order in which
+ * they are serialized and rendered as JSON.
+ */
+public class LogicalDag implements IdentifiedDataSerializable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogicalDag.class);
+    private JobConfig jobConfig;
+    private final Set<LogicalEdge> edges = new LinkedHashSet<>();
+    private final Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
+    private IdGenerator idGenerator;
+
+    /**
+     * No-argument constructor used when the instance is created by
+     * {@link JobDataSerializerHook} and then populated through {@link #readData(ObjectDataInput)}.
+     */
+    public LogicalDag() {
+    }
+
+    /**
+     * Creates an empty DAG.
+     *
+     * @param jobConfig   job configuration stored with the DAG
+     * @param idGenerator id generator stored with the DAG
+     */
+    public LogicalDag(@NonNull JobConfig jobConfig,
+                      @NonNull IdGenerator idGenerator) {
+        this.jobConfig = jobConfig;
+        this.idGenerator = idGenerator;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return JobDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return JobDataSerializerHook.LOGICAL_DAG;
+    }
+
+    /**
+     * Registers a vertex under its vertex id; a vertex with the same id is replaced.
+     *
+     * @param logicalVertex the vertex to register
+     */
+    public void addLogicalVertex(@NonNull LogicalVertex logicalVertex) {
+        logicalVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
+    }
+
+    /**
+     * Adds an edge to the DAG. A {@code null} edge is rejected here instead of being
+     * stored and failing later during serialization or JSON rendering.
+     *
+     * @param logicalEdge the edge to add
+     */
+    public void addEdge(@NonNull LogicalEdge logicalEdge) {
+        edges.add(logicalEdge);
+    }
+
+    /**
+     * Writes, in this order: the vertex count and every (id, vertex) pair, the edge count
+     * and every edge, the job config, and the id generator.
+     */
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeInt(logicalVertexMap.size());
+
+        for (Map.Entry<Integer, LogicalVertex> entry : logicalVertexMap.entrySet()) {
+            out.writeInt(entry.getKey());
+            out.writeObject(entry.getValue());
+        }
+
+        out.writeInt(edges.size());
+
+        for (LogicalEdge edge : edges) {
+            out.writeObject(edge);
+        }
+
+        out.writeObject(jobConfig);
+        out.writeObject(idGenerator);
+    }
+
+    /**
+     * Reads the fields in the order written by {@link #writeData(ObjectDataOutput)}.
+     */
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        int vertexCount = in.readInt();
+
+        for (int i = 0; i < vertexCount; i++) {
+            Integer key = in.readInt();
+            LogicalVertex value = in.readObject();
+            logicalVertexMap.put(key, value);
+        }
+
+        int edgeCount = in.readInt();
+
+        for (int i = 0; i < edgeCount; i++) {
+            LogicalEdge edge = in.readObject();
+            // Edges carry only vertex ids on the wire; restore the vertex references from the
+            // vertices read above before the edge is stored in the set.
+            edge.recoveryFromVertexMap(logicalVertexMap);
+            edges.add(edge);
+        }
+
+        jobConfig = in.readObject();
+        idGenerator = in.readObject();
+    }
+
+    /**
+     * Renders the DAG as JSON with two arrays: {@code vertices} (id, name with the id
+     * appended, parallelism) and {@code edges} (action names of the left and right vertex).
+     *
+     * @return the JSON representation of this DAG
+     */
+    @NonNull
+    public JsonObject getLogicalDagAsJson() {
+        JsonObject logicalDag = new JsonObject();
+
+        JsonArray vertexArray = new JsonArray();
+        for (LogicalVertex v : logicalVertexMap.values()) {
+            JsonObject vertex = new JsonObject();
+            vertex.add("id", v.getVertexId());
+            vertex.add("name", v.getAction().getName() + "(id=" + v.getVertexId() + ")");
+            vertex.add("parallelism", v.getParallelism());
+            vertexArray.add(vertex);
+        }
+        logicalDag.add("vertices", vertexArray);
+
+        // Named edgeArray so that the local no longer hides the edges field.
+        JsonArray edgeArray = new JsonArray();
+        for (LogicalEdge e : edges) {
+            JsonObject edge = new JsonObject();
+            edge.add("leftVertex", e.getLeftVertex().getAction().getName());
+            edge.add("rightVertex", e.getRightVertex().getAction().getName());
+            edgeArray.add(edge);
+        }
+        logicalDag.add("edges", edgeArray);
+
+        return logicalDag;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
new file mode 100644
index 000000000..a08c48301
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds a {@link LogicalDag} from a list of {@link Action}s by walking each action's
+ * upstream chain: every action becomes one {@link LogicalVertex} and every upstream
+ * relation becomes one {@link LogicalEdge} from the upstream vertex to the action's vertex.
+ */
+public class LogicalDagGenerator {
+    private static final ILogger LOGGER = Logger.getLogger(LogicalDagGenerator.class);
+    private final List<Action> actions;
+    private LogicalDag logicalDag;
+    private final JobConfig jobConfig;
+    private final IdGenerator idGenerator;
+
+    /** Vertex ids already produced per action, so an action shared by several paths is converted once. */
+    private final Map<Action, Collection<Integer>> alreadyTransformed = new HashMap<>();
+
+    /** Vertices created so far, keyed by vertex id; used to resolve the left end of an edge. */
+    private final Map<Integer, LogicalVertex> logicalIdVertexMap = new HashMap<>();
+
+    /**
+     * @param actions     the actions to convert; must not be empty
+     * @param jobConfig   job configuration handed to the generated {@link LogicalDag}
+     * @param idGenerator id generator handed to the generated {@link LogicalDag}
+     * @throws IllegalStateException if {@code actions} is empty
+     */
+    public LogicalDagGenerator(@NonNull List<Action> actions,
+                               @NonNull JobConfig jobConfig,
+                               @NonNull IdGenerator idGenerator) {
+        if (actions.isEmpty()) {
+            throw new IllegalStateException("No actions define in the job. Cannot execute.");
+        }
+        this.actions = actions;
+        this.jobConfig = jobConfig;
+        this.idGenerator = idGenerator;
+    }
+
+    /**
+     * Generates a new {@link LogicalDag} from the configured actions.
+     * <p>
+     * The traversal caches are reset on every call. Without the reset a second call would
+     * find every action already cached, skip all of them, and return an empty DAG.
+     *
+     * @return the generated DAG
+     */
+    public LogicalDag generate() {
+        alreadyTransformed.clear();
+        logicalIdVertexMap.clear();
+        logicalDag = new LogicalDag(jobConfig, idGenerator);
+        for (Action action : actions) {
+            transformAction(action);
+        }
+        return logicalDag;
+    }
+
+    /**
+     * Converts {@code action} and, recursively, all of its upstream actions into vertices
+     * and edges of the DAG under construction.
+     *
+     * @param action the action to convert
+     * @return the vertex ids created for {@code action}; the cached ids if it was converted before
+     */
+    private Collection<Integer> transformAction(Action action) {
+        Collection<Integer> cachedIds = alreadyTransformed.get(action);
+        if (cachedIds != null) {
+            return cachedIds;
+        }
+
+        // Upstream actions are converted first, so their vertices exist before edges are drawn.
+        Collection<Integer> upstreamVertexIds = new ArrayList<>();
+        List<Action> upstream = action.getUpstream();
+        if (!CollectionUtils.isEmpty(upstream)) {
+            for (Action upstreamAction : upstream) {
+                upstreamVertexIds.addAll(transformAction(upstreamAction));
+            }
+        }
+
+        LogicalVertex logicalVertex =
+            new LogicalVertex(action.getId(), action, action.getParallelism());
+        logicalDag.addLogicalVertex(logicalVertex);
+        Collection<Integer> transformedActions = Lists.newArrayList(logicalVertex.getVertexId());
+        alreadyTransformed.put(action, transformedActions);
+        logicalIdVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
+
+        // One edge per upstream vertex, pointing from the upstream vertex to this vertex.
+        for (Integer upstreamVertexId : upstreamVertexIds) {
+            logicalDag.addEdge(new LogicalEdge(logicalIdVertexMap.get(upstreamVertexId), logicalVertex));
+        }
+        return transformedActions;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
new file mode 100644
index 000000000..ac48b4e37
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.Data;
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A directed connection between two {@link LogicalVertex} instances of a logical DAG.
+ * <p>
+ * Only the two vertex ids are serialized. After deserialization the vertex references are
+ * {@code null} until {@link #recoveryFromVertexMap(Map)} has been called.
+ * <p>
+ * Lombok's {@code @Data} derives {@code equals}/{@code hashCode} from all four fields, so
+ * the hash of an edge changes when the vertex references are restored. Restore them before
+ * adding the edge to a hash-based collection.
+ */
+@Data
+public class LogicalEdge implements IdentifiedDataSerializable {
+    private LogicalVertex leftVertex;
+    private LogicalVertex rightVertex;
+
+    private Integer leftVertexId;
+
+    private Integer rightVertexId;
+
+    /**
+     * No-argument constructor used when the instance is created by
+     * {@link JobDataSerializerHook} and then populated through {@link #readData(ObjectDataInput)}.
+     */
+    public LogicalEdge() {
+    }
+
+    /**
+     * Creates an edge between two vertices and records their ids.
+     *
+     * @param leftVertex  the vertex on the left end of the edge
+     * @param rightVertex the vertex on the right end of the edge
+     */
+    public LogicalEdge(@NonNull LogicalVertex leftVertex, @NonNull LogicalVertex rightVertex) {
+        this.leftVertex = leftVertex;
+        this.rightVertex = rightVertex;
+        this.leftVertexId = leftVertex.getVertexId();
+        this.rightVertexId = rightVertex.getVertexId();
+    }
+
+    /**
+     * Restores the vertex references from the vertex ids.
+     * <p>
+     * Both endpoints are resolved and checked before either field is assigned, so a failed
+     * lookup leaves the edge unchanged instead of half-restored.
+     *
+     * @param vertexMap vertices keyed by vertex id
+     * @throws NullPointerException if {@code vertexMap} has no vertex for one of the ids
+     */
+    public void recoveryFromVertexMap(@NonNull Map<Integer, LogicalVertex> vertexMap) {
+        LogicalVertex left = checkNotNull(vertexMap.get(leftVertexId),
+            "Left vertex with id %s not found in the vertex map", leftVertexId);
+        LogicalVertex right = checkNotNull(vertexMap.get(rightVertexId),
+            "Right vertex with id %s not found in the vertex map", rightVertexId);
+
+        leftVertex = left;
+        rightVertex = right;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return JobDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return JobDataSerializerHook.LOGICAL_EDGE;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        // To prevent circular serialization, we only serialize the ID of vertices for edges
+        out.writeInt(leftVertexId);
+        out.writeInt(rightVertexId);
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        // Only the ids are on the wire; the vertex references stay null until
+        // recoveryFromVertexMap is called.
+        leftVertexId = in.readInt();
+        rightVertexId = in.readInt();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
new file mode 100644
index 000000000..09a61c116
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.logicaldag;
+
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.IOException;
+
+/**
+ * A vertex of a logical DAG: a vertex id, the {@link Action} it carries, and a parallelism.
+ * <p>
+ * Accessors and the all-arguments constructor are generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class LogicalVertex implements IdentifiedDataSerializable {
+    private Integer vertexId;
+    private Action action;
+    private int parallelism;
+
+    /**
+     * No-argument constructor used when the instance is created by
+     * {@link JobDataSerializerHook} and then populated through {@link #readData(ObjectDataInput)}.
+     */
+    public LogicalVertex() {
+    }
+
+    @Override
+    public int getFactoryId() {
+        return JobDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return JobDataSerializerHook.LOGICAL_VERTEX;
+    }
+
+    /**
+     * Writes the vertex id, the action and the parallelism, in that order.
+     */
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeInt(this.vertexId);
+        out.writeObject(this.action);
+        out.writeInt(this.parallelism);
+    }
+
+    /**
+     * Reads the fields in the order written by {@link #writeData(ObjectDataOutput)}.
+     */
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        this.vertexId = in.readInt();
+        this.action = in.readObject();
+        this.parallelism = in.readInt();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
similarity index 52%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index 807602605..2a98be5f0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -15,21 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.serializable;
+package org.apache.seatunnel.engine.core.serializable;
 
-import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
 import com.hazelcast.nio.serialization.DataSerializableFactory;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.annotation.PrivateApi;
 
-public class OperationDataSerializerHook implements DataSerializerHook {
-    public static final int PRINT_MESSAGE_OPERATOR = 0;
+/**
+ * A Java Service Provider hook for Hazelcast's Identified Data Serializable
+ * mechanism. This is private API.
+ * All Job-related data-serializable classes are registered in this class.
+ */
+@PrivateApi
+public final class JobDataSerializerHook implements DataSerializerHook {
+
+    /**
+     * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag} class.
+     */
+    public static final int LOGICAL_DAG = 0;
+
+    /**
+     * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex} class.
+     */
+    public static final int LOGICAL_VERTEX = 1;
+
+    /**
+     * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge} class.
+     */
+    public static final int LOGICAL_EDGE = 2;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
-        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
-        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+        SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY,
+        SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID
     );
 
     @Override
@@ -47,8 +71,12 @@ public class OperationDataSerializerHook implements DataSerializerHook {
         @Override
         public IdentifiedDataSerializable create(int typeId) {
             switch (typeId) {
-                case PRINT_MESSAGE_OPERATOR:
-                    return new PrintMessageOperation();
+                case LOGICAL_DAG:
+                    return new LogicalDag();
+                case LOGICAL_VERTEX:
+                    return new LogicalVertex();
+                case LOGICAL_EDGE:
+                    return new LogicalEdge();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-core/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
new file mode 100644
index 000000000..900587e31
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 807602605..f56a68353 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -17,19 +17,27 @@
 
 package org.apache.seatunnel.engine.server.serializable;
 
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
 import com.hazelcast.nio.serialization.DataSerializableFactory;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.annotation.PrivateApi;
 
-public class OperationDataSerializerHook implements DataSerializerHook {
+/**
+ * A Java Service Provider hook for Hazelcast's Identified Data Serializable
+ * mechanism. This is private API.
+ * All Operation-related data-serializable classes are registered in this class.
+ */
+@PrivateApi
+public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int PRINT_MESSAGE_OPERATOR = 0;
 
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
-        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
-        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+        SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
+        SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
     );
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
index 34ca898d9..395e857ed 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.plugin.discovery.seatunnel;
 
-import com.hazelcast.core.HazelcastInstance;
-import lombok.NonNull;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
 
-public interface SeaTunnelClientInstance {
+/**
+ * Plugin discovery for {@link SeaTunnelTransform} plugins.
+ * <p>
+ * This class declares only a constructor and {@link #getPluginBaseClass()}; everything
+ * else comes from {@link AbstractPluginDiscovery}.
+ */
+public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> {
 
-    /**
-     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
-     * be a client, depending on the type of this
-     */
-    @NonNull
-    HazelcastInstance getHazelcastInstance();
+    /**
+     * Passes the literal {@code "seatunnel"} to the parent constructor.
+     * NOTE(review): the meaning of this argument is defined by AbstractPluginDiscovery,
+     * which is not visible here - confirm against that class.
+     */
+    public SeaTunnelTransformPluginDiscovery() {
+        super("seatunnel");
+    }
 
-    JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
+    /**
+     * Returns {@code SeaTunnelTransform.class} as the plugin base class.
+     */
+    @Override
+    protected Class<SeaTunnelTransform> getPluginBaseClass() {
+        return SeaTunnelTransform.class;
+    }
 }