You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 03:51:36 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] Change TaskGroup name (#2522)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new e5154b754 [Engine][Task] Change TaskGroup name (#2522)
e5154b754 is described below
commit e5154b754b497923195f5d586779da5b51ffeb95
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Aug 25 11:51:31 2022 +0800
[Engine][Task] Change TaskGroup name (#2522)
---
.../engine/server/dag/physical/PhysicalPlanGenerator.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 36e156825..7dcf2e9e9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -198,7 +198,7 @@ public class PhysicalPlanGenerator {
atomicInteger.incrementAndGet(),
executorService,
collect.size(),
- new TaskGroupDefaultImpl(taskGroupID, "SinkAggregatedCommitterTask",
+ new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-AggregatedCommitterTask",
Lists.newArrayList(t)),
taskFuture,
flakeIdGenerator,
@@ -237,7 +237,8 @@ public class PhysicalPlanGenerator {
i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupID, "PartitionTransformTask",
+ new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+ "-PartitionTransformTask",
Lists.newArrayList(seaTunnelTask)),
taskFuture,
flakeIdGenerator,
@@ -270,7 +271,8 @@ public class PhysicalPlanGenerator {
atomicInteger.incrementAndGet(),
executorService,
sources.size(),
- new TaskGroupDefaultImpl(taskGroupID, s.getName(), Lists.newArrayList(t)),
+ new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-SplitEnumerator",
+ Lists.newArrayList(t)),
taskFuture,
flakeIdGenerator,
pipelineIndex,
@@ -329,7 +331,8 @@ public class PhysicalPlanGenerator {
i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroupWithIntermediateQueue(taskGroupID, "SourceTask",
+ new TaskGroupWithIntermediateQueue(taskGroupID, flow.getAction().getName() +
+ "-SourceTask",
taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
taskFuture,
flakeIdGenerator,
@@ -344,7 +347,8 @@ public class PhysicalPlanGenerator {
i,
executorService,
flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupID, "SourceTask",
+ new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+ "-SourceTask",
taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
taskFuture,
flakeIdGenerator,