You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/29 08:54:47 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Fix multi SourceReader use same TaskGroupID (#2533)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 2bbb3ebe6 [Engine][Task] Fix multi SourceReader use same TaskGroupID (#2533)
2bbb3ebe6 is described below

commit 2bbb3ebe680cb41d5f49c3d201fea1e64124cd04
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Aug 29 16:54:39 2022 +0800

    [Engine][Task] Fix multi SourceReader use same TaskGroupID (#2533)
    
    * [Engine][Task] Fix multi SourceReader use same TaskGroupID
    
    * [Engine][Task] Fix multi SourceReader use same TaskGroupID
---
 .../server/dag/physical/PhysicalPlanGenerator.java   | 20 +++++++++++---------
 .../seatunnel/engine/server/task/SeaTunnelTask.java  |  7 -------
 .../engine/server/task/SourceSeaTunnelTask.java      |  8 +++++++-
 .../server/task/SourceSplitEnumeratorTask.java       |  2 +-
 .../engine/server/task/flow/SourceFlowLifeCycle.java |  6 +++---
 5 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 7dcf2e9e9..e07d85f8a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -188,7 +188,7 @@ public class PhysicalPlanGenerator {
                     long taskGroupID = idGenerator.getNextId();
                     SinkAggregatedCommitterTask<?> t =
                         new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
-                            new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 0)), s,
+                            new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
                             sinkAggregatedCommitter.get());
                     committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
                     CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
@@ -224,11 +224,12 @@ public class PhysicalPlanGenerator {
             .flatMap(flow -> {
                 List<PhysicalVertex> t = new ArrayList<>();
                 long taskIDPrefix = idGenerator.getNextId();
+                long taskGroupIDPrefix = idGenerator.getNextId();
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
-                    long taskGroupID = idGenerator.getNextId();
+                    long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
                     setFlowConfig(flow, i);
                     SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                        new TaskLocation(taskGroupID, convertToTaskID(taskIDPrefix, i)), i, flow);
+                        new TaskLocation(taskGroupID, mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
 
                     CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
                     waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
@@ -262,7 +263,7 @@ public class PhysicalPlanGenerator {
         return sources.stream().map(s -> {
             long taskGroupID = idGenerator.getNextId();
             SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
-                new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 0)), s);
+                new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
             enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
             CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
             waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
@@ -293,14 +294,15 @@ public class PhysicalPlanGenerator {
             .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
             .flatMap(flow -> {
                 List<PhysicalVertex> t = new ArrayList<>();
-                long taskGroupID = idGenerator.getNextId();
                 List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
                 if (sourceWithSink(flow)) {
                     flows.addAll(splitSinkFromFlow(flow));
                 }
+                long taskGroupIDPrefix = idGenerator.getNextId();
                 Map<Long, Long> flowTaskIDPrefixMap = new HashMap<>();
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     int finalParallelismIndex = i;
+                    long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
                     List<SeaTunnelTask> taskList =
                         flows.stream().map(f -> {
                             setFlowConfig(f, finalParallelismIndex);
@@ -309,12 +311,12 @@ public class PhysicalPlanGenerator {
                             if (f instanceof PhysicalExecutionFlow) {
                                 return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
                                     new TaskLocation(taskGroupID,
-                                        convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+                                        mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
                                     finalParallelismIndex, f);
                             } else {
                                 return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
                                     new TaskLocation(taskGroupID,
-                                        convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+                                        mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
                                     finalParallelismIndex, f);
                             }
                         }).collect(Collectors.toList());
@@ -444,8 +446,8 @@ public class PhysicalPlanGenerator {
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    private long convertToTaskID(long taskTypeID, int index) {
-        return taskTypeID * 10000 + index;
+    private long mixIDPrefixAndIndex(long idPrefix, int index) {
+        return idPrefix * 10000 + index;
     }
 
     private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b5098976c..c940ed609 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -45,7 +45,6 @@ import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
 
 import lombok.NonNull;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -152,12 +151,6 @@ public abstract class SeaTunnelTask extends AbstractTask {
         }
     }
 
-    @Override
-    public void close() throws IOException {
-        startFlowLifeCycle.close();
-        progress.done();
-    }
-
     @Override
     public Set<URL> getJarsUrl() {
         List<Flow> now = Collections.singletonList(executionFlow);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 7ded6d962..2956c59bf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -29,6 +29,7 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -64,10 +65,15 @@ public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends SeaTunne
     @SuppressWarnings("unchecked")
     public ProgressState call() throws Exception {
         ((SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle).collect();
-        checkDone();
         return progress.toState();
     }
 
+    @Override
+    public void close() throws IOException {
+        startFlowLifeCycle.close();
+        progress.done();
+    }
+
     public void receivedSourceSplit(List<SplitT> splits) {
         ((SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle).receivedSplits(splits);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 469a0baef..4f7c64afc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -135,7 +135,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private void stateProcess() throws Exception {
         switch (currState) {
             case INIT:
-                if (readerFinishFuture.isDone()) {
+                if (readerRegisterFuture.isDone()) {
                     readerRegisterFuture.get();
                     currState = EnumeratorState.READER_REGISTER_COMPLETE;
                 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 3b499d63f..d5bfdda10 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -80,8 +80,9 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
 
     @Override
     public void close() throws IOException {
-        super.close();
+        collector.close();
         reader.close();
+        super.close();
     }
 
     public void collect() throws Exception {
@@ -91,10 +92,9 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
     }
 
     public void signalNoMoreElement() {
-        // Close this reader
+        // ready close this reader
         try {
             this.closed = true;
-            collector.close();
             runningTask.getExecutionContext().sendToMaster(new SourceNoMoreElementOperation(currentTaskID,
                     enumeratorTaskID)).get();
         } catch (Exception e) {