You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/22 03:49:55 UTC
[incubator-seatunnel] branch st-engine updated: [engine][pipeline] Generate the most pipelines (#2481)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 386e7e980 [engine][pipeline] Generate the most pipelines (#2481)
386e7e980 is described below
commit 386e7e98054916cb07218067ab7e62f4a052428c
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Mon Aug 22 11:49:51 2022 +0800
[engine][pipeline] Generate the most pipelines (#2481)
* [engine][pipeline] Generate the most pipelines
Split vertices with in-degree or out-degree greater than 1
* [Engine][PhysicalPlan] Split multi-input actions; skip splitting when a partition transform is present
* [Engine][PhysicalPlan] Fix test bug
Co-authored-by: Hisoka <fa...@qq.com>
---
.../engine/client/LogicalDagGeneratorTest.java | 2 +-
.../core/dag/logical/LogicalDagGenerator.java | 16 +--
.../dag/execution/ExecutionPlanGenerator.java | 10 +-
.../server/dag/execution/PipelineGenerator.java | 133 ++++++++++++++++++++-
.../seatunnel/engine/server/dag/TaskTest.java | 9 --
5 files changed, 144 insertions(+), 26 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index e44328148..1984bbef5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -57,7 +57,7 @@ public class LogicalDagGeneratorTest {
LogicalDag logicalDag = logicalDagGenerator.generate();
JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
String result =
- "{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"LocalFile\",\"targetVertex\":\"FakeSource\"},{\"inputVertex\":\"LocalFile\",\"targetVertex\":\"FakeSource\"}]}";
+ "{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"},{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"}]}";
Assert.assertEquals(result, logicalDagJson.toString());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index d5440cda7..9d7534a25 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -81,13 +81,13 @@ public class LogicalDagGenerator {
private Set<LogicalEdge> createLogicalEdges() {
return inputVerticesMap.entrySet()
- .stream()
- .map(entry -> entry.getValue()
- .stream()
- .map(targetId -> new LogicalEdge(logicalVertexMap.get(entry.getKey()),
- logicalVertexMap.get(targetId)))
- .collect(Collectors.toList()))
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
+ .stream()
+ .map(entry -> entry.getValue()
+ .stream()
+ .map(upstreamId -> new LogicalEdge(logicalVertexMap.get(upstreamId),
+ logicalVertexMap.get(entry.getKey())))
+ .collect(Collectors.toList()))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 17ab61dfb..b895f1c69 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -116,7 +116,8 @@ public class ExecutionPlanGenerator {
}
});
List<ExecutionEdge> executionEdges = createExecutionEdges();
- return new ExecutionPlan(PipelineGenerator.generatePipelines(executionEdges), jobImmutableInformation);
+ return new ExecutionPlan(new PipelineGenerator(executionVertexMap.values(), executionEdges)
+ .generatePipelines(), jobImmutableInformation);
}
public List<ExecutionEdge> createExecutionEdges() {
@@ -180,7 +181,7 @@ public class ExecutionPlanGenerator {
logicalToExecutionMap.put(logicalVertex.getVertexId(), executionVertex.getVertexId());
}
- private static Action recreateAction(Action action, Long id) {
+ public static Action recreateAction(Action action, Long id) {
Action newAction;
if (action instanceof PartitionTransformAction) {
newAction = new PartitionTransformAction(id,
@@ -199,6 +200,11 @@ public class ExecutionPlanGenerator {
action.getName(),
((SourceAction<?, ?, ?>) action).getSource(),
action.getJarUrls());
+ } else if (action instanceof TransformChainAction) {
+ newAction = new TransformChainAction(id,
+ action.getName(),
+ action.getJarUrls(),
+ ((TransformChainAction<?>) action).getTransforms());
} else {
throw new UnknownActionException(action);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 6f12352c4..8e1f426cd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -17,20 +17,59 @@
package org.apache.seatunnel.engine.server.dag.execution;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class PipelineGenerator {
+ /**
+ * Action and vertex IDs must be regenerated because the pipeline is split.
+ */
+ private final IdGenerator idGenerator = new IdGenerator();
+
+ /**
+ * key: the vertex id.
+ * <br> value: The input vertices of this vertex.
+ *
+ * <p>When chaining vertices, we need to check whether a vertex has multiple input vertices.</p>
+ */
+ private final Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
+
+ /**
+ * key: the vertex id.
+ * <br> value: The target vertices of this vertex.
+ *
+ * <p>When chaining vertices, we need to check whether a vertex has multiple target vertices.</p>
+ */
+ private final Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
+
+ private final Collection<ExecutionVertex> vertices;
+
+ private final List<ExecutionEdge> edges;
+
+ public PipelineGenerator(Collection<ExecutionVertex> vertices,
+ List<ExecutionEdge> edges) {
+ this.vertices = vertices;
+ this.edges = edges;
+ }
- public static List<Pipeline> generatePipelines(List<ExecutionEdge> edges) {
+ public List<Pipeline> generatePipelines() {
+ List<ExecutionEdge> executionEdges = expandEdgeByParallelism(edges);
// Split into multiple unrelated pipelines
- List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(expandEdgeByParallelism(edges));
+ List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(executionEdges);
+
+ edgesList = edgesList.stream().flatMap(e -> this.splitUnionEdge(e).stream()).collect(Collectors.toList());
// just convert execution plan to pipeline at now. We should split it to multi pipeline with
// cache in the future
@@ -62,6 +101,88 @@ public class PipelineGenerator {
return edges;
}
+ private List<List<ExecutionEdge>> splitUnionEdge(List<ExecutionEdge> edges) {
+ fillVerticesMap(edges);
+ if (checkCanSplit(edges)) {
+ List<ExecutionVertex> sourceVertices = getSourceVertices();
+ List<List<ExecutionEdge>> pipelines = new ArrayList<>();
+ sourceVertices.forEach(sourceVertex -> splitUnionVertex(pipelines, new ArrayList<>(), sourceVertex));
+ return pipelines;
+ } else {
+ return Collections.singletonList(edges);
+ }
+ }
+
+ /**
+ * Splitting is possible only when no edge targets a partition transform vertex and at least one vertex has more than one input.
+ */
+ private boolean checkCanSplit(List<ExecutionEdge> edges) {
+ return edges.stream().noneMatch(e -> e.getRightVertex().getAction() instanceof PartitionTransformAction)
+ && edges.stream().anyMatch(e -> inputVerticesMap.get(e.getRightVertexId()).size() > 1);
+ }
+
+ private void splitUnionVertex(List<List<ExecutionEdge>> pipelines, List<ExecutionVertex> pipeline,
+ ExecutionVertex currentVertex) {
+ pipeline.add(recreateVertex(currentVertex, pipeline.size() == 0 ?
+ currentVertex.getParallelism() :
+ pipeline.get(pipeline.size() - 1).getParallelism()));
+ List<ExecutionVertex> targetVertices = targetVerticesMap.get(currentVertex.getVertexId());
+ if (targetVertices == null || targetVertices.size() == 0) {
+ pipelines.add(createExecutionEdges(pipeline));
+ return;
+ }
+ for (int i = 0; i < targetVertices.size(); i++) {
+ if (i > 0) {
+ pipeline = recreatePipeline(pipeline);
+ }
+ splitUnionVertex(pipelines, pipeline, targetVertices.get(i));
+ pipeline.remove(pipeline.size() - 1);
+ }
+ }
+
+ private List<ExecutionEdge> createExecutionEdges(List<ExecutionVertex> pipeline) {
+ checkArgument(pipeline != null && pipeline.size() > 1);
+ List<ExecutionEdge> edges = new ArrayList<>(pipeline.size() - 1);
+ for (int i = 1; i < pipeline.size(); i++) {
+ edges.add(new ExecutionEdge(pipeline.get(i - 1), pipeline.get(i)));
+ }
+ return edges;
+ }
+
+ private List<ExecutionVertex> recreatePipeline(List<ExecutionVertex> pipeline) {
+ return pipeline.stream()
+ .map(vertex -> recreateVertex(vertex, vertex.getParallelism()))
+ .collect(Collectors.toList());
+ }
+
+ private ExecutionVertex recreateVertex(ExecutionVertex vertex, int parallelism) {
+ long id = idGenerator.getNextId();
+ Action action = vertex.getAction();
+ return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+ }
+
+ private void fillVerticesMap(List<ExecutionEdge> edges) {
+ inputVerticesMap.clear();
+ targetVerticesMap.clear();
+ edges.forEach(edge -> {
+ inputVerticesMap.computeIfAbsent(edge.getRightVertexId(), id -> new ArrayList<>())
+ .add(edge.getLeftVertex());
+ targetVerticesMap.computeIfAbsent(edge.getLeftVertexId(), id -> new ArrayList<>())
+ .add(edge.getRightVertex());
+ });
+ }
+
+ private List<ExecutionVertex> getSourceVertices() {
+ List<ExecutionVertex> sourceVertices = new ArrayList<>();
+ for (ExecutionVertex vertex : vertices) {
+ List<ExecutionVertex> inputVertices = inputVerticesMap.get(vertex.getVertexId());
+ if (inputVertices == null || inputVertices.size() == 0) {
+ sourceVertices.add(vertex);
+ }
+ }
+ return sourceVertices;
+ }
+
private static List<List<ExecutionEdge>> splitUnrelatedEdges(List<ExecutionEdge> edges) {
List<List<ExecutionEdge>> edgeList = new ArrayList<>();
@@ -74,21 +195,21 @@ public class PipelineGenerator {
private static List<ExecutionEdge> findVertexRelatedEdge(List<ExecutionEdge> edges, ExecutionVertex vertex) {
List<ExecutionEdge> sourceEdges = edges.stream().filter(edge -> edge.getLeftVertex().equals(vertex))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
List<ExecutionEdge> destinationEdges = edges.stream().filter(edge -> edge.getRightVertex().equals(vertex))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
List<ExecutionEdge> relatedEdges = new ArrayList<>(sourceEdges);
relatedEdges.addAll(destinationEdges);
List<ExecutionVertex> relatedActions =
- sourceEdges.stream().map(ExecutionEdge::getRightVertex).collect(Collectors.toList());
+ sourceEdges.stream().map(ExecutionEdge::getRightVertex).collect(Collectors.toList());
relatedActions.addAll(destinationEdges.stream().map(ExecutionEdge::getLeftVertex).collect(Collectors.toList()));
edges.removeAll(relatedEdges);
relatedEdges.addAll(relatedActions.stream()
- .flatMap(d -> findVertexRelatedEdge(edges, d).stream()).collect(Collectors.toList()));
+ .flatMap(d -> findVertexRelatedEdge(edges, d).stream()).collect(Collectors.toList()));
return relatedEdges;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index f01dcc218..5c78af388 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -86,13 +86,6 @@ public class TaskTest {
fake.setParallelism(3);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
- FakeSource fakeSource2 = new FakeSource();
- fakeSource2.setSeaTunnelContext(SeaTunnelContext.getContext());
- Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource2,
- Collections.singletonList(new URL("file:///fake.jar")));
- fake2.setParallelism(3);
- LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 3);
-
ConsoleSink consoleSink = new ConsoleSink();
consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
@@ -105,7 +98,6 @@ public class TaskTest {
LogicalDag logicalDag = new LogicalDag();
logicalDag.addLogicalVertex(fakeVertex);
logicalDag.addLogicalVertex(consoleVertex);
- logicalDag.addLogicalVertex(fake2Vertex);
logicalDag.addEdge(edge);
JobConfig config = new JobConfig();
@@ -144,7 +136,6 @@ public class TaskTest {
LogicalDag logicalDag = new LogicalDag();
logicalDag.addLogicalVertex(fakeVertex);
logicalDag.addLogicalVertex(consoleVertex);
- logicalDag.addLogicalVertex(fake2Vertex);
logicalDag.addEdge(edge);
JobConfig config = new JobConfig();