You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/04 16:45:05 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][PhysicalPlan] Recreate action with parallelism (#2638)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 7bc2ec540 [Engine][PhysicalPlan] Recreate action with parallelism (#2638)
7bc2ec540 is described below

commit 7bc2ec5405c6b534c4d31e7febffb19a2bf743e3
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Sep 5 00:45:00 2022 +0800

    [Engine][PhysicalPlan] Recreate action with parallelism (#2638)
    
    * [Engine][PhysicalPlan] Recreate action with parallelism
---
 .../engine/server/dag/execution/ExecutionPlanGenerator.java         | 6 +++---
 .../seatunnel/engine/server/dag/execution/PipelineGenerator.java    | 2 +-
 .../test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java  | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index b95b3f631..b32f53b08 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -157,7 +157,7 @@ public class ExecutionPlanGenerator {
         final long newId = idGenerator.getNextId();
         Action newAction;
         if (chainedVertices.size() < 1) {
-            newAction = recreateAction(logicalVertex.getAction(), newId);
+            newAction = recreateAction(logicalVertex.getAction(), newId, logicalVertex.getParallelism());
         } else {
             List<SeaTunnelTransform> transforms = new ArrayList<>(chainedVertices.size());
             List<String> names = new ArrayList<>(chainedVertices.size());
@@ -182,7 +182,7 @@ public class ExecutionPlanGenerator {
         logicalToExecutionMap.put(logicalVertex.getVertexId(), executionVertex.getVertexId());
     }
 
-    public static Action recreateAction(Action action, Long id) {
+    public static Action recreateAction(Action action, Long id, int parallelism) {
         Action newAction;
         if (action instanceof PartitionTransformAction) {
             newAction = new PartitionTransformAction(id,
@@ -209,7 +209,7 @@ public class ExecutionPlanGenerator {
         } else {
             throw new UnknownActionException(action);
         }
-        newAction.setParallelism(action.getParallelism());
+        newAction.setParallelism(parallelism);
         return newAction;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 8e1f426cd..bc5e271f2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -158,7 +158,7 @@ public class PipelineGenerator {
     private ExecutionVertex recreateVertex(ExecutionVertex vertex, int parallelism) {
         long id = idGenerator.getNextId();
         Action action = vertex.getAction();
-        return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+        return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id, parallelism), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
     }
 
     private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index fbced23a2..6baf2d98b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -126,6 +126,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
 
         Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
         Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
-        Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 1);
+        Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 2);
     }
 }