You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/21 06:03:45 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine] Fix the task closing error (#3154)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ed14476a5 [hotfix][engine] Fix the task closing error (#3154)
ed14476a5 is described below

commit ed14476a518a7491a69078cd6d09786380d7028b
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Fri Oct 21 14:03:39 2022 +0800

    [hotfix][engine] Fix the task closing error (#3154)
---
 .../org/apache/seatunnel/engine/server/task/AbstractTask.java | 10 ++++++++++
 .../apache/seatunnel/engine/server/task/SeaTunnelTask.java    | 11 ++++-------
 .../engine/server/task/SinkAggregatedCommitterTask.java       | 11 ++++-------
 .../engine/server/task/SourceSplitEnumeratorTask.java         |  5 +++--
 4 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index d45e08901..3daf0512b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -32,6 +32,7 @@ import lombok.NonNull;
 import java.net.URL;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public abstract class AbstractTask implements Task {
@@ -45,6 +46,8 @@ public abstract class AbstractTask implements Task {
     protected volatile boolean closeCalled;
     protected volatile boolean prepareCloseStatus;
 
+    protected AtomicLong prepareCloseBarrierId;
+
     protected Progress progress;
 
     public AbstractTask(long jobID, TaskLocation taskLocation) {
@@ -55,6 +58,7 @@ public abstract class AbstractTask implements Task {
         this.startCalled = false;
         this.closeCalled = false;
         this.prepareCloseStatus = false;
+        this.prepareCloseBarrierId = new AtomicLong(-1);
     }
 
     public abstract Set<URL> getJarsUrl();
@@ -103,6 +107,12 @@ public abstract class AbstractTask implements Task {
         startCalled = true;
     }
 
+    public void tryClose(long checkpointId) {
+        if (prepareCloseStatus && prepareCloseBarrierId.get() == checkpointId) {
+            closeCall();
+        }
+    }
+
     public void closeCall() {
         closeCalled = true;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 9a96d0f3e..37b221742 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -267,7 +267,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
         Integer ackSize = cycleAcks.compute(barrier.getId(), (id, count) -> count == null ? 1 : ++count);
         if (ackSize == allCycles.size()) {
             if (barrier.prepareClose()) {
-                prepareCloseStatus = true;
+                this.prepareCloseStatus = true;
+                this.prepareCloseBarrierId.set(barrier.getId());
             }
             if (barrier.snapshot()) {
                 this.getExecutionContext().sendToMaster(
@@ -284,17 +285,13 @@ public abstract class SeaTunnelTask extends AbstractTask {
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         notifyAllAction(listener -> listener.notifyCheckpointComplete(checkpointId));
-        if (prepareCloseStatus) {
-            closeCall();
-        }
+        tryClose(checkpointId);
     }
 
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
         notifyAllAction(listener -> listener.notifyCheckpointAborted(checkpointId));
-        if (prepareCloseStatus) {
-            closeCall();
-        }
+        tryClose(checkpointId);
     }
 
     public void notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumer){
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index f1206a897..750a79690 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -176,7 +176,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
             return;
         }
         if (barrier.prepareClose()) {
-            prepareCloseStatus = true;
+            this.prepareCloseStatus = true;
+            this.prepareCloseBarrierId.set(barrier.getId());
         }
         if (barrier.snapshot()) {
             if (commitInfoCache.containsKey(barrier.getId())) {
@@ -222,17 +223,13 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
             checkpointCommitInfoMap.remove(key);
         });
         aggregatedCommitter.commit(aggregatedCommitInfo);
-        if (prepareCloseStatus) {
-            closeCall();
-        }
+        tryClose(checkpointId);
     }
 
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
         aggregatedCommitter.abort(checkpointCommitInfoMap.get(checkpointId));
         checkpointCommitInfoMap.remove(checkpointId);
-        if (prepareCloseStatus) {
-            closeCall();
-        }
+        tryClose(checkpointId);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 608a0fb1c..1f8163e3f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -126,6 +126,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     public void triggerBarrier(Barrier barrier) throws Exception {
         if (barrier.prepareClose()) {
             this.currState = PREPARE_CLOSE;
+            this.prepareCloseBarrierId.set(barrier.getId());
         }
         final long barrierId = barrier.getId();
         Serializable snapshotState = null;
@@ -273,7 +274,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         enumerator.notifyCheckpointComplete(checkpointId);
-        if (prepareCloseStatus) {
+        if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() == checkpointId) {
             closeCall();
         }
     }
@@ -281,7 +282,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
         enumerator.notifyCheckpointAborted(checkpointId);
-        if (prepareCloseStatus) {
+        if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() == checkpointId) {
             closeCall();
         }
     }