You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/03 10:20:05 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan (#2316)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 258b85e12 [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan (#2316)
258b85e12 is described below
commit 258b85e1242968616364a9e2f7d8045e56a8b0a1
Author: Hisoka <fa...@qq.com>
AuthorDate: Wed Aug 3 18:20:00 2022 +0800
[Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan (#2316)
* [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
* [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
* [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
* [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
* [Engine][PhysicalPlan] Add code from LogicalDag to PhysicalPlan
* [Engine][Task] Add Job Master
* [Core][Task] add task
* [Core][Logical Plan] Change logical vertex and edge name
* [Core][Logical Plan] Add TaskGroup and JavaQueue
* [Core][Logical Plan] Add TaskGroup and JavaQueue
* [Core][Logical Plan] Add TaskGroup and JavaQueue
* fix checkstyle
* remove JavaQueueAction.java and change TaskGroupInfo
* change TaskDataSerializerHook
* change TaskDataSerializerHook directory
---
.../engine/client/JobExecutionEnvironment.java | 4 +-
.../engine/client/LogicalDagGeneratorTest.java | 4 +-
.../serializeable/SeaTunnelFactoryIdConstant.java | 4 +
.../seatunnel/engine/common/utils/IdGenerator.java | 2 +
.../core/dag/actions/PartitionTransformAction.java | 2 +-
.../core/dag/actions/PhysicalSourceAction.java | 62 +++++++
.../engine/core/dag/actions/SourceAction.java | 8 +-
.../engine/core/dag/actions/TransformAction.java | 14 +-
...nsformAction.java => TransformChainAction.java} | 34 ++--
.../core/dag/actions/UnknownActionException.java} | 15 +-
.../core/dag/internal/IntermediateDataQueue.java} | 33 ++--
.../dag/{logicaldag => logical}/LogicalDag.java | 83 +++++-----
.../LogicalDagGenerator.java | 2 +-
.../dag/{logicaldag => logical}/LogicalEdge.java | 2 +-
.../dag/{logicaldag => logical}/LogicalVertex.java | 3 +-
.../core/serializable/JobDataSerializerHook.java | 12 +-
.../execution/ExecutionEdge.java} | 23 ++-
.../execution/ExecutionPlan.java} | 21 ++-
.../dag/execution/ExecutionPlanGenerator.java | 150 +++++++++++++++++
.../execution/ExecutionVertex.java} | 13 +-
.../Task.java => dag/execution/Pipeline.java} | 31 ++--
.../server/dag/execution/PipelineGenerator.java | 94 +++++++++++
.../engine/server/dag/physical/PhysicalPlan.java | 54 ++++++
.../server/dag/physical/PhysicalPlanGenerator.java | 182 +++++++++++++++++++++
.../server/dag/physical/PhysicalPlanUtils.java} | 20 +--
.../TaskGroup.java => dag/physical/flow/Flow.java} | 21 ++-
.../physical/flow/IntermediateExecutionFlow.java} | 32 ++--
.../physical/flow/PhysicalExecutionFlow.java} | 32 ++--
.../seatunnel/engine/server/execution/Task.java | 18 +-
.../engine/server/execution/TaskGroup.java | 5 +
.../seatunnel/engine/server/master/JobMaster.java | 69 ++++++++
.../serializable/TaskDataSerializerHook.java | 73 +++++++++
.../Task.java => task/AbstractTask.java} | 42 +++--
.../TaskGroup.java => task/CoordinatorTask.java} | 15 +-
.../{execution/Task.java => task/Progress.java} | 34 ++--
.../task/SeaTunnelSplitEnumeratorContext.java | 63 +++++++
.../engine/server/task/SeaTunnelTask.java | 94 +++++++++++
.../SinkAggregatedCommitterTask.java} | 34 ++--
.../server/task/SourceSplitEnumeratorTask.java | 82 ++++++++++
.../engine/server/task/TaskGroupInfo.java} | 51 +++---
.../task/operation/AssignSplitOperation.java} | 47 +++---
.../server/task/operation/RegisterOperation.java | 86 ++++++++++
.../task/operation/RequestSplitOperation.java | 79 +++++++++
.../services/com.hazelcast.DataSerializerHook | 1 +
44 files changed, 1453 insertions(+), 297 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index 79d60f19a..a8257bebd 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -21,8 +21,8 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.commons.lang3.tuple.ImmutablePair;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 19bcc5eb1..c5f282bda 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import com.hazelcast.internal.json.JsonObject;
import org.apache.commons.lang3.tuple.ImmutablePair;
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
index 9cfcc54f8..0f2470495 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/SeaTunnelFactoryIdConstant.java
@@ -39,4 +39,8 @@ public final class SeaTunnelFactoryIdConstant {
public static final String SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY =
"hazelcast.serialization.ds.seatunnel.engine.config";
public static final int SEATUNNEL_CONFIG_DATA_SERIALIZER_FACTORY_ID = -30003;
+
+ public static final String SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY =
+ "hazelcast.serialization.ds.seatunnel.engine.task";
+ public static final int SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID = -30004;
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
index e05cbfa4d..4b10e8d60 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
@@ -24,6 +24,8 @@ import java.io.Serializable;
* unique.
*/
public class IdGenerator implements Serializable {
+
+ private static final long serialVersionUID = 7683323453014131725L;
private int id = 0;
public int getNextId() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 8806978a7..88e5e5881 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -25,7 +25,7 @@ import java.net.URL;
import java.util.List;
public class PartitionTransformAction extends AbstractAction {
- private PartitionSeaTunnelTransform partitionTransformation;
+ private final PartitionSeaTunnelTransform partitionTransformation;
public PartitionTransformAction(int id,
@NonNull String name,
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
new file mode 100644
index 000000000..8391be830
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
+
+public class PhysicalSourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
+
+ private static final long serialVersionUID = 4222477901364853468L;
+ private final SeaTunnelSource<T, SplitT, StateT> source;
+ private final List<SeaTunnelTransform> transforms;
+
+ public PhysicalSourceAction(int id,
+ @NonNull String name,
+ @NonNull SeaTunnelSource<T, SplitT, StateT> source,
+ @NonNull List<URL> jarUrls,
+ List<SeaTunnelTransform> transforms) {
+ super(id, name, jarUrls);
+ this.source = source;
+ this.transforms = transforms;
+ }
+
+ protected PhysicalSourceAction(int id, @NonNull String name, @NonNull List<Action> upstreams,
+ @NonNull SeaTunnelSource<T, SplitT, StateT> source,
+ @NonNull List<URL> jarUrls,
+ List<SeaTunnelTransform> transforms) {
+ super(id, name, upstreams, jarUrls);
+ this.source = source;
+ this.transforms = transforms;
+ }
+
+ public SeaTunnelSource<T, SplitT, StateT> getSource() {
+ return source;
+ }
+
+ public List<SeaTunnelTransform> getTransforms() {
+ return transforms;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index a5f834fe3..41b2bcb63 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -28,7 +28,9 @@ import java.net.URL;
import java.util.List;
public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
- private SeaTunnelSource<T, SplitT, StateT> source;
+
+ private static final long serialVersionUID = -4104531889750766731L;
+ private final SeaTunnelSource<T, SplitT, StateT> source;
public SourceAction(int id,
@NonNull String name,
@@ -37,4 +39,8 @@ public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializ
super(id, name, Lists.newArrayList(), jarUrls);
this.source = source;
}
+
+ public SeaTunnelSource<T, SplitT, StateT> getSource() {
+ return source;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index c4385a064..c82f41947 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -25,22 +25,26 @@ import java.net.URL;
import java.util.List;
public class TransformAction extends AbstractAction {
- private SeaTunnelTransform transformation;
+ private SeaTunnelTransform transform;
public TransformAction(int id,
@NonNull String name,
@NonNull List<Action> upstreams,
- @NonNull SeaTunnelTransform transformation,
+ @NonNull SeaTunnelTransform transform,
@NonNull List<URL> jarUrls) {
super(id, name, upstreams, jarUrls);
- this.transformation = transformation;
+ this.transform = transform;
}
public TransformAction(int id,
@NonNull String name,
- @NonNull SeaTunnelTransform transformation,
+ @NonNull SeaTunnelTransform transform,
@NonNull List<URL> jarUrls) {
super(id, name, jarUrls);
- this.transformation = transformation;
+ this.transform = transform;
+ }
+
+ public SeaTunnelTransform getTransform() {
+ return transform;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
index c4385a064..339dbc1a0 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
@@ -24,23 +24,29 @@ import lombok.NonNull;
import java.net.URL;
import java.util.List;
-public class TransformAction extends AbstractAction {
- private SeaTunnelTransform transformation;
-
- public TransformAction(int id,
- @NonNull String name,
- @NonNull List<Action> upstreams,
- @NonNull SeaTunnelTransform transformation,
- @NonNull List<URL> jarUrls) {
+public class TransformChainAction extends AbstractAction {
+
+ private static final long serialVersionUID = -340174711145367535L;
+ private final List<SeaTunnelTransform> transforms;
+
+ public TransformChainAction(int id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull List<URL> jarUrls,
+ @NonNull List<SeaTunnelTransform> transforms) {
super(id, name, upstreams, jarUrls);
- this.transformation = transformation;
+ this.transforms = transforms;
}
- public TransformAction(int id,
- @NonNull String name,
- @NonNull SeaTunnelTransform transformation,
- @NonNull List<URL> jarUrls) {
+ public TransformChainAction(int id,
+ @NonNull String name,
+ @NonNull List<URL> jarUrls,
+ @NonNull List<SeaTunnelTransform> transforms) {
super(id, name, jarUrls);
- this.transformation = transformation;
+ this.transforms = transforms;
+ }
+
+ public List<SeaTunnelTransform> getTransforms() {
+ return transforms;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/UnknownActionException.java
similarity index 72%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/UnknownActionException.java
index fdf88cde7..314bf5291 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/UnknownActionException.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.core.dag.actions;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+public class UnknownActionException extends RuntimeException {
-import java.io.Serializable;
-import java.util.Collection;
+ private static final long serialVersionUID = 6566687693833135857L;
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+ public UnknownActionException(Action action) {
+ super("Unknown Action: " + action.getClass().getName());
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
similarity index 59%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
index 688f0476b..df8776548 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
@@ -15,32 +15,33 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
-
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+package org.apache.seatunnel.engine.core.dag.internal;
import java.io.Serializable;
-public interface Task extends Serializable {
-
- default void init() {
- }
+public class IntermediateDataQueue implements Serializable {
- @NonNull
- ProgressState call();
+ private static final long serialVersionUID = -3049265155605303992L;
- @NonNull
- Long getTaskID();
+ private final int id;
+ private final int parallelism;
+ private final String name;
- default boolean isCooperative() {
- return false;
+ public IntermediateDataQueue(int id, String name, int parallelism) {
+ this.id = id;
+ this.name = name;
+ this.parallelism = parallelism;
}
- default void close() {
+ public int getId() {
+ return id;
}
- default void setOperationService(OperationService operationService) {
+ public int getParallelism() {
+ return parallelism;
}
+ public String getName() {
+ return name;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
similarity index 93%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
index ed4f3f2fc..69c7e635c 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDag.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -56,10 +56,11 @@ import java.util.Set;
* as it passes through the processors.
*/
public class LogicalDag implements IdentifiedDataSerializable {
+
private static final Logger LOG = LoggerFactory.getLogger(LogicalDag.class);
private JobConfig jobConfig;
- private Set<LogicalEdge> edges = new LinkedHashSet<>();
- private Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
+ private final Set<LogicalEdge> edges = new LinkedHashSet<>();
+ private final Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
private IdGenerator idGenerator;
public LogicalDag() {
@@ -71,6 +72,48 @@ public class LogicalDag implements IdentifiedDataSerializable {
this.idGenerator = idGenerator;
}
+ public void addLogicalVertex(LogicalVertex logicalVertex) {
+ logicalVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
+ }
+
+ public void addEdge(LogicalEdge logicalEdge) {
+ edges.add(logicalEdge);
+ }
+
+ public Set<LogicalEdge> getEdges() {
+ return this.edges;
+ }
+
+ public Map<Integer, LogicalVertex> getLogicalVertexMap() {
+ return logicalVertexMap;
+ }
+
+ @NonNull
+ public JsonObject getLogicalDagAsJson() {
+ JsonObject logicalDag = new JsonObject();
+ JsonArray vertices = new JsonArray();
+
+ logicalVertexMap.values().stream().forEach(v -> {
+ JsonObject vertex = new JsonObject();
+ vertex.add("id", v.getVertexId());
+ vertex.add("name", v.getAction().getName() + "(id=" + v.getVertexId() + ")");
+ vertex.add("parallelism", v.getParallelism());
+ vertices.add(vertex);
+ });
+ logicalDag.add("vertices", vertices);
+
+ JsonArray edges = new JsonArray();
+ this.edges.stream().forEach(e -> {
+ JsonObject edge = new JsonObject();
+ edge.add("leftVertex", e.getLeftVertex().getAction().getName());
+ edge.add("rightVertex", e.getRightVertex().getAction().getName());
+ edges.add(edge);
+ });
+
+ logicalDag.add("edges", edges);
+ return logicalDag;
+ }
+
@Override
public int getFactoryId() {
return JobDataSerializerHook.FACTORY_ID;
@@ -81,14 +124,6 @@ public class LogicalDag implements IdentifiedDataSerializable {
return JobDataSerializerHook.LOGICAL_DAG;
}
- public void addLogicalVertex(LogicalVertex logicalVertex) {
- logicalVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
- }
-
- public void addEdge(LogicalEdge logicalEdge) {
- edges.add(logicalEdge);
- }
-
@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeInt(logicalVertexMap.size());
@@ -129,30 +164,4 @@ public class LogicalDag implements IdentifiedDataSerializable {
jobConfig = in.readObject();
idGenerator = in.readObject();
}
-
- @NonNull
- public JsonObject getLogicalDagAsJson() {
- JsonObject logicalDag = new JsonObject();
- JsonArray vertices = new JsonArray();
-
- logicalVertexMap.values().stream().forEach(v -> {
- JsonObject vertex = new JsonObject();
- vertex.add("id", v.getVertexId());
- vertex.add("name", v.getAction().getName() + "(id=" + v.getVertexId() + ")");
- vertex.add("parallelism", v.getParallelism());
- vertices.add(vertex);
- });
- logicalDag.add("vertices", vertices);
-
- JsonArray edges = new JsonArray();
- this.edges.stream().forEach(e -> {
- JsonObject edge = new JsonObject();
- edge.add("leftVertex", e.getLeftVertex().getAction().getName());
- edge.add("rightVertex", e.getRightVertex().getAction().getName());
- edges.add(edge);
- });
-
- logicalDag.add("edges", edges);
- return logicalDag;
- }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
similarity index 98%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index a08c48301..1ea396f7a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalDagGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
similarity index 97%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
index ac48b4e37..bfa7091d6 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalEdge.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
import static com.google.common.base.Preconditions.checkNotNull;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
similarity index 97%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
index 09a61c116..c9e145c86 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.core.dag.logical;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
@@ -31,6 +31,7 @@ import java.io.IOException;
@Data
@AllArgsConstructor
public class LogicalVertex implements IdentifiedDataSerializable {
+
private Integer vertexId;
private Action action;
private int parallelism;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index b6da4a3df..1ff1c3288 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -18,9 +18,9 @@
package org.apache.seatunnel.engine.core.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge;
-import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import com.hazelcast.internal.serialization.DataSerializerHook;
@@ -38,17 +38,17 @@ import com.hazelcast.spi.annotation.PrivateApi;
public final class JobDataSerializerHook implements DataSerializerHook {
/**
- * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag} class.
+ * Serialization ID of the {@link LogicalDag} class.
*/
public static final int LOGICAL_DAG = 0;
/**
- * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex} class.
+ * Serialization ID of the {@link LogicalVertex} class.
*/
public static final int LOGICAL_VERTEX = 1;
/**
- * Serialization ID of the {@link org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge} class.
+ * Serialization ID of the {@link LogicalEdge} class.
*/
public static final int LOGICAL_EDGE = 2;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
index fdf88cde7..b1527e120 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import java.io.Serializable;
-import java.util.Collection;
-
@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+public class ExecutionEdge {
+ private ExecutionVertex leftVertex;
+ private ExecutionVertex rightVertex;
+
+ private Integer leftVertexId;
+
+ private Integer rightVertexId;
+
+ public ExecutionEdge(ExecutionVertex leftVertex, ExecutionVertex rightVertex) {
+ this.leftVertex = leftVertex;
+ this.rightVertex = rightVertex;
+ this.leftVertexId = leftVertex.getVertexId();
+ this.rightVertexId = rightVertex.getVertexId();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
similarity index 71%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
index fdf88cde7..b72c62287 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlan.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import java.util.List;
-import java.io.Serializable;
-import java.util.Collection;
+public class ExecutionPlan {
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+ private final List<Pipeline> pipelines;
+
+ ExecutionPlan(List<Pipeline> pipelines) {
+ this.pipelines = pipelines;
+ }
+
+ public List<Pipeline> getPipelines() {
+ return pipelines;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
new file mode 100644
index 000000000..af3a4c68c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.execution;
+
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
+import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ExecutionPlanGenerator {
+
+ private final Map<Integer, List<Integer>> edgeMap = new HashMap<>();
+ private final Map<Integer, Action> actions = new HashMap<>();
+
+ private final Map<Integer, LogicalVertex> logicalVertexes;
+ private final List<LogicalEdge> logicalEdges;
+
+    /**
+     * Creates a generator that works on its own copies of the vertex map and the edge
+     * collection of the given logical DAG.
+     *
+     * @param logicalPlan the logical DAG to convert
+     */
+    public ExecutionPlanGenerator(LogicalDag logicalPlan) {
+        Map<Integer, LogicalVertex> vertexCopy = new HashMap<>(logicalPlan.getLogicalVertexMap());
+        List<LogicalEdge> edgeCopy = new ArrayList<>(logicalPlan.getEdges());
+        this.logicalVertexes = vertexCopy;
+        this.logicalEdges = edgeCopy;
+    }
+
+    /**
+     * Builds the {@link ExecutionPlan} for the logical DAG handed to the constructor.
+     *
+     * <p>Conversion starts at every vertex whose action is a {@link SourceAction} and proceeds
+     * level by level: each call to {@code convertExecutionActions} registers one execution
+     * action together with its outgoing edges and reports the vertices to convert next. The
+     * registered edges are then wrapped into {@link ExecutionEdge}s and handed to
+     * {@link PipelineGenerator#generatePipelines(List)}.
+     *
+     * @return the execution plan holding the generated pipelines
+     * @throws IllegalArgumentException if no logical vertex map is available
+     */
+    public ExecutionPlan generate() {
+
+        if (logicalVertexes == null) {
+            throw new IllegalArgumentException("ExecutionPlan Builder must have LogicalPlan and Action");
+        }
+
+        // Seed the traversal with all source vertices.
+        List<LogicalVertex> frontier = new ArrayList<>();
+        for (LogicalVertex candidate : logicalVertexes.values()) {
+            if (candidate.getAction() instanceof SourceAction) {
+                frontier.add(candidate);
+            }
+        }
+        while (!frontier.isEmpty()) {
+            List<LogicalVertex> following = new ArrayList<>();
+            for (LogicalVertex current : frontier) {
+                following.addAll(convertExecutionActions(logicalEdges, current.getAction()));
+            }
+            frontier = following;
+        }
+
+        // Re-wrap every registered action, reusing the parallelism of the logical vertex with the same id.
+        Map<Integer, LogicalVertex> vertexes = new HashMap<>();
+        for (Map.Entry<Integer, Action> registered : actions.entrySet()) {
+            Integer id = registered.getKey();
+            vertexes.put(id, new LogicalVertex(id, registered.getValue(),
+                logicalVertexes.get(id).getParallelism()));
+        }
+
+        List<ExecutionEdge> executionEdges = new ArrayList<>();
+        for (Map.Entry<Integer, List<Integer>> outgoing : edgeMap.entrySet()) {
+            for (Integer targetId : outgoing.getValue()) {
+                executionEdges.add(new ExecutionEdge(convertFromLogical(vertexes.get(outgoing.getKey())),
+                    convertFromLogical(vertexes.get(targetId))));
+            }
+        }
+        return new ExecutionPlan(PipelineGenerator.generatePipelines(executionEdges));
+    }
+
+    /**
+     * Creates an {@link ExecutionVertex} carrying the id, action and parallelism of the given
+     * logical vertex.
+     */
+    private ExecutionVertex convertFromLogical(LogicalVertex vertex) {
+        final Action action = vertex.getAction();
+        final int parallelism = vertex.getParallelism();
+        return new ExecutionVertex(vertex.getVertexId(), action, parallelism);
+    }
+
+    /**
+     * Converts the logical action {@code start} into an execution action, registers it in
+     * {@code actions} and records its outgoing edges in {@code edgeMap}.
+     *
+     * <ul>
+     *   <li>A {@link PartitionTransformAction} is registered unchanged.</li>
+     *   <li>A {@link SinkAction} is registered unchanged and has no successors.</li>
+     *   <li>A {@link SourceAction} or {@link TransformAction} absorbs the transform chain found
+     *       by {@code findMigrateTransform} and is registered as a {@link PhysicalSourceAction}
+     *       or {@link TransformChainAction} under the id of {@code start}.</li>
+     * </ul>
+     *
+     * @param logicalEdges all edges of the logical DAG
+     * @param start        the action to convert
+     * @return the logical vertices directly after the last action merged into the result
+     * @throws UnknownActionException if {@code start} is none of the supported action kinds
+     */
+    private List<LogicalVertex> convertExecutionActions(List<LogicalEdge> logicalEdges, Action start) {
+
+        Action end = start;
+        Action executionAction;
+        if (start instanceof PartitionTransformAction) {
+            executionAction = start;
+            actions.put(start.getId(), start);
+        } else if (start instanceof SinkAction) {
+            actions.put(start.getId(), start);
+            return Collections.emptyList();
+        } else {
+            List<SeaTunnelTransform> transforms = new ArrayList<>();
+            Set<URL> jars = new HashSet<>();
+            for (TransformAction t : findMigrateTransform(logicalEdges, start)) {
+                transforms.add(t.getTransform());
+                jars.addAll(t.getJarUrls());
+                // the chain's last transform is where the outgoing edges are looked up
+                end = t;
+            }
+            if (start instanceof SourceAction) {
+                SourceAction<?, ?, ?> source = (SourceAction<?, ?, ?>) start;
+                jars.addAll(source.getJarUrls());
+                executionAction = new PhysicalSourceAction<>(start.getId(), start.getName(),
+                    source.getSource(), new ArrayList<>(jars), transforms);
+                // Register the merged physical action, not the logical source: otherwise the
+                // absorbed transforms are dropped from the plan.
+                actions.put(start.getId(), executionAction);
+            } else if (start instanceof TransformAction) {
+                TransformAction transform = (TransformAction) start;
+                jars.addAll(transform.getJarUrls());
+                // the start transform runs before the transforms merged behind it
+                transforms.add(0, transform.getTransform());
+                executionAction = new TransformChainAction(start.getId(), start.getName(),
+                    new ArrayList<>(jars), transforms);
+                actions.put(start.getId(), executionAction);
+            } else {
+                throw new UnknownActionException(start);
+            }
+        }
+
+        final Action e = end;
+        // find next should be converted action
+        List<LogicalVertex> nextStarts = logicalEdges.stream().filter(edge -> edge.getLeftVertex().getAction().equals(e))
+            .map(LogicalEdge::getRightVertex).collect(Collectors.toList());
+        for (LogicalVertex n : nextStarts) {
+            edgeMap.computeIfAbsent(executionAction.getId(), key -> new ArrayList<>())
+                .add(n.getAction().getId());
+        }
+        return nextStarts;
+    }
+
+    /**
+     * Collects the chain of transforms directly behind {@code start} that can be merged into it.
+     *
+     * <p>The chain is followed as long as the current action has exactly one successor and
+     * that successor is neither a {@link PartitionTransformAction} nor a {@link SinkAction}.
+     *
+     * @param executionEdges the edges to search for successors
+     * @param start          the action whose successors are inspected
+     * @return the mergeable transforms in chain order; empty if there is nothing to merge
+     */
+    private List<TransformAction> findMigrateTransform(List<LogicalEdge> executionEdges, Action start) {
+        List<Action> successors = new ArrayList<>();
+        for (LogicalEdge edge : executionEdges) {
+            if (edge.getLeftVertex().getAction().equals(start)) {
+                successors.add(edge.getRightVertex().getAction());
+            }
+        }
+        // make sure the start's next only have one LogicalTransform, so it can be migrated.
+        if (successors.size() != 1) {
+            return Collections.emptyList();
+        }
+        Action successor = successors.get(0);
+        if (successor instanceof PartitionTransformAction || successor instanceof SinkAction) {
+            return Collections.emptyList();
+        }
+        List<TransformAction> chain = new ArrayList<>();
+        chain.add((TransformAction) successor);
+        chain.addAll(findMigrateTransform(executionEdges, successor));
+        return chain;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
similarity index 79%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
index fdf88cde7..f39bb2d8e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
+
+import org.apache.seatunnel.engine.core.dag.actions.Action;
import lombok.AllArgsConstructor;
import lombok.Data;
-import java.io.Serializable;
-import java.util.Collection;
-
@Data
@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+public class ExecutionVertex {
+ private Integer vertexId;
+ private Action action;
+ private int parallelism;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index 688f0476b..b7def66ad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -15,32 +15,27 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.execution;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import java.util.List;
+import java.util.Map;
-import java.io.Serializable;
+public class Pipeline {
+ private final List<ExecutionEdge> edges;
-public interface Task extends Serializable {
+ private final Map<Integer, ExecutionVertex> vertexes;
- default void init() {
+ Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
+ this.edges = edges;
+ this.vertexes = vertexes;
}
- @NonNull
- ProgressState call();
-
- @NonNull
- Long getTaskID();
-
- default boolean isCooperative() {
- return false;
- }
-
- default void close() {
+ public List<ExecutionEdge> getEdges() {
+ return edges;
}
- default void setOperationService(OperationService operationService) {
+ public Map<Integer, ExecutionVertex> getVertexes() {
+ return vertexes;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
new file mode 100644
index 000000000..1f8dab913
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.execution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PipelineGenerator {
+
+    /**
+     * Groups the given edges into connected components and turns each component into a
+     * {@link Pipeline}.
+     *
+     * <p>Within one pipeline every vertex id maps to a single {@link ExecutionVertex} instance
+     * (the first one seen for that id), and the pipeline's edges are rebuilt on top of those
+     * shared instances. Note that the passed list is emptied by the splitting step.
+     *
+     * @param edges all execution edges of the plan
+     * @return one pipeline per group of connected edges
+     */
+    public static List<Pipeline> generatePipelines(List<ExecutionEdge> edges) {
+
+        // Split into multiple unrelated pipelines
+        List<List<ExecutionEdge>> groups = splitUnrelatedEdges(expandEdgeByParallelism(edges));
+
+        // For now the execution plan is converted to pipelines as is. In the future it should
+        // be split into multiple pipelines with cache.
+        List<Pipeline> pipelines = new ArrayList<>();
+        for (List<ExecutionEdge> group : groups) {
+            Map<Integer, ExecutionVertex> vertexes = new HashMap<>();
+            List<ExecutionEdge> pipelineEdges = new ArrayList<>();
+            for (ExecutionEdge edge : group) {
+                Integer leftId = edge.getLeftVertexId();
+                if (!vertexes.containsKey(leftId)) {
+                    vertexes.put(leftId, edge.getLeftVertex());
+                }
+                Integer rightId = edge.getRightVertexId();
+                if (!vertexes.containsKey(rightId)) {
+                    vertexes.put(rightId, edge.getRightVertex());
+                }
+                pipelineEdges.add(new ExecutionEdge(vertexes.get(leftId), vertexes.get(rightId)));
+            }
+            pipelines.add(new Pipeline(pipelineEdges, vertexes));
+        }
+        return pipelines;
+    }
+
+    /**
+     * Placeholder for parallelism-based expansion; currently returns {@code edges} unchanged.
+     *
+     * <p>TODO: use the SupportCoordinate interface to determine whether the pipeline needs to
+     * be split. Pipelines without coordinator support can be split into multiple pipelines
+     * that do not interfere with each other.
+     */
+    private static List<ExecutionEdge> expandEdgeByParallelism(List<ExecutionEdge> edges) {
+        final List<ExecutionEdge> unchanged = edges;
+        return unchanged;
+    }
+
+    /**
+     * Partitions {@code edges} into groups of edges that are connected to each other.
+     *
+     * <p>The input list is consumed: each round removes one whole group from it, starting from
+     * the left vertex of the first remaining edge, until the list is empty.
+     */
+    private static List<List<ExecutionEdge>> splitUnrelatedEdges(List<ExecutionEdge> edges) {
+        List<List<ExecutionEdge>> groups = new ArrayList<>();
+        while (!edges.isEmpty()) {
+            ExecutionVertex seed = edges.get(0).getLeftVertex();
+            List<ExecutionEdge> group = findVertexRelatedEdge(edges, seed);
+            groups.add(group);
+        }
+        return groups;
+    }
+
+    /**
+     * Removes from {@code edges} every edge reachable from {@code vertex} (following edges in
+     * both directions) and returns the removed edges.
+     *
+     * @param edges  the remaining edges; edges found to be related are removed from this list
+     * @param vertex the vertex to start from
+     * @return the edges touching {@code vertex}, followed by those found recursively through
+     *         its neighbours
+     */
+    private static List<ExecutionEdge> findVertexRelatedEdge(List<ExecutionEdge> edges, ExecutionVertex vertex) {
+
+        List<ExecutionEdge> outgoing = new ArrayList<>();
+        List<ExecutionEdge> incoming = new ArrayList<>();
+        for (ExecutionEdge edge : edges) {
+            if (edge.getLeftVertex().equals(vertex)) {
+                outgoing.add(edge);
+            }
+            if (edge.getRightVertex().equals(vertex)) {
+                incoming.add(edge);
+            }
+        }
+
+        List<ExecutionEdge> related = new ArrayList<>(outgoing);
+        related.addAll(incoming);
+
+        // Neighbours: targets of the outgoing edges first, then origins of the incoming ones.
+        List<ExecutionVertex> neighbours = new ArrayList<>();
+        for (ExecutionEdge edge : outgoing) {
+            neighbours.add(edge.getRightVertex());
+        }
+        for (ExecutionEdge edge : incoming) {
+            neighbours.add(edge.getLeftVertex());
+        }
+
+        // Drop the edges already claimed before recursing so they are not visited again.
+        edges.removeAll(related);
+
+        for (ExecutionVertex neighbour : neighbours) {
+            related.addAll(findVertexRelatedEdge(edges, neighbour));
+        }
+
+        return related;
+
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
new file mode 100644
index 000000000..af5a9d7c1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+
+import java.util.List;
+
+/**
+ * Result of physical planning: a list of {@link SubPlan}s, each holding the task groups and the
+ * coordinator task groups generated for one pipeline.
+ */
+public class PhysicalPlan {
+
+    /** The sub plans in the order they were handed to the constructor. */
+    private final List<SubPlan> plans;
+
+    /**
+     * @param plans the sub plans that make up this physical plan
+     */
+    public PhysicalPlan(List<SubPlan> plans) {
+        this.plans = plans;
+    }
+
+    /**
+     * @return the list passed to the constructor (not copied)
+     */
+    public List<SubPlan> getPlans() {
+        return this.plans;
+    }
+
+    /**
+     * Pairs the regular task groups with the coordinator task groups of one sub plan.
+     */
+    public static class SubPlan {
+
+        /** Regular task groups. */
+        private final List<TaskGroupInfo> tasks;
+
+        /** Coordinator task groups. */
+        private final List<TaskGroupInfo> coordinatorTasks;
+
+        /**
+         * @param tasks            the regular task groups
+         * @param coordinatorTasks the coordinator task groups
+         */
+        public SubPlan(List<TaskGroupInfo> tasks, List<TaskGroupInfo> coordinatorTasks) {
+            this.coordinatorTasks = coordinatorTasks;
+            this.tasks = tasks;
+        }
+
+        /**
+         * @return the regular task groups (the list passed to the constructor)
+         */
+        public List<TaskGroupInfo> getTasks() {
+            return this.tasks;
+        }
+
+        /**
+         * @return the coordinator task groups (the list passed to the constructor)
+         */
+        public List<TaskGroupInfo> getCoordinatorTasks() {
+            return this.coordinatorTasks;
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
new file mode 100644
index 000000000..1bb0652c1
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.dag.physical;
+
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
+import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.internal.IntermediateDataQueue;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
+import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
+import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
+import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.spi.impl.NodeEngine;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class PhysicalPlanGenerator {
+
+ private final List<List<ExecutionEdge>> edgesList;
+
+ private final NodeEngine nodeEngine;
+
+ private final IdGenerator idGenerator = new IdGenerator();
+
+    /**
+     * Creates a generator for the given execution plan.
+     *
+     * @param executionPlan the plan whose pipelines are converted; only the edge list of each
+     *                      pipeline is kept
+     * @param nodeEngine    used to obtain the serialization service for task groups
+     */
+    public PhysicalPlanGenerator(ExecutionPlan executionPlan, NodeEngine nodeEngine) {
+        List<List<ExecutionEdge>> pipelineEdges = new ArrayList<>();
+        for (Pipeline pipeline : executionPlan.getPipelines()) {
+            pipelineEdges.add(pipeline.getEdges());
+        }
+        this.edgesList = pipelineEdges;
+        this.nodeEngine = nodeEngine;
+    }
+
+    /**
+     * Generates the physical plan: one {@link PhysicalPlan.SubPlan} per pipeline edge list.
+     *
+     * <p>For each pipeline the enumerator tasks are created first, then the source tasks, the
+     * partition tasks and finally the committer tasks. Enumerator and committer tasks form
+     * the coordinator tasks of the sub plan; source and partition tasks form its regular tasks.
+     *
+     * @return the generated physical plan
+     */
+    public PhysicalPlan generate() {
+
+        // TODO Determine which tasks do not need to be restored according to state
+        List<PhysicalPlan.SubPlan> subPlans = new ArrayList<>();
+        for (List<ExecutionEdge> edges : edgesList) {
+            List<PhysicalSourceAction<?, ?, ?>> sources = findSourceAction(edges);
+
+            // The creation order below fixes the order in which task ids are drawn.
+            List<TaskGroupInfo> coordinatorTasks = getEnumeratorTask(sources);
+            List<TaskGroupInfo> tasks = getSourceTask(edges, sources);
+            tasks.addAll(getPartitionTask(edges));
+            coordinatorTasks.addAll(getCommitterTask(edges));
+
+            subPlans.add(new PhysicalPlan.SubPlan(tasks, coordinatorTasks));
+        }
+        return new PhysicalPlan(subPlans);
+    }
+
+    /**
+     * Returns the {@link PhysicalSourceAction}s that appear as the left vertex of an edge.
+     *
+     * <p>A source that feeds several edges is reported only once, so that callers create a
+     * single set of tasks for it.
+     *
+     * @param edges the edges of one pipeline
+     * @return the distinct source actions in encounter order
+     */
+    private List<PhysicalSourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
+        return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PhysicalSourceAction)
+            .map(s -> (PhysicalSourceAction<?, ?, ?>) s.getLeftVertex().getAction())
+            // one source with several outgoing edges must not be returned once per edge
+            .distinct()
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates one {@link SinkAggregatedCommitterTask}, wrapped in its own serialized
+     * {@link TaskGroup}, for every edge whose right vertex holds a {@link SinkAction}.
+     *
+     * @param edges the edges of one pipeline
+     * @return the task group infos of the created committer tasks
+     */
+    private List<TaskGroupInfo> getCommitterTask(List<ExecutionEdge> edges) {
+        List<TaskGroupInfo> committers = new ArrayList<>();
+        for (ExecutionEdge edge : edges) {
+            if (!(edge.getRightVertex().getAction() instanceof SinkAction)) {
+                continue;
+            }
+            SinkAction<?, ?, ?, ?> sink = (SinkAction<?, ?, ?, ?>) edge.getRightVertex().getAction();
+            SinkAggregatedCommitterTask committer =
+                new SinkAggregatedCommitterTask(idGenerator.getNextId(), sink);
+            committers.add(new TaskGroupInfo(toData(new TaskGroup(committer)), committer.getJarsUrl()));
+        }
+        return committers;
+    }
+
+    /**
+     * Creates the tasks for every edge whose left vertex holds a
+     * {@link PartitionTransformAction}: the action is wrapped in a flow together with its
+     * successors, and one single-task group is produced per unit of the action's parallelism.
+     *
+     * @param edges the edges of one pipeline
+     * @return the task group infos of the created partition tasks
+     */
+    private List<TaskGroupInfo> getPartitionTask(List<ExecutionEdge> edges) {
+        List<TaskGroupInfo> partitionTasks = new ArrayList<>();
+        for (ExecutionEdge edge : edges) {
+            if (!(edge.getLeftVertex().getAction() instanceof PartitionTransformAction)) {
+                continue;
+            }
+            PartitionTransformAction partition = (PartitionTransformAction) edge.getLeftVertex().getAction();
+            PhysicalExecutionFlow flow = new PhysicalExecutionFlow(partition, getNextWrapper(edges, partition));
+            for (int index = 0; index < flow.getAction().getParallelism(); index++) {
+                SeaTunnelTask task = new SeaTunnelTask(idGenerator.getNextId(), flow);
+                partitionTasks.add(new TaskGroupInfo(toData(new TaskGroup(task)), task.getJarsUrl()));
+            }
+        }
+        return partitionTasks;
+    }
+
+    /**
+     * Creates one {@link SourceSplitEnumeratorTask}, wrapped in its own serialized
+     * {@link TaskGroup}, for every given source action.
+     *
+     * @param sources the source actions of one pipeline
+     * @return the task group infos of the created enumerator tasks, in source order
+     */
+    private List<TaskGroupInfo> getEnumeratorTask(List<PhysicalSourceAction<?, ?, ?>> sources) {
+        List<TaskGroupInfo> enumerators = new ArrayList<>();
+        for (PhysicalSourceAction<?, ?, ?> source : sources) {
+            SourceSplitEnumeratorTask<?> enumerator =
+                new SourceSplitEnumeratorTask<>(idGenerator.getNextId(), source);
+            enumerators.add(new TaskGroupInfo(toData(new TaskGroup(enumerator)), enumerator.getJarsUrl()));
+        }
+        return enumerators;
+    }
+
+    /**
+     * Creates the task groups that run the given sources.
+     *
+     * <p>Each source is wrapped in a flow together with its successors. If that flow contains
+     * a sink, the sink parts are split off into additional flows. For every unit of the
+     * source's parallelism one task group is created that holds one task per flow and the
+     * union of the jars of those tasks.
+     *
+     * @param edges   the edges of one pipeline
+     * @param sources the source actions of that pipeline
+     * @return the task group infos of the created source task groups
+     */
+    private List<TaskGroupInfo> getSourceTask(List<ExecutionEdge> edges,
+                                              List<PhysicalSourceAction<?, ?, ?>> sources) {
+        List<TaskGroupInfo> sourceTasks = new ArrayList<>();
+        for (PhysicalSourceAction<?, ?, ?> source : sources) {
+            PhysicalExecutionFlow flow = new PhysicalExecutionFlow(source, getNextWrapper(edges, source));
+            List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
+            if (sourceWithSink(flow)) {
+                flows.addAll(splitSinkFromFlow(flow));
+            }
+            for (int index = 0; index < flow.getAction().getParallelism(); index++) {
+                List<SeaTunnelTask> members = new ArrayList<>();
+                for (Flow part : flows) {
+                    members.add(new SeaTunnelTask(idGenerator.getNextId(), part));
+                }
+                Set<URL> jars =
+                    members.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
+                List<Task> grouped = members.stream().map(task -> (Task) task).collect(Collectors.toList());
+                sourceTasks.add(new TaskGroupInfo(toData(new TaskGroup(grouped)), jars));
+            }
+        }
+        return sourceTasks;
+    }
+
+    /**
+     * Detaches the sink flows directly behind {@code flow} and reconnects each of them through
+     * an {@link IntermediateDataQueue}.
+     *
+     * <p>For every detached sink, {@code flow} gets a new successor that refers to the queue,
+     * and a second flow referring to the same queue is created with the sink as its only
+     * successor. These second flows are returned. If {@code flow} had non-sink successors,
+     * the method recurses into all of its current successors.
+     *
+     * @param flow the flow whose successors are rewritten in place; its successor list must
+     *             be modifiable whenever it contains a sink
+     * @return the newly created flows that lead from a queue to a sink
+     */
+    private static List<Flow> splitSinkFromFlow(Flow flow) {
+        List<PhysicalExecutionFlow> sinkFlows =
+            flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow) f)
+                .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
+        List<Flow> allFlows = new ArrayList<>();
+        flow.getNext().removeAll(sinkFlows);
+        sinkFlows.forEach(s -> {
+            IntermediateDataQueue queue = new IntermediateDataQueue(s.getAction().getId(),
+                s.getAction().getName() + "-Queue", s.getAction().getParallelism());
+            // upstream side: the original flow now ends in the queue
+            flow.getNext().add(new IntermediateExecutionFlow(queue));
+            // Downstream side: queue -> sink. The single-argument constructor would install
+            // an immutable empty successor list, so the successor list is passed explicitly.
+            List<Flow> sinkNext = new ArrayList<>();
+            sinkNext.add(s);
+            allFlows.add(new IntermediateExecutionFlow(queue, sinkNext));
+        });
+
+        // One queue flow was added per removed sink, so a larger size means non-sink successors exist.
+        if (flow.getNext().size() > sinkFlows.size()) {
+            allFlows.addAll(flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream()).collect(Collectors.toList()));
+        }
+        return allFlows;
+    }
+
+    /**
+     * Tells whether {@code flow} itself, or any flow reachable through its successors, wraps a
+     * {@link SinkAction}. Every successor is cast to {@link PhysicalExecutionFlow}.
+     */
+    private static boolean sourceWithSink(PhysicalExecutionFlow flow) {
+        if (flow.getAction() instanceof SinkAction) {
+            return true;
+        }
+        boolean sinkFound = false;
+        // every successor is visited, even after a sink has been found
+        for (Flow successor : flow.getNext()) {
+            if (sourceWithSink((PhysicalExecutionFlow) successor)) {
+                sinkFound = true;
+            }
+        }
+        return sinkFound;
+    }
+
+    /**
+     * Builds the flows for the direct successors of {@code start}.
+     *
+     * <p>A successor that is a {@link PartitionTransformAction} or {@link SinkAction} ends the
+     * chain and is wrapped without successors of its own. Any other successor is wrapped
+     * together with its own successors, resolved recursively.
+     *
+     * @param edges the edges of one pipeline
+     * @param start the action whose successors are wrapped
+     * @return the chain-ending flows first, followed by the recursively expanded ones
+     */
+    private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
+        // Follow each edge leaving start to its RIGHT vertex. Taking the left vertex would
+        // yield start itself again and recurse without end.
+        List<Action> successors = edges.stream().filter(e -> e.getLeftVertex().getAction().equals(start))
+            .map(e -> e.getRightVertex().getAction()).collect(Collectors.toList());
+        List<Flow> wrappers = successors.stream()
+            .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
+            .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+        wrappers.addAll(successors.stream()
+            .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
+            .map(a -> new PhysicalExecutionFlow(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
+        return wrappers;
+    }
+
+    /**
+     * Converts {@code object} to {@link Data} using the serialization service of the node engine.
+     */
+    private Data toData(Object object) {
+        final NodeEngine engine = this.nodeEngine;
+        return engine.getSerializationService().toData(object);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
similarity index 62%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
index e05cbfa4d..fc8284877 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
@@ -15,19 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.common.utils;
+package org.apache.seatunnel.engine.server.dag.physical;
-import java.io.Serializable;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
-/**
- * It is used to generate the ID of each vertex in DAG. We just need to ensure that the id of all Vertices in a DAG are
- * unique.
- */
-public class IdGenerator implements Serializable {
- private int id = 0;
+import com.hazelcast.spi.impl.NodeEngine;
+
+public class PhysicalPlanUtils {
- public int getNextId() {
- id++;
- return id;
+ public static PhysicalPlan fromLogicalDAG(LogicalDag logicalDag, NodeEngine nodeEngine) {
+ return new PhysicalPlanGenerator(new ExecutionPlanGenerator(logicalDag).generate(), nodeEngine).generate();
}
+
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
index fdf88cde7..b4dfd1ca9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.physical.flow;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import java.util.List;
-import java.io.Serializable;
-import java.util.Collection;
+public abstract class Flow {
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+ protected final List<Flow> next;
+
+ public Flow(List<Flow> next) {
+ this.next = next;
+ }
+
+ public List<Flow> getNext() {
+ return next;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
index 688f0476b..72906d0b1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
@@ -15,32 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.physical.flow;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.core.dag.internal.IntermediateDataQueue;
-import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
-public interface Task extends Serializable {
+public class IntermediateExecutionFlow extends Flow {
- default void init() {
- }
-
- @NonNull
- ProgressState call();
+ private final IntermediateDataQueue queue;
- @NonNull
- Long getTaskID();
-
- default boolean isCooperative() {
- return false;
+ public IntermediateExecutionFlow(IntermediateDataQueue queue) {
+ super(Collections.emptyList());
+ this.queue = queue;
}
- default void close() {
+ public IntermediateExecutionFlow(IntermediateDataQueue queue, List<Flow> next) {
+ super(next);
+ this.queue = queue;
}
- default void setOperationService(OperationService operationService) {
+ public IntermediateDataQueue getQueue() {
+ return queue;
}
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
index 688f0476b..d3edd1552 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
@@ -15,32 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.dag.physical.flow;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
-import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
-public interface Task extends Serializable {
+public class PhysicalExecutionFlow extends Flow {
- default void init() {
- }
-
- @NonNull
- ProgressState call();
+ private final Action action;
- @NonNull
- Long getTaskID();
-
- default boolean isCooperative() {
- return false;
+ public PhysicalExecutionFlow(Action action, List<Flow> next) {
+ super(next);
+ this.action = action;
}
- default void close() {
+ public PhysicalExecutionFlow(Action action) {
+ super(Collections.emptyList());
+ this.action = action;
}
- default void setOperationService(OperationService operationService) {
+ public Action getAction() {
+ return action;
}
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 688f0476b..9025facce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -17,14 +17,18 @@
package org.apache.seatunnel.engine.server.execution;
+import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.OperationService;
+import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.UUID;
public interface Task extends Serializable {
- default void init() {
+ default void init() throws Exception {
}
@NonNull
@@ -37,10 +41,20 @@ public interface Task extends Serializable {
return false;
}
- default void close() {
+ default void close() throws IOException {
}
default void setOperationService(OperationService operationService) {
}
+ default <E> InvocationFuture<E> sendToMaster(Operation operation) {
+ // TODO add method send operation to master
+ return null;
+ }
+
+ default <E> InvocationFuture<E> sendToMember(Operation operation, UUID memberID) {
+ // TODO add method send operation to member
+ return null;
+ }
+
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index fdf88cde7..2763942ee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -21,10 +21,15 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collection;
@Data
@AllArgsConstructor
public class TaskGroup implements Serializable {
private final Collection<Task> tasks;
+
+ public TaskGroup(Task... tasks) {
+ this.tasks = Arrays.asList(tasks);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
new file mode 100644
index 000000000..37542cd2d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.execution.ProgressState;
+import org.apache.seatunnel.engine.server.execution.Task;
+
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.OperationService;
+import lombok.NonNull;
+
+import java.io.IOException;
+
+/**
+ * {@link Task} that builds a {@link PhysicalPlan} from a {@link LogicalDag} in {@link #init()}.
+ */
+public class JobMaster implements Task {
+
+ private final LogicalDag logicalDag;
+ private PhysicalPlan physicalPlan;
+
+ // NOTE(review): nothing in this class assigns nodeEngine, so init() passes null to
+ // PhysicalPlanUtils.fromLogicalDAG — confirm how it is supposed to be injected.
+ private NodeEngine nodeEngine;
+
+ /** Creates a job master around a new, empty-constructed {@link LogicalDag}. */
+ public JobMaster() {
+ this.logicalDag = new LogicalDag();
+ }
+
+ /** Converts the logical DAG into the physical plan and stores it in {@code physicalPlan}. */
+ @Override
+ public void init() throws Exception {
+ physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag, nodeEngine);
+ }
+
+ /** Always reports {@link ProgressState#DONE}. */
+ @NonNull
+ @Override
+ public ProgressState call() {
+ return ProgressState.DONE;
+ }
+
+ // NOTE(review): returns null although the method is annotated @NonNull; a caller that
+ // unboxes the result will throw NullPointerException. Needs a real task ID.
+ @NonNull
+ @Override
+ public Long getTaskID() {
+ return null;
+ }
+
+ /** Delegates to the default {@link Task#close()}. */
+ @Override
+ public void close() throws IOException {
+ Task.super.close();
+ }
+
+ /** Delegates to the default {@link Task#setOperationService(OperationService)}. */
+ @Override
+ public void setOperationService(OperationService operationService) {
+ Task.super.setOperationService(operationService);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
new file mode 100644
index 000000000..a59721b78
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import org.apache.seatunnel.engine.server.task.operation.AssignSplitOperation;
+import org.apache.seatunnel.engine.server.task.operation.RegisterOperation;
+import org.apache.seatunnel.engine.server.task.operation.RequestSplitOperation;
+
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+/**
+ * {@link DataSerializerHook} that supplies the {@link DataSerializableFactory} for the
+ * task-related {@link IdentifiedDataSerializable} classes enumerated by the type ids below.
+ */
+public class TaskDataSerializerHook implements DataSerializerHook {
+
+ // Type ids understood by Factory#create; each id maps to one class in its switch.
+ public static final int REGISTER_TYPE = 1;
+
+ public static final int REQUEST_SPLIT_TYPE = 2;
+
+ public static final int ASSIGN_SPLIT_TYPE = 3;
+
+ public static final int TASK_GROUP_INFO_TYPE = 4;
+ // Factory id resolved through FactoryIdHelper from the SeaTunnel factory-id constants.
+ public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
+ SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
+ SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
+ );
+
+ @Override
+ public int getFactoryId() {
+ return FACTORY_ID;
+ }
+
+ @Override
+ public DataSerializableFactory createFactory() {
+ return new Factory();
+ }
+
+ /** Creates an empty instance of the class registered for a given type id. */
+ private static class Factory implements DataSerializableFactory {
+
+ /**
+ * @param typeId one of the {@code *_TYPE} constants of the enclosing class
+ * @return a new, unpopulated instance, or {@code null} when the type id is unknown
+ */
+ @Override
+ public IdentifiedDataSerializable create(int typeId) {
+ switch (typeId) {
+ case REGISTER_TYPE:
+ return new RegisterOperation();
+ case REQUEST_SPLIT_TYPE:
+ return new RequestSplitOperation();
+ case ASSIGN_SPLIT_TYPE:
+ return new AssignSplitOperation();
+ case TASK_GROUP_INFO_TYPE:
+ return new TaskGroupInfo();
+ default:
+ return null;
+ }
+ }
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 688f0476b..e15689c86 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -15,32 +15,46 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.engine.server.execution.ProgressState;
+import org.apache.seatunnel.engine.server.execution.Task;
import com.hazelcast.spi.impl.operationservice.OperationService;
import lombok.NonNull;
-import java.io.Serializable;
+import java.net.URL;
+import java.util.Set;
-public interface Task extends Serializable {
+public abstract class AbstractTask implements Task {
+ private static final long serialVersionUID = -2524701323779523718L;
- default void init() {
- }
+ protected OperationService operationService;
+ protected long taskID;
- @NonNull
- ProgressState call();
-
- @NonNull
- Long getTaskID();
+ protected Progress progress;
- default boolean isCooperative() {
- return false;
+ public AbstractTask(long taskID) {
+ this.taskID = taskID;
+ this.progress = new Progress();
}
- default void close() {
+ public abstract Set<URL> getJarsUrl();
+
+ @Override
+ public void setOperationService(OperationService operationService) {
+ this.operationService = operationService;
}
- default void setOperationService(OperationService operationService) {
+ @NonNull
+ @Override
+ public ProgressState call() {
+ return progress.toState();
}
+ @NonNull
+ @Override
+ public Long getTaskID() {
+ return taskID;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
index fdf88cde7..6fd8dbc5c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/CoordinatorTask.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+public abstract class CoordinatorTask extends AbstractTask {
-import java.io.Serializable;
-import java.util.Collection;
+ private static final long serialVersionUID = -3957168748281681077L;
-@Data
-@AllArgsConstructor
-public class TaskGroup implements Serializable {
- private final Collection<Task> tasks;
+ public CoordinatorTask(long taskID) {
+ super(taskID);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
similarity index 58%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
index 688f0476b..2ceb2ac25 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
@@ -15,32 +15,36 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.server.execution.ProgressState;
-import java.io.Serializable;
+public class Progress {
-public interface Task extends Serializable {
+ private boolean madeProgress;
+ private boolean isDone;
- default void init() {
+ public Progress() {
+ isDone = true;
+ madeProgress = false;
}
- @NonNull
- ProgressState call();
-
- @NonNull
- Long getTaskID();
+ public void start() {
+ isDone = false;
+ madeProgress = false;
+ }
- default boolean isCooperative() {
- return false;
+ public void makeProgress() {
+ isDone = false;
+ madeProgress = true;
}
- default void close() {
+ public void done() {
+ isDone = true;
}
- default void setOperationService(OperationService operationService) {
+ public ProgressState toState() {
+ return ProgressState.valueOf(madeProgress, isDone);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
new file mode 100644
index 000000000..319bc4ec6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSplitEnumeratorContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.engine.server.task.operation.AssignSplitOperation;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link SourceSplitEnumerator.Context} implementation that forwards split assignments
+ * through the owning {@link SourceSplitEnumeratorTask}.
+ *
+ * @param <SplitT> split type handled by the enumerator
+ */
+public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
+
+ private final int parallelism;
+
+ private final SourceSplitEnumeratorTask<SplitT> task;
+
+ /**
+ * @param parallelism value returned unchanged by {@link #currentParallelism()}
+ * @param task enumerator task used to look up reader members and send operations
+ */
+ public SeaTunnelSplitEnumeratorContext(int parallelism, SourceSplitEnumeratorTask<SplitT> task) {
+ this.parallelism = parallelism;
+ this.task = task;
+ }
+
+ @Override
+ public int currentParallelism() {
+ return parallelism;
+ }
+
+ // NOTE(review): not implemented yet — returns null rather than a (possibly empty) set.
+ @Override
+ public Set<Integer> registeredReaders() {
+ return null;
+ }
+
+ /**
+ * Wraps the splits in an {@link AssignSplitOperation} and passes it to
+ * {@code task.sendToMember} together with the member looked up for {@code subtaskId}.
+ */
+ // NOTE(review): getTaskMemberID is a plain map lookup, so the member is null for a
+ // subtask that was never registered — confirm callers guarantee registration first.
+ @Override
+ public void assignSplit(int subtaskId, List<SplitT> splits) {
+ task.sendToMember(new AssignSplitOperation<>(subtaskId, splits), task.getTaskMemberID(subtaskId));
+ }
+
+ // Currently a no-op: the body is empty.
+ @Override
+ public void signalNoMoreSplits(int subtask) {
+
+ }
+
+ // Currently a no-op: the body is empty.
+ @Override
+ public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
new file mode 100644
index 000000000..b5c381999
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
+import org.apache.seatunnel.engine.server.task.operation.RegisterOperation;
+import org.apache.seatunnel.engine.server.task.operation.RequestSplitOperation;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Task wrapping a single execution {@link Flow}; it can report the jar URLs needed by every
+ * action reachable from that flow.
+ */
+public class SeaTunnelTask extends AbstractTask {
+
+    private static final long serialVersionUID = 2604309561613784425L;
+    private final Flow executionFlow;
+
+    // TODO init memberID in task execution service
+    private UUID memberID = UUID.randomUUID();
+    private int enumeratorTaskID = -1;
+
+    /**
+     * @param taskID        identifier forwarded to {@link AbstractTask}
+     * @param executionFlow root of the flow tree handled by this task
+     */
+    public SeaTunnelTask(long taskID, Flow executionFlow) {
+        super(taskID);
+        // TODO add enumerator task ID
+        enumeratorTaskID = 1;
+        this.executionFlow = executionFlow;
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+
+    /** Hands a {@link RegisterOperation} to {@code sendToMaster}, only when the flow starts from a source. */
+    private void register() {
+        if (startFromSource()) {
+            sendToMaster(new RegisterOperation(taskID, enumeratorTaskID));
+        }
+    }
+
+    /** Hands a {@link RequestSplitOperation} for this task to {@code sendToMaster}. */
+    private void requestSplit() {
+        sendToMaster(new RequestSplitOperation(taskID, enumeratorTaskID));
+    }
+
+    /**
+     * @return {@code true} when the root flow is a {@link PhysicalExecutionFlow} whose action
+     *         is a {@link SourceAction}
+     */
+    private boolean startFromSource() {
+        if (executionFlow instanceof PhysicalExecutionFlow) {
+            return ((PhysicalExecutionFlow) executionFlow).getAction() instanceof SourceAction;
+        }
+        return false;
+    }
+
+    /**
+     * Walks the flow tree level by level and gathers the jar URLs of every
+     * {@link PhysicalExecutionFlow} action found on the way.
+     *
+     * @return the union of the jar URLs of all reachable actions
+     */
+    @Override
+    public Set<URL> getJarsUrl() {
+        Set<URL> urls = new HashSet<>();
+        List<Flow> now = Collections.singletonList(executionFlow);
+        while (!now.isEmpty()) {
+            // A fresh list per level is required: reusing one list makes `now` and `next`
+            // the same object, so clearing it would drop the level about to be visited
+            // and nothing below the root flow would ever be collected.
+            List<Flow> next = new ArrayList<>();
+            for (Flow flow : now) {
+                if (flow instanceof PhysicalExecutionFlow) {
+                    urls.addAll(((PhysicalExecutionFlow) flow).getAction().getJarUrls());
+                }
+                next.addAll(flow.getNext());
+            }
+            now = next;
+        }
+        return urls;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 688f0476b..d097f0bb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -15,32 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.execution;
+package org.apache.seatunnel.engine.server.task;
-import com.hazelcast.spi.impl.operationservice.OperationService;
-import lombok.NonNull;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
-import java.io.Serializable;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
-public interface Task extends Serializable {
+public class SinkAggregatedCommitterTask extends CoordinatorTask {
- default void init() {
- }
-
- @NonNull
- ProgressState call();
-
- @NonNull
- Long getTaskID();
+ private static final long serialVersionUID = 5906594537520393503L;
+ private final SinkAction<?, ?, ?, ?> sink;
- default boolean isCooperative() {
- return false;
+ public SinkAggregatedCommitterTask(long taskID, SinkAction<?, ?, ?, ?> sink) {
+ super(taskID);
+ this.sink = sink;
}
- default void close() {
+ @Override
+ public Set<URL> getJarsUrl() {
+ return new HashSet<>(sink.getJarUrls());
}
-
- default void setOperationService(OperationService operationService) {
- }
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
new file mode 100644
index 000000000..4605d52f9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.engine.core.dag.actions.PhysicalSourceAction;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Coordinator task that owns the {@link SourceSplitEnumerator} created from one
+ * {@link PhysicalSourceAction} and remembers which member each registered reader belongs to.
+ *
+ * @param <SplitT> split type produced by the source
+ */
+public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends CoordinatorTask {
+
+ private static final long serialVersionUID = -3713701594297977775L;
+
+ private final PhysicalSourceAction<?, SplitT, ?> source;
+ private SourceSplitEnumerator<?, ?> enumerator;
+ private SeaTunnelSplitEnumeratorContext<SplitT> context;
+ // Reader task id -> member UUID, filled by addTaskMemberMapping; created in init().
+ private Map<Integer, UUID> taskMemberMapping;
+
+ /**
+ * Builds the enumerator context, creates the enumerator from the source, allocates the
+ * reader/member mapping and finally opens the enumerator.
+ */
+ @Override
+ public void init() throws Exception {
+ context = new SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
+ enumerator = this.source.getSource().createEnumerator(context);
+ taskMemberMapping = new HashMap<>();
+ enumerator.open();
+ }
+
+ /** Closes the enumerator if {@link #init()} got far enough to create one. */
+ @Override
+ public void close() throws IOException {
+ if (enumerator != null) {
+ enumerator.close();
+ }
+ }
+
+ /**
+ * @param taskID identifier forwarded to {@link CoordinatorTask}
+ * @param source source action whose enumerator this task runs
+ */
+ public SourceSplitEnumeratorTask(long taskID, PhysicalSourceAction<?, SplitT, ?> source) {
+ super(taskID);
+ this.source = source;
+ }
+
+ // Records the reader's member, then registers the reader with the enumerator.
+ // NOTE(review): private and not called anywhere in this class yet.
+ private void receivedReader(int readerId, UUID memberID) {
+ this.addTaskMemberMapping(readerId, memberID);
+ enumerator.registerReader(readerId);
+ }
+
+ // Forwards a split request for the given task id to the enumerator.
+ // NOTE(review): private and not called anywhere in this class yet.
+ private void requestSplit(int taskID) {
+ enumerator.handleSplitRequest(taskID);
+ }
+
+ private void addTaskMemberMapping(int taskID, UUID memberID) {
+ taskMemberMapping.put(taskID, memberID);
+ }
+
+ /**
+ * @param taskID reader task id
+ * @return the member recorded for that task, or {@code null} when none was recorded
+ */
+ public UUID getTaskMemberID(int taskID) {
+ return taskMemberMapping.get(taskID);
+ }
+
+ /** @return a new set holding the jar URLs of the source action */
+ @Override
+ public Set<URL> getJarsUrl() {
+ return new HashSet<>(source.getJarUrls());
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
similarity index 58%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
index 09a61c116..e05ddcf9b 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
@@ -15,50 +15,61 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.server.task;
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import lombok.AllArgsConstructor;
-import lombok.Data;
import java.io.IOException;
+import java.net.URL;
+import java.util.Set;
-@Data
-@AllArgsConstructor
-public class LogicalVertex implements IdentifiedDataSerializable {
- private Integer vertexId;
- private Action action;
- private int parallelism;
+public class TaskGroupInfo implements IdentifiedDataSerializable {
- public LogicalVertex() {
+ private Data group;
+
+ private Set<URL> jars;
+
+ public TaskGroupInfo() {
+ }
+
+ public TaskGroupInfo(Data group, Set<URL> jars) {
+ this.group = group;
+ this.jars = jars;
+ }
+
+ public Data getGroup() {
+ return group;
+ }
+
+ public Set<URL> getJars() {
+ return jars;
}
@Override
public int getFactoryId() {
- return JobDataSerializerHook.FACTORY_ID;
+ // Must be the hook's factory id; the type id belongs in getClassId().
+ return TaskDataSerializerHook.FACTORY_ID;
}
@Override
public int getClassId() {
- return JobDataSerializerHook.LOGICAL_VERTEX;
+ // Type id that TaskDataSerializerHook's factory maps to TaskGroupInfo.
+ return TaskDataSerializerHook.TASK_GROUP_INFO_TYPE;
}
@Override
public void writeData(ObjectDataOutput out) throws IOException {
- out.writeInt(vertexId);
- out.writeObject(action);
- out.writeInt(parallelism);
+ out.writeObject(jars);
+ IOUtil.writeData(out, group);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
- vertexId = in.readInt();
- action = in.readObject();
- parallelism = in.readInt();
+ jars = in.readObject();
+ group = IOUtil.readData(in);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/AssignSplitOperation.java
similarity index 56%
rename from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/AssignSplitOperation.java
index 09a61c116..b45a42a0e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logicaldag/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/AssignSplitOperation.java
@@ -15,50 +15,49 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.dag.logicaldag;
+package org.apache.seatunnel.engine.server.task.operation;
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
+import java.util.List;
-@Data
-@AllArgsConstructor
-public class LogicalVertex implements IdentifiedDataSerializable {
- private Integer vertexId;
- private Action action;
- private int parallelism;
+public class AssignSplitOperation<SplitT> extends Operation implements IdentifiedDataSerializable {
- public LogicalVertex() {
+ private List<SplitT> splits;
+ private int taskID;
+
+ public AssignSplitOperation() {
+ }
+
+ /**
+ * @param taskID id written by writeInternal and restored by readInternal
+ * @param splits splits carried by this operation
+ */
+ public AssignSplitOperation(int taskID, List<SplitT> splits) {
+ // Store taskID too: it was previously dropped, so writeInternal always wrote 0.
+ this.taskID = taskID;
+ this.splits = splits;
}
@Override
- public int getFactoryId() {
- return JobDataSerializerHook.FACTORY_ID;
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ out.writeObject(splits);
+ out.writeInt(taskID);
}
@Override
- public int getClassId() {
- return JobDataSerializerHook.LOGICAL_VERTEX;
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ splits = in.readObject();
+ taskID = in.readInt();
}
@Override
- public void writeData(ObjectDataOutput out) throws IOException {
- out.writeInt(vertexId);
- out.writeObject(action);
- out.writeInt(parallelism);
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
}
@Override
- public void readData(ObjectDataInput in) throws IOException {
- vertexId = in.readInt();
- action = in.readObject();
- parallelism = in.readInt();
+ public int getClassId() {
+ return TaskDataSerializerHook.ASSIGN_SPLIT_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
new file mode 100644
index 000000000..5c26c3a39
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RegisterOperation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Operation sent on behalf of a {@link org.apache.seatunnel.api.source.SourceReader} so that it
+ * can register with the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}.
+ * It carries the task IDs of both sides; the registration step itself is still a TODO in
+ * {@link #run()}.
+ */
+public class RegisterOperation extends Operation implements IdentifiedDataSerializable {
+
+ // ID of the reader task that wants to register.
+ private long readerTaskID;
+ // ID of the enumerator task whose execution context is looked up in run().
+ private long enumeratorTaskID;
+
+ /**
+ * Creates an operation with both task IDs left at 0.
+ * Presumably used on the deserialization path, where {@link #readInternal} fills them in.
+ */
+ public RegisterOperation() {
+ }
+
+ /**
+ * @param readerTaskID ID of the registering reader task
+ * @param enumeratorTaskID ID of the enumerator task to register with
+ */
+ public RegisterOperation(long readerTaskID, long enumeratorTaskID) {
+ this.readerTaskID = readerTaskID;
+ this.enumeratorTaskID = enumeratorTaskID;
+ }
+
+ /**
+ * Resolves the {@link SeaTunnelServer} service, the caller's UUID and the execution context
+ * of the enumerator task. The values are not used yet: the actual registration is pending
+ * (see the TODO below).
+ */
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+ UUID readerUUID = getCallerUuid();
+ TaskExecutionContext executionContext =
+ server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+ // TODO register reader to enumerator
+ }
+
+ /** @return the service name of {@link SeaTunnelServer} */
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+
+ /**
+ * Writes the superclass state, then readerTaskID followed by enumeratorTaskID.
+ * The order must match {@link #readInternal}.
+ */
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeLong(readerTaskID);
+ out.writeLong(enumeratorTaskID);
+ }
+
+ /**
+ * Reads the superclass state, then readerTaskID followed by enumeratorTaskID,
+ * i.e. the same order {@link #writeInternal} wrote them.
+ */
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ readerTaskID = in.readLong();
+ enumeratorTaskID = in.readLong();
+ }
+
+ /** @return the factory ID declared by {@link TaskDataSerializerHook} */
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ /** @return the type ID {@code REGISTER_TYPE} declared by {@link TaskDataSerializerHook} */
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.REGISTER_TYPE;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RequestSplitOperation.java
new file mode 100644
index 000000000..b2860ae40
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/RequestSplitOperation.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+/**
+ * Operation carrying the ID of a requesting task and the ID of an enumerator task.
+ * According to the TODO in {@link #run()}, it is intended to ask the source split enumerator
+ * to return a split; that step is not implemented yet.
+ */
+public class RequestSplitOperation extends Operation implements IdentifiedDataSerializable {
+
+ // ID of the enumerator task whose execution context is looked up in run().
+ private long enumeratorTaskID;
+
+ // ID of the task issuing the request.
+ private long taskID;
+
+ /**
+ * Creates an operation with both task IDs left at 0.
+ * Presumably used on the deserialization path, where {@link #readInternal} fills them in.
+ */
+ public RequestSplitOperation() {
+ }
+
+ /**
+ * Note the parameter order: the requesting task's ID comes first, the enumerator's second.
+ *
+ * @param taskID ID of the task issuing the request
+ * @param enumeratorTaskID ID of the enumerator task
+ */
+ public RequestSplitOperation(long taskID, long enumeratorTaskID) {
+ this.enumeratorTaskID = enumeratorTaskID;
+ this.taskID = taskID;
+ }
+
+ /**
+ * Resolves the {@link SeaTunnelServer} service and looks up the execution context of the
+ * enumerator task. The lookup result is discarded for now; the split request is pending
+ * (see the TODO below).
+ */
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+ server.getTaskExecutionService().getExecutionContext(enumeratorTaskID);
+ // TODO ask source split enumerator return split
+ }
+
+ /** @return the service name of {@link SeaTunnelServer} */
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+
+ /**
+ * Writes the superclass state, then enumeratorTaskID followed by taskID.
+ * The order must match {@link #readInternal}.
+ */
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeLong(enumeratorTaskID);
+ out.writeLong(taskID);
+ }
+
+ /**
+ * Reads the superclass state, then enumeratorTaskID followed by taskID,
+ * i.e. the same order {@link #writeInternal} wrote them.
+ */
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ enumeratorTaskID = in.readLong();
+ taskID = in.readLong();
+ }
+
+ /** @return the factory ID declared by {@link TaskDataSerializerHook} */
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ /** @return the type ID {@code REQUEST_SPLIT_TYPE} declared by {@link TaskDataSerializerHook} */
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.REQUEST_SPLIT_TYPE;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
index ab0062e85..bf045cd23 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -16,3 +16,4 @@
#
org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook
+org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook
\ No newline at end of file