You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 03:17:14 UTC
[incubator-seatunnel] branch st-engine updated: [hotfix][engine][dag] The index of the task should start at 0 (#2520)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new bdd12b66d [hotfix][engine][dag] The index of the task should start at 0 (#2520)
bdd12b66d is described below
commit bdd12b66d432ae61851090bdd3cca02c1c323eed
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Aug 25 11:17:08 2022 +0800
[hotfix][engine][dag] The index of the task should start at 0 (#2520)
---
.../seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 6ac92406f..36e156825 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -188,7 +188,7 @@ public class PhysicalPlanGenerator {
long taskGroupID = idGenerator.getNextId();
SinkAggregatedCommitterTask<?> t =
new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s,
+ new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 0)), s,
sinkAggregatedCommitter.get());
committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
@@ -261,7 +261,7 @@ public class PhysicalPlanGenerator {
return sources.stream().map(s -> {
long taskGroupID = idGenerator.getNextId();
SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s);
+ new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 0)), s);
enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));