You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/21 06:03:45 UTC
[incubator-seatunnel] branch dev updated: [hotfix][engine] Fix the task closing error (#3154)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ed14476a5 [hotfix][engine] Fix the task closing error (#3154)
ed14476a5 is described below
commit ed14476a518a7491a69078cd6d09786380d7028b
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Fri Oct 21 14:03:39 2022 +0800
[hotfix][engine] Fix the task closing error (#3154)
---
.../org/apache/seatunnel/engine/server/task/AbstractTask.java | 10 ++++++++++
.../apache/seatunnel/engine/server/task/SeaTunnelTask.java | 11 ++++-------
.../engine/server/task/SinkAggregatedCommitterTask.java | 11 ++++-------
.../engine/server/task/SourceSplitEnumeratorTask.java | 5 +++--
4 files changed, 21 insertions(+), 16 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index d45e08901..3daf0512b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -32,6 +32,7 @@ import lombok.NonNull;
import java.net.URL;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public abstract class AbstractTask implements Task {
@@ -45,6 +46,8 @@ public abstract class AbstractTask implements Task {
protected volatile boolean closeCalled;
protected volatile boolean prepareCloseStatus;
+ protected AtomicLong prepareCloseBarrierId;
+
protected Progress progress;
public AbstractTask(long jobID, TaskLocation taskLocation) {
@@ -55,6 +58,7 @@ public abstract class AbstractTask implements Task {
this.startCalled = false;
this.closeCalled = false;
this.prepareCloseStatus = false;
+ this.prepareCloseBarrierId = new AtomicLong(-1);
}
public abstract Set<URL> getJarsUrl();
@@ -103,6 +107,12 @@ public abstract class AbstractTask implements Task {
startCalled = true;
}
+ public void tryClose(long checkpointId) {
+ if (prepareCloseStatus && prepareCloseBarrierId.get() == checkpointId) {
+ closeCall();
+ }
+ }
+
public void closeCall() {
closeCalled = true;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 9a96d0f3e..37b221742 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -267,7 +267,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
Integer ackSize = cycleAcks.compute(barrier.getId(), (id, count) -> count == null ? 1 : ++count);
if (ackSize == allCycles.size()) {
if (barrier.prepareClose()) {
- prepareCloseStatus = true;
+ this.prepareCloseStatus = true;
+ this.prepareCloseBarrierId.set(barrier.getId());
}
if (barrier.snapshot()) {
this.getExecutionContext().sendToMaster(
@@ -284,17 +285,13 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
notifyAllAction(listener -> listener.notifyCheckpointComplete(checkpointId));
- if (prepareCloseStatus) {
- closeCall();
- }
+ tryClose(checkpointId);
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
notifyAllAction(listener -> listener.notifyCheckpointAborted(checkpointId));
- if (prepareCloseStatus) {
- closeCall();
- }
+ tryClose(checkpointId);
}
public void notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumer){
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index f1206a897..750a79690 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -176,7 +176,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
return;
}
if (barrier.prepareClose()) {
- prepareCloseStatus = true;
+ this.prepareCloseStatus = true;
+ this.prepareCloseBarrierId.set(barrier.getId());
}
if (barrier.snapshot()) {
if (commitInfoCache.containsKey(barrier.getId())) {
@@ -222,17 +223,13 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
checkpointCommitInfoMap.remove(key);
});
aggregatedCommitter.commit(aggregatedCommitInfo);
- if (prepareCloseStatus) {
- closeCall();
- }
+ tryClose(checkpointId);
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
aggregatedCommitter.abort(checkpointCommitInfoMap.get(checkpointId));
checkpointCommitInfoMap.remove(checkpointId);
- if (prepareCloseStatus) {
- closeCall();
- }
+ tryClose(checkpointId);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 608a0fb1c..1f8163e3f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -126,6 +126,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
public void triggerBarrier(Barrier barrier) throws Exception {
if (barrier.prepareClose()) {
this.currState = PREPARE_CLOSE;
+ this.prepareCloseBarrierId.set(barrier.getId());
}
final long barrierId = barrier.getId();
Serializable snapshotState = null;
@@ -273,7 +274,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
enumerator.notifyCheckpointComplete(checkpointId);
- if (prepareCloseStatus) {
+ if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() == checkpointId) {
closeCall();
}
}
@@ -281,7 +282,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
enumerator.notifyCheckpointAborted(checkpointId);
- if (prepareCloseStatus) {
+ if (currState == PREPARE_CLOSE && prepareCloseBarrierId.get() == checkpointId) {
closeCall();
}
}