You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 09:50:12 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] Remove join and recursion (#2524)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new eb827ea73 [Engine][Task] Remove join and recursion (#2524)
eb827ea73 is described below
commit eb827ea738efafd83288d5decf2677e14cc07f40
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Aug 25 17:50:05 2022 +0800
[Engine][Task] Remove join and recursion (#2524)
---
.../engine/server/task/SinkAggregatedCommitterTask.java | 4 +++-
.../engine/server/task/SourceSplitEnumeratorTask.java | 17 ++++++++---------
2 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index ecde94116..14756102e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -92,7 +92,9 @@ public class SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
@NonNull
@Override
public ProgressState call() throws Exception {
- completableFuture.join();
+ if (completableFuture.isDone()) {
+ completableFuture.get();
+ }
return progress.toState();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index c673271d0..469a0baef 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -135,8 +135,10 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private void stateProcess() throws Exception {
switch (currState) {
case INIT:
- waitReader();
- currState = EnumeratorState.READER_REGISTER_COMPLETE;
+ if (readerFinishFuture.isDone()) {
+ readerRegisterFuture.get();
+ currState = EnumeratorState.READER_REGISTER_COMPLETE;
+ }
break;
case READER_REGISTER_COMPLETE:
currState = EnumeratorState.ASSIGN;
@@ -147,8 +149,10 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
currState = EnumeratorState.WAITING_FEEDBACK;
break;
case WAITING_FEEDBACK:
- readerFinishFuture.join();
- currState = EnumeratorState.READER_CLOSED;
+ if (readerFinishFuture.isDone()) {
+ readerFinishFuture.get();
+ currState = EnumeratorState.READER_CLOSED;
+ }
break;
case READER_CLOSED:
closeAllReader();
@@ -165,11 +169,6 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
default:
throw new IllegalArgumentException("Unknown Enumerator State: " + currState);
}
- stateProcess();
- }
-
- private void waitReader() {
- readerRegisterFuture.join();
}
public Set<Long> getRegisteredReaders() {