You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/22 03:49:55 UTC

[incubator-seatunnel] branch st-engine updated: [engine][pipeline] Generate the most pipelines (#2481)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 386e7e980 [engine][pipeline] Generate the most pipelines (#2481)
386e7e980 is described below

commit 386e7e98054916cb07218067ab7e62f4a052428c
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Mon Aug 22 11:49:51 2022 +0800

    [engine][pipeline] Generate the most pipelines (#2481)
    
    * [engine][pipeline] Generate the most pipelines
    
    Split vertices with in-degree or out-degree greater than 1
    
    * [Engine][PhysicalPlan] Add splitting of multi-input actions to avoid partition transform logic
    
    * [Engine][PhysicalPlan] Fix test bug
    
    Co-authored-by: Hisoka <fa...@qq.com>
---
 .../engine/client/LogicalDagGeneratorTest.java     |   2 +-
 .../core/dag/logical/LogicalDagGenerator.java      |  16 +--
 .../dag/execution/ExecutionPlanGenerator.java      |  10 +-
 .../server/dag/execution/PipelineGenerator.java    | 133 ++++++++++++++++++++-
 .../seatunnel/engine/server/dag/TaskTest.java      |   9 --
 5 files changed, 144 insertions(+), 26 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index e44328148..1984bbef5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -57,7 +57,7 @@ public class LogicalDagGeneratorTest {
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
-            "{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"LocalFile\",\"targetVertex\":\"FakeSource\"},{\"inputVertex\":\"LocalFile\",\"targetVertex\":\"FakeSource\"}]}";
+            "{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"},{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"}]}";
         Assert.assertEquals(result, logicalDagJson.toString());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index d5440cda7..9d7534a25 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -81,13 +81,13 @@ public class LogicalDagGenerator {
 
     private Set<LogicalEdge> createLogicalEdges() {
         return inputVerticesMap.entrySet()
-            .stream()
-            .map(entry -> entry.getValue()
-                    .stream()
-                    .map(targetId -> new LogicalEdge(logicalVertexMap.get(entry.getKey()),
-                        logicalVertexMap.get(targetId)))
-                    .collect(Collectors.toList()))
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet());
+                .stream()
+                .map(entry -> entry.getValue()
+                        .stream()
+                        .map(upstreamId -> new LogicalEdge(logicalVertexMap.get(upstreamId),
+                                logicalVertexMap.get(entry.getKey())))
+                        .collect(Collectors.toList()))
+                .flatMap(Collection::stream)
+                .collect(Collectors.toSet());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 17ab61dfb..b895f1c69 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -116,7 +116,8 @@ public class ExecutionPlanGenerator {
             }
         });
         List<ExecutionEdge> executionEdges = createExecutionEdges();
-        return new ExecutionPlan(PipelineGenerator.generatePipelines(executionEdges), jobImmutableInformation);
+        return new ExecutionPlan(new PipelineGenerator(executionVertexMap.values(), executionEdges)
+            .generatePipelines(), jobImmutableInformation);
     }
 
     public List<ExecutionEdge> createExecutionEdges() {
@@ -180,7 +181,7 @@ public class ExecutionPlanGenerator {
         logicalToExecutionMap.put(logicalVertex.getVertexId(), executionVertex.getVertexId());
     }
 
-    private static Action recreateAction(Action action, Long id) {
+    public static Action recreateAction(Action action, Long id) {
         Action newAction;
         if (action instanceof PartitionTransformAction) {
             newAction = new PartitionTransformAction(id,
@@ -199,6 +200,11 @@ public class ExecutionPlanGenerator {
                 action.getName(),
                 ((SourceAction<?, ?, ?>) action).getSource(),
                 action.getJarUrls());
+        } else if (action instanceof TransformChainAction) {
+            newAction = new TransformChainAction(id,
+                action.getName(),
+                action.getJarUrls(),
+                ((TransformChainAction<?>) action).getTransforms());
         } else {
             throw new UnknownActionException(action);
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 6f12352c4..8e1f426cd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -17,20 +17,59 @@
 
 package org.apache.seatunnel.engine.server.dag.execution;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 public class PipelineGenerator {
+    /**
+     * The action and vertex IDs need to be regenerated because the pipeline is split.
+     */
+    private final IdGenerator idGenerator = new IdGenerator();
+
+    /**
+     * key: the vertex id.
+     * <br> value: The input vertices of this vertex.
+     *
+     * <p>When chaining vertices, we need to query whether the vertex has multiple input vertices.</p>
+     */
+    private final Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
+
+    /**
+     * key: the vertex id.
+     * <br> value: The target vertices of this vertex.
+     *
+     * <p>When chaining vertices, we need to query whether the vertex has multiple target vertices.</p>
+     */
+    private final Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
+
+    private final Collection<ExecutionVertex> vertices;
+
+    private final List<ExecutionEdge> edges;
+
+    public PipelineGenerator(Collection<ExecutionVertex> vertices,
+                             List<ExecutionEdge> edges) {
+        this.vertices = vertices;
+        this.edges = edges;
+    }
 
-    public static List<Pipeline> generatePipelines(List<ExecutionEdge> edges) {
+    public List<Pipeline> generatePipelines() {
+        List<ExecutionEdge> executionEdges = expandEdgeByParallelism(edges);
 
         // Split into multiple unrelated pipelines
-        List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(expandEdgeByParallelism(edges));
+        List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(executionEdges);
+
+        edgesList = edgesList.stream().flatMap(e -> this.splitUnionEdge(e).stream()).collect(Collectors.toList());
 
         // just convert execution plan to pipeline at now. We should split it to multi pipeline with
         // cache in the future
@@ -62,6 +101,88 @@ public class PipelineGenerator {
         return edges;
     }
 
+    private List<List<ExecutionEdge>> splitUnionEdge(List<ExecutionEdge> edges) {
+        fillVerticesMap(edges);
+        if (checkCanSplit(edges)) {
+            List<ExecutionVertex> sourceVertices = getSourceVertices();
+            List<List<ExecutionEdge>> pipelines = new ArrayList<>();
+            sourceVertices.forEach(sourceVertex -> splitUnionVertex(pipelines, new ArrayList<>(), sourceVertex));
+            return pipelines;
+        } else {
+            return Collections.singletonList(edges);
+        }
+    }
+
+    /**
+     * If any of these edges targets a partition transform vertex, they can't be split.
+     */
+    private boolean checkCanSplit(List<ExecutionEdge> edges) {
+        return edges.stream().noneMatch(e -> e.getRightVertex().getAction() instanceof PartitionTransformAction)
+                && edges.stream().anyMatch(e -> inputVerticesMap.get(e.getRightVertexId()).size() > 1);
+    }
+
+    private void splitUnionVertex(List<List<ExecutionEdge>> pipelines, List<ExecutionVertex> pipeline,
+                                  ExecutionVertex currentVertex) {
+        pipeline.add(recreateVertex(currentVertex, pipeline.size() == 0 ?
+                currentVertex.getParallelism() :
+                pipeline.get(pipeline.size() - 1).getParallelism()));
+        List<ExecutionVertex> targetVertices = targetVerticesMap.get(currentVertex.getVertexId());
+        if (targetVertices == null || targetVertices.size() == 0) {
+            pipelines.add(createExecutionEdges(pipeline));
+            return;
+        }
+        for (int i = 0; i < targetVertices.size(); i++) {
+            if (i > 0) {
+                pipeline = recreatePipeline(pipeline);
+            }
+            splitUnionVertex(pipelines, pipeline, targetVertices.get(i));
+            pipeline.remove(pipeline.size() - 1);
+        }
+    }
+
+    private List<ExecutionEdge> createExecutionEdges(List<ExecutionVertex> pipeline) {
+        checkArgument(pipeline != null && pipeline.size() > 1);
+        List<ExecutionEdge> edges = new ArrayList<>(pipeline.size() - 1);
+        for (int i = 1; i < pipeline.size(); i++) {
+            edges.add(new ExecutionEdge(pipeline.get(i - 1), pipeline.get(i)));
+        }
+        return edges;
+    }
+
+    private List<ExecutionVertex> recreatePipeline(List<ExecutionVertex> pipeline) {
+        return pipeline.stream()
+            .map(vertex -> recreateVertex(vertex, vertex.getParallelism()))
+            .collect(Collectors.toList());
+    }
+
+    private ExecutionVertex recreateVertex(ExecutionVertex vertex, int parallelism) {
+        long id = idGenerator.getNextId();
+        Action action = vertex.getAction();
+        return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+    }
+
+    private void fillVerticesMap(List<ExecutionEdge> edges) {
+        inputVerticesMap.clear();
+        targetVerticesMap.clear();
+        edges.forEach(edge -> {
+            inputVerticesMap.computeIfAbsent(edge.getRightVertexId(), id -> new ArrayList<>())
+                .add(edge.getLeftVertex());
+            targetVerticesMap.computeIfAbsent(edge.getLeftVertexId(), id -> new ArrayList<>())
+                .add(edge.getRightVertex());
+        });
+    }
+
+    private List<ExecutionVertex> getSourceVertices() {
+        List<ExecutionVertex> sourceVertices = new ArrayList<>();
+        for (ExecutionVertex vertex : vertices) {
+            List<ExecutionVertex> inputVertices = inputVerticesMap.get(vertex.getVertexId());
+            if (inputVertices == null || inputVertices.size() == 0) {
+                sourceVertices.add(vertex);
+            }
+        }
+        return sourceVertices;
+    }
+
     private static List<List<ExecutionEdge>> splitUnrelatedEdges(List<ExecutionEdge> edges) {
 
         List<List<ExecutionEdge>> edgeList = new ArrayList<>();
@@ -74,21 +195,21 @@ public class PipelineGenerator {
     private static List<ExecutionEdge> findVertexRelatedEdge(List<ExecutionEdge> edges, ExecutionVertex vertex) {
 
         List<ExecutionEdge> sourceEdges = edges.stream().filter(edge -> edge.getLeftVertex().equals(vertex))
-                .collect(Collectors.toList());
+            .collect(Collectors.toList());
         List<ExecutionEdge> destinationEdges = edges.stream().filter(edge -> edge.getRightVertex().equals(vertex))
-                .collect(Collectors.toList());
+            .collect(Collectors.toList());
 
         List<ExecutionEdge> relatedEdges = new ArrayList<>(sourceEdges);
         relatedEdges.addAll(destinationEdges);
 
         List<ExecutionVertex> relatedActions =
-                sourceEdges.stream().map(ExecutionEdge::getRightVertex).collect(Collectors.toList());
+            sourceEdges.stream().map(ExecutionEdge::getRightVertex).collect(Collectors.toList());
         relatedActions.addAll(destinationEdges.stream().map(ExecutionEdge::getLeftVertex).collect(Collectors.toList()));
 
         edges.removeAll(relatedEdges);
 
         relatedEdges.addAll(relatedActions.stream()
-                .flatMap(d -> findVertexRelatedEdge(edges, d).stream()).collect(Collectors.toList()));
+            .flatMap(d -> findVertexRelatedEdge(edges, d).stream()).collect(Collectors.toList()));
 
         return relatedEdges;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index f01dcc218..5c78af388 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -86,13 +86,6 @@ public class TaskTest {
         fake.setParallelism(3);
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
 
-        FakeSource fakeSource2 = new FakeSource();
-        fakeSource2.setSeaTunnelContext(SeaTunnelContext.getContext());
-        Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource2,
-                Collections.singletonList(new URL("file:///fake.jar")));
-        fake2.setParallelism(3);
-        LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 3);
-
         ConsoleSink consoleSink = new ConsoleSink();
         consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
         Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
@@ -105,7 +98,6 @@ public class TaskTest {
         LogicalDag logicalDag = new LogicalDag();
         logicalDag.addLogicalVertex(fakeVertex);
         logicalDag.addLogicalVertex(consoleVertex);
-        logicalDag.addLogicalVertex(fake2Vertex);
         logicalDag.addEdge(edge);
 
         JobConfig config = new JobConfig();
@@ -144,7 +136,6 @@ public class TaskTest {
         LogicalDag logicalDag = new LogicalDag();
         logicalDag.addLogicalVertex(fakeVertex);
         logicalDag.addLogicalVertex(consoleVertex);
-        logicalDag.addLogicalVertex(fake2Vertex);
         logicalDag.addEdge(edge);
 
         JobConfig config = new JobConfig();