You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/31 08:05:32 UTC

[seatunnel] branch dev updated: [Bug][Zeta] Fix restoreComplete Future can't be completed when cancel task (#4863)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 224f9d81c [Bug][Zeta] Fix restoreComplete Future can't be completed when cancel task (#4863)
224f9d81c is described below

commit 224f9d81c54a9a3c5b39d38c90ad9fd224b68bad
Author: Jia Fan <fa...@qq.com>
AuthorDate: Wed May 31 16:05:23 2023 +0800

    [Bug][Zeta] Fix restoreComplete Future can't be completed when cancel task (#4863)
---
 .../org/apache/seatunnel/engine/server/task/AbstractTask.java | 11 +++++++++++
 .../apache/seatunnel/engine/server/task/SeaTunnelTask.java    |  1 +
 .../engine/server/task/SinkAggregatedCommitterTask.java       |  1 +
 .../engine/server/task/SourceSplitEnumeratorTask.java         |  1 +
 4 files changed, 14 insertions(+)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 4fc23f34c..776329716 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
 import lombok.NonNull;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 import java.util.Set;
@@ -93,6 +94,16 @@ public abstract class AbstractTask implements Task {
         return taskLocation.getTaskID();
     }
 
+    @Override
+    public void close() throws IOException {
+        try {
+            if (!restoreComplete.isDone()) {
+                restoreComplete.cancel(true);
+            }
+        } catch (Exception ignore) {
+        }
+    }
+
     protected void reportTaskStatus(SeaTunnelTaskState status) {
         getExecutionContext()
                 .sendToMaster(new TaskReportStatusOperation(taskLocation, status))
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 07abe857c..585940738 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -313,6 +313,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     @Override
     public void close() throws IOException {
+        super.close();
         allCycles
                 .parallelStream()
                 .forEach(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 7f798bfbb..e831a9269 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -179,6 +179,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>
 
     @Override
     public void close() throws IOException {
+        super.close();
         aggregatedCommitter.close();
         progress.done();
         completableFuture.complete(null);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index c769cd059..18ba0a78e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -111,6 +111,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
 
     @Override
     public void close() throws IOException {
+        super.close();
         if (enumerator != null) {
             enumerator.close();
         }