You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/04 16:45:05 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][PhysicalPlan] Recreate action with parallelism (#2638)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 7bc2ec540 [Engine][PhysicalPlan] Recreate action with parallelism (#2638)
7bc2ec540 is described below
commit 7bc2ec5405c6b534c4d31e7febffb19a2bf743e3
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Sep 5 00:45:00 2022 +0800
[Engine][PhysicalPlan] Recreate action with parallelism (#2638)
* [Engine][PhysicalPlan] Recreate action with parallelism
---
.../engine/server/dag/execution/ExecutionPlanGenerator.java | 6 +++---
.../seatunnel/engine/server/dag/execution/PipelineGenerator.java | 2 +-
.../test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index b95b3f631..b32f53b08 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -157,7 +157,7 @@ public class ExecutionPlanGenerator {
final long newId = idGenerator.getNextId();
Action newAction;
if (chainedVertices.size() < 1) {
- newAction = recreateAction(logicalVertex.getAction(), newId);
+ newAction = recreateAction(logicalVertex.getAction(), newId, logicalVertex.getParallelism());
} else {
List<SeaTunnelTransform> transforms = new ArrayList<>(chainedVertices.size());
List<String> names = new ArrayList<>(chainedVertices.size());
@@ -182,7 +182,7 @@ public class ExecutionPlanGenerator {
logicalToExecutionMap.put(logicalVertex.getVertexId(), executionVertex.getVertexId());
}
- public static Action recreateAction(Action action, Long id) {
+ public static Action recreateAction(Action action, Long id, int parallelism) {
Action newAction;
if (action instanceof PartitionTransformAction) {
newAction = new PartitionTransformAction(id,
@@ -209,7 +209,7 @@ public class ExecutionPlanGenerator {
} else {
throw new UnknownActionException(action);
}
- newAction.setParallelism(action.getParallelism());
+ newAction.setParallelism(parallelism);
return newAction;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 8e1f426cd..bc5e271f2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -158,7 +158,7 @@ public class PipelineGenerator {
private ExecutionVertex recreateVertex(ExecutionVertex vertex, int parallelism) {
long id = idGenerator.getNextId();
Action action = vertex.getAction();
- return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+ return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id, parallelism), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
}
private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index fbced23a2..6baf2d98b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -126,6 +126,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
- Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 1);
+ Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 2);
}
}