You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 03:51:36 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Change TaskGroup name (#2522)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new e5154b754 [Engine][Task] Change TaskGroup name (#2522)
e5154b754 is described below

commit e5154b754b497923195f5d586779da5b51ffeb95
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Aug 25 11:51:31 2022 +0800

    [Engine][Task] Change TaskGroup name (#2522)
---
 .../engine/server/dag/physical/PhysicalPlanGenerator.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 36e156825..7dcf2e9e9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -198,7 +198,7 @@ public class PhysicalPlanGenerator {
                         atomicInteger.incrementAndGet(),
                         executorService,
                         collect.size(),
-                        new TaskGroupDefaultImpl(taskGroupID, "SinkAggregatedCommitterTask",
+                        new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-AggregatedCommitterTask",
                             Lists.newArrayList(t)),
                         taskFuture,
                         flakeIdGenerator,
@@ -237,7 +237,8 @@ public class PhysicalPlanGenerator {
                         i,
                         executorService,
                         flow.getAction().getParallelism(),
-                        new TaskGroupDefaultImpl(taskGroupID, "PartitionTransformTask",
+                        new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+                                "-PartitionTransformTask",
                             Lists.newArrayList(seaTunnelTask)),
                         taskFuture,
                         flakeIdGenerator,
@@ -270,7 +271,8 @@ public class PhysicalPlanGenerator {
                 atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
-                new TaskGroupDefaultImpl(taskGroupID, s.getName(), Lists.newArrayList(t)),
+                new TaskGroupDefaultImpl(taskGroupID, s.getName() + "-SplitEnumerator",
+                        Lists.newArrayList(t)),
                 taskFuture,
                 flakeIdGenerator,
                 pipelineIndex,
@@ -329,7 +331,8 @@ public class PhysicalPlanGenerator {
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
-                            new TaskGroupWithIntermediateQueue(taskGroupID, "SourceTask",
+                            new TaskGroupWithIntermediateQueue(taskGroupID, flow.getAction().getName() +
+                                    "-SourceTask",
                                 taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
                             taskFuture,
                             flakeIdGenerator,
@@ -344,7 +347,8 @@ public class PhysicalPlanGenerator {
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
-                            new TaskGroupDefaultImpl(taskGroupID, "SourceTask",
+                            new TaskGroupDefaultImpl(taskGroupID, flow.getAction().getName() +
+                                    "-SourceTask",
                                 taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
                             taskFuture,
                             flakeIdGenerator,