You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2023/05/21 13:38:48 UTC
[incubator-seatunnel] branch dev updated: [Improve][Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)
This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 40707a3bc [Improve][Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)
40707a3bc is described below
commit 40707a3bc6586886b9f6ff855090452d9bc26d37
Author: Jia Fan <fa...@qq.com>
AuthorDate: Sun May 21 21:38:42 2023 +0800
[Improve][Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)
---
.../engine/server/dag/physical/SubPlan.java | 43 +++++++++++++++-------
1 file changed, 29 insertions(+), 14 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 55a3de226..a6cb3e6eb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -34,6 +36,7 @@ import com.hazelcast.map.IMap;
import lombok.Data;
import lombok.NonNull;
+import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -352,21 +355,33 @@ public class SubPlan {
}
public synchronized void cancelPipeline() {
- if (getPipelineState().isEndState()) {
- LOGGER.warning(
- String.format(
- "%s is in end state %s, can not be cancel",
- pipelineFullName, getPipelineState()));
- return;
- }
- // If an active Master Node done and another Master Node active, we can not know whether
- // canceled pipeline
- // complete. So we need cancel running pipeline again.
- if (!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
- updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
+ try {
+ RetryUtils.retryWithException(
+ () -> {
+ if (getPipelineState().isEndState()) {
+ LOGGER.warning(
+ String.format(
+ "%s is in end state %s, can not be cancel",
+ pipelineFullName, getPipelineState()));
+ return null;
+ }
+ // If an active Master Node done and another Master Node active, we can not
+ // know whether
+ // canceled pipeline
+ // complete. So we need cancel running pipeline again.
+ if (!PipelineStatus.CANCELING.equals(
+ runningJobStateIMap.get(pipelineLocation))) {
+ updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
+ }
+ cancelCheckpointCoordinator();
+ cancelPipelineTasks();
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ 30, true, e -> e instanceof IOException, 1000, true));
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException(e);
}
- cancelCheckpointCoordinator();
- cancelPipelineTasks();
}
private void cancelCheckpointCoordinator() {