You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 09:50:12 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Remove join and recursion (#2524)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new eb827ea73 [Engine][Task] Remove join and recursion (#2524)
eb827ea73 is described below

commit eb827ea738efafd83288d5decf2677e14cc07f40
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Aug 25 17:50:05 2022 +0800

    [Engine][Task] Remove join and recursion (#2524)
---
 .../engine/server/task/SinkAggregatedCommitterTask.java |  4 +++-
 .../engine/server/task/SourceSplitEnumeratorTask.java   | 17 ++++++++---------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index ecde94116..14756102e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -92,7 +92,9 @@ public class SinkAggregatedCommitterTask<AggregatedCommitInfoT> extends Coordina
     @NonNull
     @Override
     public ProgressState call() throws Exception {
-        completableFuture.join();
+        if (completableFuture.isDone()) {
+            completableFuture.get();
+        }
         return progress.toState();
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index c673271d0..469a0baef 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -135,8 +135,10 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private void stateProcess() throws Exception {
         switch (currState) {
             case INIT:
-                waitReader();
-                currState = EnumeratorState.READER_REGISTER_COMPLETE;
+                if (readerFinishFuture.isDone()) {
+                    readerRegisterFuture.get();
+                    currState = EnumeratorState.READER_REGISTER_COMPLETE;
+                }
                 break;
             case READER_REGISTER_COMPLETE:
                 currState = EnumeratorState.ASSIGN;
@@ -147,8 +149,10 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
                 currState = EnumeratorState.WAITING_FEEDBACK;
                 break;
             case WAITING_FEEDBACK:
-                readerFinishFuture.join();
-                currState = EnumeratorState.READER_CLOSED;
+                if (readerFinishFuture.isDone()) {
+                    readerFinishFuture.get();
+                    currState = EnumeratorState.READER_CLOSED;
+                }
                 break;
             case READER_CLOSED:
                 closeAllReader();
@@ -165,11 +169,6 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
             default:
                 throw new IllegalArgumentException("Unknown Enumerator State: " + currState);
         }
-        stateProcess();
-    }
-
-    private void waitReader() {
-        readerRegisterFuture.join();
     }
 
     public Set<Long> getRegisteredReaders() {