You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2023/05/21 13:38:48 UTC

[incubator-seatunnel] branch dev updated: [Improve][Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)

This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 40707a3bc [Improve][Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)
40707a3bc is described below

commit 40707a3bc6586886b9f6ff855090452d9bc26d37
Author: Jia Fan <fa...@qq.com>
AuthorDate: Sun May 21 21:38:42 2023 +0800

    [Improve][Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)
---
 .../engine/server/dag/physical/SubPlan.java        | 43 +++++++++++++++-------
 1 file changed, 29 insertions(+), 14 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 55a3de226..a6cb3e6eb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -34,6 +36,7 @@ import com.hazelcast.map.IMap;
 import lombok.Data;
 import lombok.NonNull;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -352,21 +355,33 @@ public class SubPlan {
     }
 
     public synchronized void cancelPipeline() {
-        if (getPipelineState().isEndState()) {
-            LOGGER.warning(
-                    String.format(
-                            "%s is in end state %s, can not be cancel",
-                            pipelineFullName, getPipelineState()));
-            return;
-        }
-        // If an active Master Node done and another Master Node active, we can not know whether
-        // canceled pipeline
-        // complete. So we need cancel running pipeline again.
-        if (!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
-            updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
+        try {
+            RetryUtils.retryWithException(
+                    () -> {
+                        if (getPipelineState().isEndState()) {
+                            LOGGER.warning(
+                                    String.format(
+                                            "%s is in end state %s, can not be cancel",
+                                            pipelineFullName, getPipelineState()));
+                            return null;
+                        }
+                        // If an active Master Node done and another Master Node active, we can not
+                        // know whether
+                        // canceled pipeline
+                        // complete. So we need cancel running pipeline again.
+                        if (!PipelineStatus.CANCELING.equals(
+                                runningJobStateIMap.get(pipelineLocation))) {
+                            updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
+                        }
+                        cancelCheckpointCoordinator();
+                        cancelPipelineTasks();
+                        return null;
+                    },
+                    new RetryUtils.RetryMaterial(
+                            30, true, e -> e instanceof IOException, 1000, true));
+        } catch (Exception e) {
+            throw new SeaTunnelEngineException(e);
         }
-        cancelCheckpointCoordinator();
-        cancelPipelineTasks();
     }
 
     private void cancelCheckpointCoordinator() {