You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/08/29 08:54:47 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] Fix multi SourceReader use same TaskGroupID (#2533)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 2bbb3ebe6 [Engine][Task] Fix multi SourceReader use same TaskGroupID (#2533)
2bbb3ebe6 is described below
commit 2bbb3ebe680cb41d5f49c3d201fea1e64124cd04
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Aug 29 16:54:39 2022 +0800
[Engine][Task] Fix multi SourceReader use same TaskGroupID (#2533)
* [Engine][Task] Fix multi SourceReader use same TaskGroupID
* [Engine][Task] Fix multi SourceReader use same TaskGroupID
---
.../server/dag/physical/PhysicalPlanGenerator.java | 20 +++++++++++---------
.../seatunnel/engine/server/task/SeaTunnelTask.java | 7 -------
.../engine/server/task/SourceSeaTunnelTask.java | 8 +++++++-
.../server/task/SourceSplitEnumeratorTask.java | 2 +-
.../engine/server/task/flow/SourceFlowLifeCycle.java | 6 +++---
5 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 7dcf2e9e9..e07d85f8a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -188,7 +188,7 @@ public class PhysicalPlanGenerator {
long taskGroupID = idGenerator.getNextId();
SinkAggregatedCommitterTask<?> t =
new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 0)), s,
+ new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
sinkAggregatedCommitter.get());
committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
@@ -224,11 +224,12 @@ public class PhysicalPlanGenerator {
.flatMap(flow -> {
List<PhysicalVertex> t = new ArrayList<>();
long taskIDPrefix = idGenerator.getNextId();
+ long taskGroupIDPrefix = idGenerator.getNextId();
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
- long taskGroupID = idGenerator.getNextId();
+ long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
setFlowConfig(flow, i);
SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(taskIDPrefix, i)), i, flow);
+ new TaskLocation(taskGroupID, mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
@@ -262,7 +263,7 @@ public class PhysicalPlanGenerator {
return sources.stream().map(s -> {
long taskGroupID = idGenerator.getNextId();
SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 0)), s);
+ new TaskLocation(taskGroupID, mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
@@ -293,14 +294,15 @@ public class PhysicalPlanGenerator {
.map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
.flatMap(flow -> {
List<PhysicalVertex> t = new ArrayList<>();
- long taskGroupID = idGenerator.getNextId();
List<Flow> flows = new ArrayList<>(Collections.singletonList(flow));
if (sourceWithSink(flow)) {
flows.addAll(splitSinkFromFlow(flow));
}
+ long taskGroupIDPrefix = idGenerator.getNextId();
Map<Long, Long> flowTaskIDPrefixMap = new HashMap<>();
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
int finalParallelismIndex = i;
+ long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
List<SeaTunnelTask> taskList =
flows.stream().map(f -> {
setFlowConfig(f, finalParallelismIndex);
@@ -309,12 +311,12 @@ public class PhysicalPlanGenerator {
if (f instanceof PhysicalExecutionFlow) {
return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
new TaskLocation(taskGroupID,
- convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+ mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
finalParallelismIndex, f);
} else {
return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
new TaskLocation(taskGroupID,
- convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+ mixIDPrefixAndIndex(taskIDPrefix, finalParallelismIndex)),
finalParallelismIndex, f);
}
}).collect(Collectors.toList());
@@ -444,8 +446,8 @@ public class PhysicalPlanGenerator {
}
@SuppressWarnings("checkstyle:MagicNumber")
- private long convertToTaskID(long taskTypeID, int index) {
- return taskTypeID * 10000 + index;
+ private long mixIDPrefixAndIndex(long idPrefix, int index) {
+ return idPrefix * 10000 + index;
}
private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b5098976c..c940ed609 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -45,7 +45,6 @@ import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
import lombok.NonNull;
-import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -152,12 +151,6 @@ public abstract class SeaTunnelTask extends AbstractTask {
}
}
- @Override
- public void close() throws IOException {
- startFlowLifeCycle.close();
- progress.done();
- }
-
@Override
public Set<URL> getJarsUrl() {
List<Flow> now = Collections.singletonList(executionFlow);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 7ded6d962..2956c59bf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -29,6 +29,7 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -64,10 +65,15 @@ public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends SeaTunne
@SuppressWarnings("unchecked")
public ProgressState call() throws Exception {
((SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle).collect();
- checkDone();
return progress.toState();
}
+ @Override
+ public void close() throws IOException {
+ startFlowLifeCycle.close();
+ progress.done();
+ }
+
public void receivedSourceSplit(List<SplitT> splits) {
((SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle).receivedSplits(splits);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 469a0baef..4f7c64afc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -135,7 +135,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private void stateProcess() throws Exception {
switch (currState) {
case INIT:
- if (readerFinishFuture.isDone()) {
+ if (readerRegisterFuture.isDone()) {
readerRegisterFuture.get();
currState = EnumeratorState.READER_REGISTER_COMPLETE;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 3b499d63f..d5bfdda10 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -80,8 +80,9 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
@Override
public void close() throws IOException {
- super.close();
+ collector.close();
reader.close();
+ super.close();
}
public void collect() throws Exception {
@@ -91,10 +92,9 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends Abstract
}
public void signalNoMoreElement() {
- // Close this reader
+ // ready close this reader
try {
this.closed = true;
- collector.close();
runningTask.getExecutionContext().sendToMaster(new SourceNoMoreElementOperation(currentTaskID,
enumeratorTaskID)).get();
} catch (Exception e) {