You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/03 10:20:05 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan (#2316)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 258b85e12 [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan (#2316)
258b85e12 is described below

commit 258b85e1242968616364a9e2f7d8045e56a8b0a1
Author: Hisoka <fa...@qq.com>
AuthorDate: Wed Aug 3 18:20:00 2022 +0800

    [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan (#2316)
    
    * [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
    
    * [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
    
    * [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
    
    * [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
    
    * [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
    
    * [Engine][Task] Add Job Master
    
    * [Core][Task] add task
    
    * [Core][Logical Plan] Change logical vertex and edge name
    
    * [Core][Logical Plan] Add TaskGroup and JavaQueue
    
    * [Core][Logical Plan] Add TaskGroup and JavaQueue
    
    * [Core][Logical Plan] Add TaskGroup and JavaQueue
    
    * fix checkstyle
    
    * remove JavaQueueAction.java and change TaskGroupInfo
    
    * change TaskDataSerializerHook
    
    * change TaskDataSerializerHook directory
---
 .../engine/client/JobExecutionEnvironment.java     |   4 +-
 .../engine/client/LogicalDagGeneratorTest.java     |   4 +-
 .../serializeable/SeaTunnelFactoryIdConstant.java  |   4 +
 .../seatunnel/engine/common/utils/IdGenerator.java |   2 +
 .../core/dag/actions/PartitionTransformAction.java |   2 +-
 .../core/dag/actions/PhysicalSourceAction.java     |  62 +++++++
 .../engine/core/dag/actions/SourceAction.java      |   8 +-
 .../engine/core/dag/actions/TransformAction.java   |  14 +-
 ...nsformAction.java => TransformChainAction.java} |  34 ++--
 .../core/dag/actions/UnknownActionException.java}  |  15 +-
 .../core/dag/internal/IntermediateDataQueue.java}  |  33 ++--
 .../dag/{logicaldag => logical}/LogicalDag.java    |  83 +++++-----
 .../LogicalDagGenerator.java                       |   2 +-
 .../dag/{logicaldag => logical}/LogicalEdge.java   |   2 +-
 .../dag/{logicaldag => logical}/LogicalVertex.java |   3 +-
 .../core/serializable/JobDataSerializerHook.java   |  12 +-
 .../execution/ExecutionEdge.java}                  |  23 ++-
 .../execution/ExecutionPlan.java}                  |  21 ++-
 .../dag/execution/ExecutionPlanGenerator.java      | 150 +++++++++++++++++
 .../execution/ExecutionVertex.java}                |  13 +-
 .../Task.java => dag/execution/Pipeline.java}      |  31 ++--
 .../server/dag/execution/PipelineGenerator.java    |  94 +++++++++++
 .../engine/server/dag/physical/PhysicalPlan.java   |  54 ++++++
 .../server/dag/physical/PhysicalPlanGenerator.java | 182 +++++++++++++++++++++
 .../server/dag/physical/PhysicalPlanUtils.java}    |  20 +--
 .../TaskGroup.java => dag/physical/flow/Flow.java} |  21 ++-
 .../physical/flow/IntermediateExecutionFlow.java}  |  32 ++--
 .../physical/flow/PhysicalExecutionFlow.java}      |  32 ++--
 .../seatunnel/engine/server/execution/Task.java    |  18 +-
 .../engine/server/execution/TaskGroup.java         |   5 +
 .../seatunnel/engine/server/master/JobMaster.java  |  69 ++++++++
 .../serializable/TaskDataSerializerHook.java       |  73 +++++++++
 .../Task.java => task/AbstractTask.java}           |  42 +++--
 .../TaskGroup.java => task/CoordinatorTask.java}   |  15 +-
 .../{execution/Task.java => task/Progress.java}    |  34 ++--
 .../task/SeaTunnelSplitEnumeratorContext.java      |  63 +++++++
 .../engine/server/task/SeaTunnelTask.java          |  94 +++++++++++
 .../SinkAggregatedCommitterTask.java}              |  34 ++--
 .../server/task/SourceSplitEnumeratorTask.java     |  82 ++++++++++
 .../engine/server/task/TaskGroupInfo.java}         |  51 +++---
 .../task/operation/AssignSplitOperation.java}      |  47 +++---
 .../server/task/operation/RegisterOperation.java   |  86 ++++++++++
 .../task/operation/RequestSplitOperation.java      |  79 +++++++++
 .../services/com.hazelcast.DataSerializerHook      |   1 +
 44 files changed, 1453 insertions(+), 297 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index 79d60f19a..a8257bebd 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -21,8 +21,8 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 19bcc5eb1..c5f282bda 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
 
 import com.hazelcast.internal.json.JsonObject;
 import org.apache.commons.lang3.tuple.ImmutablePair;
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
index 9cfcc54f8..0f2470495 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
@@ -39,4 +39,8 @@ public final class SeaTunnelFactoryIdConstant {
     public static final String SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY =
         "hazelcast.serialization.ds.seatunnel.engine.config";
     public static final int SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY_ID = -30003;
+
+    public static final String SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY =
+            "hazelcast.serialization.ds.seatunnel.engine.task";
+    public static final int SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID = -30004;
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
index e05cbfa4d..4b10e8d60 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
@@ -24,6 +24,8 @@ import java.io.Serializable;
  * unique.
  */
 public class IdGenerator implements Serializable {
+
+    private static final long serialVersionUID = 7683323453014131725L;
     private int id = 0;
 
     public int getNextId() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 8806978a7..88e5e5881 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -25,7 +25,7 @@ import java.net.URL;
 import java.util.List;
 
 public class PartitionTransformAction extends AbstractAction {
-    private PartitionSeaTunnelTransform partitionTransformation;
+    private final PartitionSeaTunnelTransform partitionTransformation;
 
     public PartitionTransformAction(int id,
                                     @NonNull String name,
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
new file mode 100644
index 000000000..8391be830
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
+
+public class PhysicalSourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
+
+    private static final long serialVersionUID = 4222477901364853468L;
+    private final SeaTunnelSource<T, SplitT, StateT> source;
+    private final List<SeaTunnelTransform> transforms;
+
+    public PhysicalSourceAction(int id,
+                                @NonNull String name,
+                                @NonNull SeaTunnelSource<T, SplitT, StateT> source,
+                                @NonNull List<URL> jarUrls,
+                                List<SeaTunnelTransform> transforms) {
+        super(id, name, jarUrls);
+        this.source = source;
+        this.transforms = transforms;
+    }
+
+    protected PhysicalSourceAction(int id, @NonNull String name, @NonNull List<Action> upstreams,
+                                   @NonNull SeaTunnelSource<T, SplitT, StateT> source,
+                                   @NonNull List<URL> jarUrls,
+                                   List<SeaTunnelTransform> transforms) {
+        super(id, name, upstreams, jarUrls);
+        this.source = source;
+        this.transforms = transforms;
+    }
+
+    public SeaTunnelSource<T, SplitT, StateT> getSource() {
+        return source;
+    }
+
+    public List<SeaTunnelTransform> getTransforms() {
+        return transforms;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index a5f834fe3..41b2bcb63 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -28,7 +28,9 @@ import java.net.URL;
 import java.util.List;
 
 public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
-    private SeaTunnelSource<T, SplitT, StateT> source;
+
+    private static final long serialVersionUID = -4104531889750766731L;
+    private final SeaTunnelSource<T, SplitT, StateT> source;
 
     public SourceAction(int id,
                         @NonNull String name,
@@ -37,4 +39,8 @@ public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializ
         super(id, name, Lists.newArrayList(), jarUrls);
         this.source = source;
     }
+
+    public SeaTunnelSource<T, SplitT, StateT> getSource() {
+        return source;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index c4385a064..c82f41947 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -25,22 +25,26 @@ import java.net.URL;
 import java.util.List;
 
 public class TransformAction extends AbstractAction {
-    private SeaTunnelTransform transformation;
+    private SeaTunnelTransform transform;
 
     public TransformAction(int id,
                            @NonNull String name,
                            @NonNull List<Action> upstreams,
-                           @NonNull SeaTunnelTransform transformation,
+                           @NonNull SeaTunnelTransform transform,
                            @NonNull List<URL> jarUrls) {
         super(id, name, upstreams, jarUrls);
-        this.transformation = transformation;
+        this.transform = transform;
     }
 
     public TransformAction(int id,
                            @NonNull String name,
-                           @NonNull SeaTunnelTransform transformation,
+                           @NonNull SeaTunnelTransform transform,
                            @NonNull List<URL> jarUrls) {
         super(id, name, jarUrls);
-        this.transformation = transformation;
+        this.transform = transform;
+    }
+
+    public SeaTunnelTransform getTransform() {
+        return transform;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
index c4385a064..339dbc1a0 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
@@ -24,23 +24,29 @@ import lombok.NonNull;
 import java.net.URL;
 import java.util.List;
 
-public class TransformAction extends AbstractAction {
-    private SeaTunnelTransform transformation;
-
-    public TransformAction(int id,
-                           @NonNull String name,
-                           @NonNull List<Action> upstreams,
-                           @NonNull SeaTunnelTransform transformation,
-                           @NonNull List<URL> jarUrls) {
+public class TransformChainAction extends AbstractAction {
+
+    private static final long serialVersionUID = -340174711145367535L;
+    private final List<SeaTunnelTransform> transforms;
+
+    public TransformChainAction(int id,
+                                @NonNull String name,
+                                @NonNull List<Action> upstreams,
+                                @NonNull List<URL> jarUrls,
+                                @NonNull List<SeaTunnelTransform> transforms) {
         super(id, name, upstreams, jarUrls);
-        this.transformation = transformation;
+        this.transforms = transforms;
     }
 
-    public TransformAction(int id,
-                           @NonNull String name,
-                           @NonNull SeaTunnelTransform transformation,
-                           @NonNull List<URL> jarUrls) {
+    public TransformChainAction(int id,
+                                @NonNull String name,
+                                @NonNull List<URL> jarUrls,
+                                @NonNull List<SeaTunnelTransform> transforms) {
         super(id, name, jarUrls);
-        this.transformation = transformation;
+        this.transforms = transforms;
+    }
+
+    public List<SeaTunnelTransform> getTransforms() {
+        return transforms;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/UnknownActionException.java
similarity index 72%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/UnknownActionException.java
index fdf88cde7..314bf5291 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/UnknownActionException.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.core.dag.actions;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+public class UnknownActionException extends RuntimeException {
 
-import java.io.Serializable;
-import java.util.Collection;
+    private static final long serialVersionUID = 6566687693833135857L;
 
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+    public UnknownActionException(Action action) {
+        super("Unknown Action: " + action.getClass().getName());
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
similarity index 59%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
index 688f0476b..df8776548 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
@@ -15,32 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
-
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+package org.apache.seatunnel.engine.core.dag.internal;
 
 import java.io.Serializable;
 
-public interface Task extends Serializable {
-
-    default void init() {
-    }
+public class IntermediateDataQueue implements Serializable {
 
-    @NonNull
-    ProgressState call();
+    private static final long serialVersionUID = -3049265155605303992L;
 
-    @NonNull
-    Long getTaskID();
+    private final int id;
+    private final int parallelism;
+    private final String name;
 
-    default boolean isCooperative() {
-        return false;
+    public IntermediateDataQueue(int id, String name, int parallelism) {
+        this.id = id;
+        this.name = name;
+        this.parallelism = parallelism;
     }
 
-    default void close() {
+    public int getId() {
+        return id;
     }
 
-    default void setOperationService(OperationService operationService) {
+    public int getParallelism() {
+        return parallelism;
     }
 
+    public String getName() {
+        return name;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
similarity index 93%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
index ed4f3f2fc..69c7e635c 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
 
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -56,10 +56,11 @@ import java.util.Set;
  * as it passes through the processors.
  */
 public class LogicalDag implements IdentifiedDataSerializable {
+
     private static final Logger LOG = LoggerFactory.getLogger(LogicalDag.class);
     private JobConfig jobConfig;
-    private Set<LogicalEdge> edges = new LinkedHashSet<>();
-    private Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
+    private final Set<LogicalEdge> edges = new LinkedHashSet<>();
+    private final Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
     private IdGenerator idGenerator;
 
     public LogicalDag() {
@@ -71,6 +72,48 @@ public class LogicalDag implements IdentifiedDataSerializable {
         this.idGenerator = idGenerator;
     }
 
+    public void addLogicalVertex(LogicalVertex logicalVertex) {
+        logicalVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
+    }
+
+    public void addEdge(LogicalEdge logicalEdge) {
+        edges.add(logicalEdge);
+    }
+
+    public Set<LogicalEdge> getEdges() {
+        return this.edges;
+    }
+
+    public Map<Integer, LogicalVertex> getLogicalVertexMap() {
+        return logicalVertexMap;
+    }
+
+    @NonNull
+    public JsonObject getLogicalDagAsJson() {
+        JsonObject logicalDag = new JsonObject();
+        JsonArray vertices = new JsonArray();
+
+        logicalVertexMap.values().stream().forEach(v -> {
+            JsonObject vertex = new JsonObject();
+            vertex.add("id", v.getVertexId());
+            vertex.add("name", v.getAction().getName() + "(id=" + v.getVertexId() + ")");
+            vertex.add("parallelism", v.getParallelism());
+            vertices.add(vertex);
+        });
+        logicalDag.add("vertices", vertices);
+
+        JsonArray edges = new JsonArray();
+        this.edges.stream().forEach(e -> {
+            JsonObject edge = new JsonObject();
+            edge.add("leftVertex", e.getLeftVertex().getAction().getName());
+            edge.add("rightVertex", e.getRightVertex().getAction().getName());
+            edges.add(edge);
+        });
+
+        logicalDag.add("edges", edges);
+        return logicalDag;
+    }
+
     @Override
     public int getFactoryId() {
         return JobDataSerializerHook.FACTORY_ID;
@@ -81,14 +124,6 @@ public class LogicalDag implements IdentifiedDataSerializable {
         return JobDataSerializerHook.LOGICAL_DAG;
     }
 
-    public void addLogicalVertex(LogicalVertex logicalVertex) {
-        logicalVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
-    }
-
-    public void addEdge(LogicalEdge logicalEdge) {
-        edges.add(logicalEdge);
-    }
-
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeInt(logicalVertexMap.size());
@@ -129,30 +164,4 @@ public class LogicalDag implements IdentifiedDataSerializable {
         jobConfig = in.readObject();
         idGenerator = in.readObject();
     }
-
-    @NonNull
-    public JsonObject getLogicalDagAsJson() {
-        JsonObject logicalDag = new JsonObject();
-        JsonArray vertices = new JsonArray();
-
-        logicalVertexMap.values().stream().forEach(v -> {
-            JsonObject vertex = new JsonObject();
-            vertex.add("id", v.getVertexId());
-            vertex.add("name", v.getAction().getName() + "(id=" + v.getVertexId() + ")");
-            vertex.add("parallelism", v.getParallelism());
-            vertices.add(vertex);
-        });
-        logicalDag.add("vertices", vertices);
-
-        JsonArray edges = new JsonArray();
-        this.edges.stream().forEach(e -> {
-            JsonObject edge = new JsonObject();
-            edge.add("leftVertex", e.getLeftVertex().getAction().getName());
-            edge.add("rightVertex", e.getRightVertex().getAction().getName());
-            edges.add(edge);
-        });
-
-        logicalDag.add("edges", edges);
-        return logicalDag;
-    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
similarity index 98%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index a08c48301..1ea396f7a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
 
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
similarity index 97%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
index ac48b4e37..bfa7091d6 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
similarity index 97%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
index 09a61c116..c9e145c86 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
 
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
@@ -31,6 +31,7 @@ import java.io.IOException;
 @Data
 @AllArgsConstructor
 public class LogicalVertex implements IdentifiedDataSerializable {
+
     private Integer vertexId;
     private Action action;
     private int parallelism;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index b6da4a3df..1ff1c3288 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -18,9 +18,9 @@
 package org.apache.seatunnel.engine.core.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
@@ -38,17 +38,17 @@ import com.hazelcast.spi.annotation.PrivateApi;
 public final class JobDataSerializerHook implements DataSerializerHook {
 
     /**
-     * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag} class.
+     * Serialization ID of the {@link LogicalDag} class.
      */
     public static final int LOGICAL_DAG = 0;
 
     /**
-     * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex} class.
+     * Serialization ID of the {@link LogicalVertex} class.
      */
     public static final int LOGICAL_VERTEX = 1;
 
     /**
-     * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge} class.
+     * Serialization ID of the {@link LogicalEdge} class.
      */
     public static final int LOGICAL_EDGE = 2;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
index fdf88cde7..b1527e120 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
 
-import java.io.Serializable;
-import java.util.Collection;
-
 @Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+public class ExecutionEdge {
+    private ExecutionVertex leftVertex;
+    private ExecutionVertex rightVertex;
+
+    private Integer leftVertexId;
+
+    private Integer rightVertexId;
+
+    public ExecutionEdge(ExecutionVertex leftVertex, ExecutionVertex rightVertex) {
+        this.leftVertex = leftVertex;
+        this.rightVertex = rightVertex;
+        this.leftVertexId = leftVertex.getVertexId();
+        this.rightVertexId = rightVertex.getVertexId();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
index fdf88cde7..b72c62287 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import java.util.List;
 
-import java.io.Serializable;
-import java.util.Collection;
+public class ExecutionPlan {
 
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+    private final List<Pipeline> pipelines;
+
+    ExecutionPlan(List<Pipeline> pipelines) {
+        this.pipelines = pipelines;
+    }
+
+    public List<Pipeline> getPipelines() {
+        return pipelines;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
new file mode 100644
index 000000000..af3a4c68c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.execution;
+
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
+import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ExecutionPlanGenerator {
+
+    private final Map<Integer, List<Integer>> edgeMap = new HashMap<>();
+    private final Map<Integer, Action> actions = new HashMap<>();
+
+    private final Map<Integer, LogicalVertex> logicalVertexes;
+    private final List<LogicalEdge> logicalEdges;
+
+    public ExecutionPlanGenerator(LogicalDag logicalPlan) {
+        this.logicalVertexes = new HashMap<>(logicalPlan.getLogicalVertexMap());
+        this.logicalEdges = new ArrayList<>(logicalPlan.getEdges());
+    }
+
+    public ExecutionPlan generate() {
+
+        if (logicalVertexes == null) {
+            throw new IllegalArgumentException("ExecutionPlan Builder must have LogicalPlan and Action");
+        }
+        List<LogicalVertex> next = logicalVertexes.values().stream().filter(a -> a.getAction() instanceof SourceAction)
+                .collect(Collectors.toList());
+        while (!next.isEmpty()) {
+            List<LogicalVertex> newNext = new ArrayList<>();
+            next.forEach(n -> {
+                List<LogicalVertex> nextVertex = convertExecutionActions(logicalEdges, n.getAction());
+                newNext.addAll(nextVertex);
+            });
+            next = newNext;
+        }
+
+        Map<Integer, LogicalVertex> vertexes = new HashMap<>();
+        actions.forEach((key, value) -> vertexes.put(key, new LogicalVertex(key, value,
+                logicalVertexes.get(key).getParallelism())));
+
+        return new ExecutionPlan(PipelineGenerator.generatePipelines(edgeMap.entrySet().stream()
+                .flatMap(e -> e.getValue().stream().map(d -> new ExecutionEdge(convertFromLogical(vertexes.get(e.getKey())),
+                        convertFromLogical(vertexes.get(d)))))
+                .collect(Collectors.toList())));
+    }
+
+    private ExecutionVertex convertFromLogical(LogicalVertex vertex) {
+        return new ExecutionVertex(vertex.getVertexId(), vertex.getAction(), vertex.getParallelism());
+    }
+
+    private List<LogicalVertex> convertExecutionActions(List<LogicalEdge> logicalEdges, Action start) {
+
+        Action end = start;
+        Action executionAction;
+        if (start instanceof PartitionTransformAction) {
+            executionAction = start;
+            actions.put(start.getId(), start);
+        } else if (start instanceof SinkAction) {
+            actions.put(start.getId(), start);
+            return Collections.emptyList();
+        } else {
+            List<SeaTunnelTransform> transforms = new ArrayList<>();
+            Set<URL> jars = new HashSet<>();
+            for (TransformAction t : findMigrateTransform(logicalEdges, start)) {
+                transforms.add(t.getTransform());
+                jars.addAll(t.getJarUrls());
+                end = t;
+            }
+            if (start instanceof SourceAction) {
+                SourceAction<?, ?, ?> source = (SourceAction<?, ?, ?>) start;
+                jars.addAll(source.getJarUrls());
+                executionAction = new PhysicalSourceAction<>(start.getId(), start.getName(),
+                        source.getSource(), new ArrayList<>(jars), transforms);
+                actions.put(start.getId(), start);
+            } else if (start instanceof TransformAction) {
+                TransformAction transform = (TransformAction) start;
+                jars.addAll(transform.getJarUrls());
+                transforms.add(0, transform.getTransform());
+                executionAction = new TransformChainAction(start.getId(), start.getName(),
+                        new ArrayList<>(jars), transforms);
+                actions.put(start.getId(), executionAction);
+            } else {
+                throw new UnknownActionException(start);
+            }
+        }
+
+        final Action e = end;
+        // find next should be converted action
+        List<LogicalVertex> nextStarts = logicalEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(e))
+                .map(LogicalEdge::getRightVertex).collect(Collectors.toList());
+        for (LogicalVertex n : nextStarts) {
+            if (!edgeMap.containsKey(executionAction.getId())) {
+                edgeMap.put(executionAction.getId(), new ArrayList<>());
+            }
+            edgeMap.get(executionAction.getId()).add(n.getAction().getId());
+        }
+        return nextStarts;
+    }
+
+    private List<TransformAction> findMigrateTransform(List<LogicalEdge> executionEdges, Action start) {
+        List<Action> actionAfterStart =
+                executionEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(start))
+                        .map(edge -> edge.getRightVertex().getAction()).collect(Collectors.toList());
+        // make sure the start's next only have one LogicalTransform, so it can be migrated.
+        if (actionAfterStart.size() != 1 || actionAfterStart.get(0) instanceof PartitionTransformAction ||
+                actionAfterStart.get(0) instanceof SinkAction) {
+            return Collections.emptyList();
+        } else {
+            List<TransformAction> transforms = new ArrayList<>();
+            transforms.add((TransformAction) actionAfterStart.get(0));
+            transforms.addAll(findMigrateTransform(executionEdges, actionAfterStart.get(0)));
+            return transforms;
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
similarity index 79%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
index fdf88cde7..f39bb2d8e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
+
+import org.apache.seatunnel.engine.core.dag.actions.Action;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
-import java.io.Serializable;
-import java.util.Collection;
-
 @Data
 @AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+public class ExecutionVertex {
+    private Integer vertexId;
+    private Action action;
+    private int parallelism;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index 688f0476b..b7def66ad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -15,32 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
 
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import java.util.List;
+import java.util.Map;
 
-import java.io.Serializable;
+public class Pipeline {
+    private final List<ExecutionEdge> edges;
 
-public interface Task extends Serializable {
+    private final Map<Integer, ExecutionVertex> vertexes;
 
-    default void init() {
+    Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
+        this.edges = edges;
+        this.vertexes = vertexes;
     }
 
-    @NonNull
-    ProgressState call();
-
-    @NonNull
-    Long getTaskID();
-
-    default boolean isCooperative() {
-        return false;
-    }
-
-    default void close() {
+    public List<ExecutionEdge> getEdges() {
+        return edges;
     }
 
-    default void setOperationService(OperationService operationService) {
+    public Map<Integer, ExecutionVertex> getVertexes() {
+        return vertexes;
     }
 
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
new file mode 100644
index 000000000..1f8dab913
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.execution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PipelineGenerator {
+
+    public static List<Pipeline> generatePipelines(List<ExecutionEdge> edges) {
+
+        // Split into multiple unrelated pipelines
+        List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(expandEdgeByParallelism(edges));
+
+        // just convert execution plan to pipeline at now. We should split it to multi pipeline with
+        // cache in the future
+
+        return edgesList.stream().map(e -> {
+            Map<Integer, ExecutionVertex> vertexes = new HashMap<>();
+            List<ExecutionEdge> pipelineEdges = e.stream().map(edge -> {
+                if (!vertexes.containsKey(edge.getLeftVertexId())) {
+                    vertexes.put(edge.getLeftVertexId(), edge.getLeftVertex());
+                }
+                ExecutionVertex source = vertexes.get(edge.getLeftVertexId());
+                if (!vertexes.containsKey(edge.getRightVertexId())) {
+                    vertexes.put(edge.getRightVertexId(), edge.getRightVertex());
+                }
+                ExecutionVertex destination = vertexes.get(edge.getRightVertexId());
+                return new ExecutionEdge(source, destination);
+            }).collect(Collectors.toList());
+            return new Pipeline(pipelineEdges, vertexes);
+        }).collect(Collectors.toList());
+    }
+
+    private static List<ExecutionEdge> expandEdgeByParallelism(List<ExecutionEdge> edges) {
+        /*
+         *TODO
+         * use SupportCoordinate interface to determine whether the Pipeline needs to be split.
+         * Pipelines without coordinator support can be split into multiple pipelines that do not
+         * interfere with each other
+         */
+        return edges;
+    }
+
+    private static List<List<ExecutionEdge>> splitUnrelatedEdges(List<ExecutionEdge> edges) {
+
+        List<List<ExecutionEdge>> edgeList = new ArrayList<>();
+        while (!edges.isEmpty()) {
+            edgeList.add(findVertexRelatedEdge(edges, edges.get(0).getLeftVertex()));
+        }
+        return edgeList;
+    }
+
+    private static List<ExecutionEdge> findVertexRelatedEdge(List<ExecutionEdge> edges, ExecutionVertex vertex) {
+
+        List<ExecutionEdge> sourceEdges = edges.stream().filter(edge -> edge.getLeftVertex().equals(vertex))
+                .collect(Collectors.toList());
+        List<ExecutionEdge> destinationEdges = edges.stream().filter(edge -> edge.getRightVertex().equals(vertex))
+                .collect(Collectors.toList());
+
+        List<ExecutionEdge> relatedEdges = new ArrayList<>(sourceEdges);
+        relatedEdges.addAll(destinationEdges);
+
+        List<ExecutionVertex> relatedActions =
+                sourceEdges.stream().map(ExecutionEdge::getRightVertex).collect(Collectors.toList());
+        relatedActions.addAll(destinationEdges.stream().map(ExecutionEdge::getLeftVertex).collect(Collectors.toList()));
+
+        edges.removeAll(relatedEdges);
+
+        relatedEdges.addAll(relatedActions.stream()
+                .flatMap(d -> findVertexRelatedEdge(edges, d).stream()).collect(Collectors.toList()));
+
+        return relatedEdges;
+
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
new file mode 100644
index 000000000..af5a9d7c1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+
+import java.util.List;
+
+public class PhysicalPlan {
+
+    private final List<SubPlan> plans;
+
+    public PhysicalPlan(List<SubPlan> plans) {
+        this.plans = plans;
+    }
+
+    public List<SubPlan> getPlans() {
+        return plans;
+    }
+
+    public static class SubPlan {
+        private final List<TaskGroupInfo> tasks;
+
+        private final List<TaskGroupInfo> coordinatorTasks;
+
+        public SubPlan(List<TaskGroupInfo> tasks, List<TaskGroupInfo> coordinatorTasks) {
+            this.tasks = tasks;
+            this.coordinatorTasks = coordinatorTasks;
+        }
+
+        public List<TaskGroupInfo> getTasks() {
+            return tasks;
+        }
+
+        public List<TaskGroupInfo> getCoordinatorTasks() {
+            return coordinatorTasks;
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
new file mode 100644
index 000000000..1bb0652c1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.internal.IntermediateDataQueue;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
+import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
+import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
+import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.spi.impl.NodeEngine;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class PhysicalPlanGenerator {
+
+    private final List<List<ExecutionEdge>> edgesList;
+
+    private final NodeEngine nodeEngine;
+
+    private final IdGenerator idGenerator = new IdGenerator();
+
+    public PhysicalPlanGenerator(ExecutionPlan executionPlan, NodeEngine nodeEngine) {
+        edgesList = executionPlan.getPipelines().stream().map(Pipeline::getEdges).collect(Collectors.toList());
+        this.nodeEngine = nodeEngine;
+    }
+
+    public PhysicalPlan generate() {
+
+        // TODO Determine which tasks do not need to be restored according to state
+        return new PhysicalPlan(edgesList.stream().map(edges -> {
+            List<PhysicalSourceAction<?, ?, ?>> sources = findSourceAction(edges);
+
+            List<TaskGroupInfo> coordinatorTasks = getEnumeratorTask(sources);
+
+            List<TaskGroupInfo> tasks = getSourceTask(edges, sources);
+
+            tasks.addAll(getPartitionTask(edges));
+
+            coordinatorTasks.addAll(getCommitterTask(edges));
+
+            return new PhysicalPlan.SubPlan(tasks, coordinatorTasks);
+        }).collect(Collectors.toList()));
+    }
+
+    private List<PhysicalSourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
+        return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PhysicalSourceAction)
+                .map(s -> (PhysicalSourceAction<?, ?, ?>) s.getLeftVertex().getAction())
+                .collect(Collectors.toList());
+    }
+
+    private List<TaskGroupInfo> getCommitterTask(List<ExecutionEdge> edges) {
+        return edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
+                .map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
+                .map(s -> {
+                    SinkAggregatedCommitterTask t =
+                            new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
+                    return new TaskGroupInfo(toData(new TaskGroup(t)), t.getJarsUrl());
+                }).collect(Collectors.toList());
+    }
+
+    private List<TaskGroupInfo> getPartitionTask(List<ExecutionEdge> edges) {
+        return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PartitionTransformAction)
+                .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
+                .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
+                .flatMap(flow -> {
+                    List<TaskGroupInfo> t = new ArrayList<>();
+                    for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+                        SeaTunnelTask seaTunnelTask = new SeaTunnelTask(idGenerator.getNextId(), flow);
+                        t.add(new TaskGroupInfo(toData(new TaskGroup(seaTunnelTask)),
+                                seaTunnelTask.getJarsUrl()));
+                    }
+                    return t.stream();
+                }).collect(Collectors.toList());
+    }
+
+    private List<TaskGroupInfo> getEnumeratorTask(List<PhysicalSourceAction<?, ?, ?>> sources) {
+        return sources.stream().map(s -> {
+            SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(idGenerator.getNextId(), s);
+            return new TaskGroupInfo(toData(new TaskGroup(t)), t.getJarsUrl());
+        }).collect(Collectors.toList());
+    }
+
+    private List<TaskGroupInfo> getSourceTask(List<ExecutionEdge> edges,
+                                              List<PhysicalSourceAction<?, ?, ?>> sources) {
+        return sources.stream()
+                .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
+                .flatMap(flow -> {
+                    List<TaskGroupInfo> t = new ArrayList<>();
+                    List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
+                    if (sourceWithSink(flow)) {
+                        flows.addAll(splitSinkFromFlow(flow));
+                    }
+                    for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+                        List<SeaTunnelTask> taskList =
+                                flows.stream().map(f -> new SeaTunnelTask(idGenerator.getNextId(), f)).collect(Collectors.toList());
+                        Set<URL> jars =
+                                taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
+                        t.add(new TaskGroupInfo(toData(new TaskGroup(taskList.stream().map(task -> (Task) task).collect(Collectors.toList()))), jars));
+                    }
+                    return t.stream();
+                }).collect(Collectors.toList());
+    }
+
+    private static List<Flow> splitSinkFromFlow(Flow flow) {
+        List<PhysicalExecutionFlow> sinkFlows =
+                flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow) f)
+                        .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
+        List<Flow> allFlows = new ArrayList<>();
+        flow.getNext().removeAll(sinkFlows);
+        sinkFlows.forEach(s -> {
+            IntermediateDataQueue queue = new IntermediateDataQueue(s.getAction().getId(),
+                    s.getAction().getName() + "-Queue", s.getAction().getParallelism());
+            IntermediateExecutionFlow intermediateFlow = new IntermediateExecutionFlow(queue);
+            flow.getNext().add(intermediateFlow);
+            IntermediateExecutionFlow intermediateFlowQuote = new IntermediateExecutionFlow(queue);
+            intermediateFlowQuote.getNext().add(s);
+            allFlows.add(intermediateFlowQuote);
+        });
+
+        if (flow.getNext().size() > sinkFlows.size()) {
+            allFlows.addAll(flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream()).collect(Collectors.toList()));
+        }
+        return allFlows;
+    }
+
+    private static boolean sourceWithSink(PhysicalExecutionFlow flow) {
+        return flow.getAction() instanceof SinkAction ||
+                flow.getNext().stream().map(f -> (PhysicalExecutionFlow) f).map(PhysicalPlanGenerator::sourceWithSink)
+                        .collect(Collectors.toList()).contains(true);
+    }
+
+    private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
+        List<Action> actions = edges.stream().filter(e -> e.getLeftVertex().getAction().equals(start))
+                .map(e -> e.getLeftVertex().getAction()).collect(Collectors.toList());
+        List<Flow> wrappers = actions.stream()
+                .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
+                .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+        wrappers.addAll(actions.stream()
+                .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
+                .map(a -> new PhysicalExecutionFlow(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
+        return wrappers;
+    }
+
+    private Data toData(Object object) {
+        return this.nodeEngine.getSerializationService().toData(object);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
similarity index 62%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
index e05cbfa4d..fc8284877 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
@@ -15,19 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.common.utils;
+package org.apache.seatunnel.engine.server.dag.physical;
 
-import java.io.Serializable;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
 
-/**
- * It is used to generate the ID of each vertex in DAG. We just need to ensure that the id of all Vertices in a DAG are
- * unique.
- */
-public class IdGenerator implements Serializable {
-    private int id = 0;
+import com.hazelcast.spi.impl.NodeEngine;
+
+public class PhysicalPlanUtils {
 
-    public int getNextId() {
-        id++;
-        return id;
+    public static PhysicalPlan fromLogicalDAG(LogicalDag logicalDag, NodeEngine nodeEngine) {
+        return new PhysicalPlanGenerator(new ExecutionPlanGenerator(logicalDag).generate(), nodeEngine).generate();
     }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
index fdf88cde7..b4dfd1ca9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.physical.flow;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import java.util.List;
 
-import java.io.Serializable;
-import java.util.Collection;
+public abstract class Flow {
 
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+    protected final List<Flow> next;
+
+    public Flow(List<Flow> next) {
+        this.next = next;
+    }
+
+    public List<Flow> getNext() {
+        return next;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
index 688f0476b..72906d0b1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
@@ -15,32 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.physical.flow;
 
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.core.dag.internal.IntermediateDataQueue;
 
-import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 
-public interface Task extends Serializable {
+public class IntermediateExecutionFlow extends Flow {
 
-    default void init() {
-    }
-
-    @NonNull
-    ProgressState call();
+    private final IntermediateDataQueue queue;
 
-    @NonNull
-    Long getTaskID();
-
-    default boolean isCooperative() {
-        return false;
+    public IntermediateExecutionFlow(IntermediateDataQueue queue) {
+        super(Collections.emptyList());
+        this.queue = queue;
     }
 
-    default void close() {
+    public IntermediateExecutionFlow(IntermediateDataQueue queue, List<Flow> next) {
+        super(next);
+        this.queue = queue;
     }
 
-    default void setOperationService(OperationService operationService) {
+    public IntermediateDataQueue getQueue() {
+        return queue;
     }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
index 688f0476b..d3edd1552 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
@@ -15,32 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.physical.flow;
 
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
 
-import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 
-public interface Task extends Serializable {
+public class PhysicalExecutionFlow extends Flow {
 
-    default void init() {
-    }
-
-    @NonNull
-    ProgressState call();
+    private final Action action;
 
-    @NonNull
-    Long getTaskID();
-
-    default boolean isCooperative() {
-        return false;
+    public PhysicalExecutionFlow(Action action, List<Flow> next) {
+        super(next);
+        this.action = action;
     }
 
-    default void close() {
+    public PhysicalExecutionFlow(Action action) {
+        super(Collections.emptyList());
+        this.action = action;
     }
 
-    default void setOperationService(OperationService operationService) {
+    public Action getAction() {
+        return action;
     }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 688f0476b..9025facce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -17,14 +17,18 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
+import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.OperationService;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.UUID;
 
 public interface Task extends Serializable {
 
-    default void init() {
+    default void init() throws Exception {
     }
 
     @NonNull
@@ -37,10 +41,20 @@ public interface Task extends Serializable {
         return false;
     }
 
-    default void close() {
+    default void close() throws IOException {
     }
 
     default void setOperationService(OperationService operationService) {
     }
 
+    default <E> InvocationFuture<E> sendToMaster(Operation operation) {
+        // TODO add method send operation to master
+        return null;
+    }
+
+    default <E> InvocationFuture<E> sendToMember(Operation operation, UUID memberID) {
+        // TODO add method send operation to member
+        return null;
+    }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index fdf88cde7..2763942ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -21,10 +21,15 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collection;
 
 @Data
 @AllArgsConstructor
 public class TaskGroup implements Serializable {
     private final Collection<Task> tasks;
+
+    public TaskGroup(Task... tasks) {
+        this.tasks = Arrays.asList(tasks);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
new file mode 100644
index 000000000..37542cd2d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.execution.ProgressState;
+import org.apache.seatunnel.engine.server.execution.Task;
+
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.OperationService;
+import lombok.NonNull;
+
+import java.io.IOException;
+
+public class JobMaster implements Task {
+
+    private final LogicalDag logicalDag;
+    private PhysicalPlan physicalPlan;
+
+    private NodeEngine nodeEngine;
+
+    public JobMaster() {
+        this.logicalDag = new LogicalDag();
+    }
+
+    @Override
+    public void init() throws Exception {
+        physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag, nodeEngine);
+    }
+
+    @NonNull
+    @Override
+    public ProgressState call() {
+        return ProgressState.DONE;
+    }
+
+    @NonNull
+    @Override
+    public Long getTaskID() {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        Task.super.close();
+    }
+
+    @Override
+    public void setOperationService(OperationService operationService) {
+        Task.super.setOperationService(operationService);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
new file mode 100644
index 000000000..a59721b78
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import org.apache.seatunnel.engine.server.task.operation.AssignSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.RegisterOperation;
+import org.apache.seatunnel.engine.server.task.operation.RequestSplitOperation;
+
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class TaskDataSerializerHook implements DataSerializerHook {
+
+    public static final int REGISTER_TYPE = 1;
+
+    public static final int REQUEST_SPLIT_TYPE = 2;
+
+    public static final int ASSIGN_SPLIT_TYPE = 3;
+
+    public static final int TASK_GROUP_INFO_TYPE = 4;
+    public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
+            SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
+            SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
+    );
+
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new Factory();
+    }
+
+    private static class Factory implements DataSerializableFactory {
+
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            switch (typeId) {
+                case REGISTER_TYPE:
+                    return new RegisterOperation();
+                case REQUEST_SPLIT_TYPE:
+                    return new RequestSplitOperation();
+                case ASSIGN_SPLIT_TYPE:
+                    return new AssignSplitOperation();
+                case TASK_GROUP_INFO_TYPE:
+                    return new TaskGroupInfo();
+                default:
+                    return null;
+            }
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 688f0476b..e15689c86 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -15,32 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.engine.server.execution.ProgressState;
+import org.apache.seatunnel.engine.server.execution.Task;
 
 import com.hazelcast.spi.impl.operationservice.OperationService;
 import lombok.NonNull;
 
-import java.io.Serializable;
+import java.net.URL;
+import java.util.Set;
 
-public interface Task extends Serializable {
+public abstract class AbstractTask implements Task {
+    private static final long serialVersionUID = -2524701323779523718L;
 
-    default void init() {
-    }
+    protected OperationService operationService;
+    protected long taskID;
 
-    @NonNull
-    ProgressState call();
-
-    @NonNull
-    Long getTaskID();
+    protected Progress progress;
 
-    default boolean isCooperative() {
-        return false;
+    public AbstractTask(long taskID) {
+        this.taskID = taskID;
+        this.progress = new Progress();
     }
 
-    default void close() {
+    public abstract Set<URL> getJarsUrl();
+
+    @Override
+    public void setOperationService(OperationService operationService) {
+        this.operationService = operationService;
     }
 
-    default void setOperationService(OperationService operationService) {
+    @NonNull
+    @Override
+    public ProgressState call() {
+        return progress.toState();
     }
 
+    @NonNull
+    @Override
+    public Long getTaskID() {
+        return taskID;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
index fdf88cde7..6fd8dbc5c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+public abstract class CoordinatorTask extends AbstractTask {
 
-import java.io.Serializable;
-import java.util.Collection;
+    private static final long serialVersionUID = -3957168748281681077L;
 
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
-    private final Collection<Task> tasks;
+    public CoordinatorTask(long taskID) {
+        super(taskID);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
similarity index 58%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
index 688f0476b..2ceb2ac25 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
@@ -15,32 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
 
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.server.execution.ProgressState;
 
-import java.io.Serializable;
+public class Progress {
 
-public interface Task extends Serializable {
+    private boolean madeProgress;
+    private boolean isDone;
 
-    default void init() {
+    public Progress() {
+        isDone = true;
+        madeProgress = false;
     }
 
-    @NonNull
-    ProgressState call();
-
-    @NonNull
-    Long getTaskID();
+    public void start() {
+        isDone = false;
+        madeProgress = false;
+    }
 
-    default boolean isCooperative() {
-        return false;
+    public void makeProgress() {
+        isDone = false;
+        madeProgress = true;
     }
 
-    default void close() {
+    public void done() {
+        isDone = true;
     }
 
-    default void setOperationService(OperationService operationService) {
+    public ProgressState toState() {
+        return ProgressState.valueOf(madeProgress, isDone);
     }
 
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
new file mode 100644
index 000000000..319bc4ec6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.engine.server.task.operation.AssignSplitOperation;
+
+import java.util.List;
+import java.util.Set;
+
+public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
+
+    private final int parallelism;
+
+    private final SourceSplitEnumeratorTask<SplitT> task;
+
+    public SeaTunnelSplitEnumeratorContext(int parallelism, SourceSplitEnumeratorTask<SplitT> task) {
+        this.parallelism = parallelism;
+        this.task = task;
+    }
+
+    @Override
+    public int currentParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public Set<Integer> registeredReaders() {
+        return null;
+    }
+
+    @Override
+    public void assignSplit(int subtaskId, List<SplitT> splits) {
+        task.sendToMember(new AssignSplitOperation<>(subtaskId, splits), task.getTaskMemberID(subtaskId));
+    }
+
+    @Override
+    public void signalNoMoreSplits(int subtask) {
+
+    }
+
+    @Override
+    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
new file mode 100644
index 000000000..b5c381999
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
+import org.apache.seatunnel.engine.server.task.operation.RegisterOperation;
+import org.apache.seatunnel.engine.server.task.operation.RequestSplitOperation;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class SeaTunnelTask extends AbstractTask {
+
+    private static final long serialVersionUID = 2604309561613784425L;
+    private final Flow executionFlow;
+
+    // TODO init memberID in task execution service
+    private UUID memberID = UUID.randomUUID();
+    private int enumeratorTaskID = -1;
+
+    public SeaTunnelTask(long taskID, Flow executionFlow) {
+        super(taskID);
+        // TODO add enumerator task ID
+        enumeratorTaskID = 1;
+        this.executionFlow = executionFlow;
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+
+    private void register() {
+        if (startFromSource()) {
+            sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
+        }
+    }
+
+    private void requestSplit() {
+        sendToMaster(new RequestSplitOperation(taskID, enumeratorTaskID));
+    }
+
+    private boolean startFromSource() {
+        if (executionFlow instanceof PhysicalExecutionFlow) {
+            return ((PhysicalExecutionFlow) executionFlow).getAction() instanceof SourceAction;
+        }
+        return false;
+    }
+
+    @Override
+    public Set<URL> getJarsUrl() {
+        List<Flow> now = Collections.singletonList(executionFlow);
+        Set<URL> urls = new HashSet<>();
+        List<Flow> next = new ArrayList<>();
+        while (!now.isEmpty()) {
+            next.clear();
+            now.forEach(n -> {
+                if (n instanceof PhysicalExecutionFlow) {
+                    urls.addAll(((PhysicalExecutionFlow) n).getAction().getJarUrls());
+                }
+                next.addAll(n.getNext());
+            });
+            now = next;
+        }
+        return urls;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 688f0476b..d097f0bb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -15,32 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
 
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 
-import java.io.Serializable;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
 
-public interface Task extends Serializable {
+public class SinkAggregatedCommitterTask extends CoordinatorTask {
 
-    default void init() {
-    }
-
-    @NonNull
-    ProgressState call();
-
-    @NonNull
-    Long getTaskID();
+    private static final long serialVersionUID = 5906594537520393503L;
+    private final SinkAction<?, ?, ?, ?> sink;
 
-    default boolean isCooperative() {
-        return false;
+    public SinkAggregatedCommitterTask(long taskID, SinkAction<?, ?, ?, ?> sink) {
+        super(taskID);
+        this.sink = sink;
     }
 
-    default void close() {
+    @Override
+    public Set<URL> getJarsUrl() {
+        return new HashSet<>(sink.getJarUrls());
     }
-
-    default void setOperationService(OperationService operationService) {
-    }
-
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
new file mode 100644
index 000000000..4605d52f9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends CoordinatorTask {
+
+    private static final long serialVersionUID = -3713701594297977775L;
+
+    private final PhysicalSourceAction<?, SplitT, ?> source;
+    private SourceSplitEnumerator<?, ?> enumerator;
+    private SeaTunnelSplitEnumeratorContext<SplitT> context;
+    private Map<Integer, UUID> taskMemberMapping;
+
+    @Override
+    public void init() throws Exception {
+        context = new SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
+        enumerator = this.source.getSource().createEnumerator(context);
+        taskMemberMapping = new HashMap<>();
+        enumerator.open();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (enumerator != null) {
+            enumerator.close();
+        }
+    }
+
+    public SourceSplitEnumeratorTask(long taskID, PhysicalSourceAction<?, SplitT, ?> source) {
+        super(taskID);
+        this.source = source;
+    }
+
+    private void receivedReader(int readerId, UUID memberID) {
+        this.addTaskMemberMapping(readerId, memberID);
+        enumerator.registerReader(readerId);
+    }
+
+    private void requestSplit(int taskID) {
+        enumerator.handleSplitRequest(taskID);
+    }
+
+    private void addTaskMemberMapping(int taskID, UUID memberID) {
+        taskMemberMapping.put(taskID, memberID);
+    }
+
+    public UUID getTaskMemberID(int taskID) {
+        return taskMemberMapping.get(taskID);
+    }
+
+    @Override
+    public Set<URL> getJarsUrl() {
+        return new HashSet<>(source.getJarUrls());
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
similarity index 58%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
index 09a61c116..e05ddcf9b 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
@@ -15,50 +15,61 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.server.task;
 
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import lombok.AllArgsConstructor;
-import lombok.Data;
 
 import java.io.IOException;
+import java.net.URL;
+import java.util.Set;
 
-@Data
-@AllArgsConstructor
-public class LogicalVertex implements IdentifiedDataSerializable {
-    private Integer vertexId;
-    private Action action;
-    private int parallelism;
+public class TaskGroupInfo implements IdentifiedDataSerializable {
 
-    public LogicalVertex() {
+    private Data group;
+
+    private Set<URL> jars;
+
+    public TaskGroupInfo() {
+    }
+
+    public TaskGroupInfo(Data group, Set<URL> jars) {
+        this.group = group;
+        this.jars = jars;
+    }
+
+    public Data getGroup() {
+        return group;
+    }
+
+    public Set<URL> getJars() {
+        return jars;
     }
 
     @Override
     public int getFactoryId() {
-        return JobDataSerializerHook.FACTORY_ID;
+        return TaskDataSerializerHook.TASK_GROUP_INFO_TYPE;
     }
 
     @Override
     public int getClassId() {
-        return JobDataSerializerHook.LOGICAL_VERTEX;
+        return TaskDataSerializerHook.FACTORY_ID;
     }
 
     @Override
     public void writeData(ObjectDataOutput out) throws IOException {
-        out.writeInt(vertexId);
-        out.writeObject(action);
-        out.writeInt(parallelism);
+        out.writeObject(jars);
+        IOUtil.writeData(out, group);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
-        vertexId = in.readInt();
-        action = in.readObject();
-        parallelism = in.readInt();
+        jars = in.readObject();
+        group = IOUtil.readData(in);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/AssignSplitOperation.java
similarity index 56%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/AssignSplitOperation.java
index 09a61c116..b45a42a0e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/AssignSplitOperation.java
@@ -15,50 +15,49 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.server.task.operation;
 
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
+import java.util.List;
 
-@Data
-@AllArgsConstructor
-public class LogicalVertex implements IdentifiedDataSerializable {
-    private Integer vertexId;
-    private Action action;
-    private int parallelism;
+public class AssignSplitOperation<SplitT> extends Operation implements IdentifiedDataSerializable {
 
-    public LogicalVertex() {
+    private List<SplitT> splits;
+    private int taskID;
+
+    public AssignSplitOperation() {
+    }
+
+    public AssignSplitOperation(int taskID, List<SplitT> splits) {
+        this.splits = splits;
     }
 
     @Override
-    public int getFactoryId() {
-        return JobDataSerializerHook.FACTORY_ID;
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        out.writeObject(splits);
+        out.writeInt(taskID);
     }
 
     @Override
-    public int getClassId() {
-        return JobDataSerializerHook.LOGICAL_VERTEX;
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        splits = in.readObject();
+        taskID = in.readInt();
     }
 
     @Override
-    public void writeData(ObjectDataOutput out) throws IOException {
-        out.writeInt(vertexId);
-        out.writeObject(action);
-        out.writeInt(parallelism);
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
     }
 
     @Override
-    public void readData(ObjectDataInput in) throws IOException {
-        vertexId = in.readInt();
-        action = in.readObject();
-        parallelism = in.readInt();
+    public int getClassId() {
+        return TaskDataSerializerHook.ASSIGN_SPLIT_TYPE;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
new file mode 100644
index 000000000..5c26c3a39
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * For {@link org.apache.seatunnel.api.source.SourceReader} to register with
+ * the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}
+ */
+public class RegisterOperation extends Operation implements IdentifiedDataSerializable {
+
+    private long readerTaskID;
+    private long enumeratorTaskID;
+
+    public RegisterOperation() {
+    }
+
+    public RegisterOperation(long readerTaskID, long enumeratorTaskID) {
+        this.readerTaskID = readerTaskID;
+        this.enumeratorTaskID = enumeratorTaskID;
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        UUID readerUUID = getCallerUuid();
+        TaskExecutionContext executionContext =
+                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+        // TODO register reader to enumerator
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeLong(readerTaskID);
+        out.writeLong(enumeratorTaskID);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        readerTaskID = in.readLong();
+        enumeratorTaskID = in.readLong();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.REGISTER_TYPE;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RequestSplitOperation.java
new file mode 100644
index 000000000..b2860ae40
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RequestSplitOperation.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class RequestSplitOperation extends Operation implements IdentifiedDataSerializable {
+
+    private long enumeratorTaskID;
+
+    private long taskID;
+
+    public RequestSplitOperation() {
+    }
+
+    public RequestSplitOperation(long taskID, long enumeratorTaskID) {
+        this.enumeratorTaskID = enumeratorTaskID;
+        this.taskID = taskID;
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+        // TODO ask source split enumerator return split
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeLong(enumeratorTaskID);
+        out.writeLong(taskID);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        enumeratorTaskID = in.readLong();
+        taskID = in.readLong();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.REQUEST_SPLIT_TYPE;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
index ab0062e85..bf045cd23 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -16,3 +16,4 @@
 #
 
 org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook
+org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook
\ No newline at end of file