You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2023/01/17 09:34:38 UTC

[incubator-seatunnel] branch dev updated: [Bug] [Seatunnel-Engine] Fix NPE when scheduling sub plan fails. (#3909)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9df009494 [Bug] [Seatunnel-Engine] Fix NPE when scheduling sub plan fails. (#3909)
9df009494 is described below

commit 9df009494bba65a954106906a56f26250ea06e3d
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Tue Jan 17 17:34:31 2023 +0800

    [Bug] [Seatunnel-Engine] Fix NPE when scheduling sub plan fails. (#3909)
---
 .../engine/common/utils/PassiveCompletableFuture.java  | 16 +++++++++-------
 .../seatunnel/engine/server/dag/physical/SubPlan.java  |  2 ++
 .../engine/server/scheduler/PipelineBaseScheduler.java | 18 ++++++++++++++----
 3 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index 0a6c519d7..ce5e6d182 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -27,13 +27,15 @@ public class PassiveCompletableFuture<T> extends CompletableFuture<T> {
     }
 
     public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
-        chainedFuture.whenComplete((r, t) -> {
-            if (t != null) {
-                internalCompleteExceptionally(t);
-            } else {
-                internalComplete(r);
-            }
-        });
+        if (chainedFuture != null) {
+            chainedFuture.whenComplete((r, t) -> {
+                if (t != null) {
+                    internalCompleteExceptionally(t);
+                } else {
+                    internalComplete(r);
+                }
+            });
+        }
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 7bfa4eaba..b88224458 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.server.master.JobMaster;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.map.IMap;
+import lombok.Data;
 import lombok.NonNull;
 
 import java.util.List;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+@Data
 public class SubPlan {
     private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 7fdfe323f..791feb405 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.engine.server.scheduler;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
@@ -63,7 +65,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             List<CompletableFuture<Void>> collect =
                 physicalPlan.getPipelineList()
                     .stream()
-                    .map(pipeline -> schedulerPipeline(pipeline))
+                    .map(this::schedulerPipeline)
                     .filter(Objects::nonNull).collect(Collectors.toList());
             try {
                 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
@@ -99,6 +101,7 @@ public class PipelineBaseScheduler implements JobScheduler {
                 deployPipeline(pipeline, slotProfiles);
             }, jobMaster.getExecutorService());
         } catch (Exception e) {
+            log.error(String.format("scheduler %s error and cancel pipeline. The error is %s", pipeline.getPipelineFullName(), ExceptionUtils.getMessage(e)));
             pipeline.cancelPipeline();
             return null;
         }
@@ -135,11 +138,18 @@ public class PipelineBaseScheduler implements JobScheduler {
             oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation());
         }
         if (oldProfile == null || !resourceManager.slotActiveCheck(oldProfile)) {
-            SlotProfile newProfile = applyResourceForTask(task).join();
-            log.info(String.format("use new profile: %s to replace not active profile: %s for task %s", newProfile, oldProfile, task));
+            SlotProfile newProfile;
+            CompletableFuture<SlotProfile> slotProfileCompletableFuture = applyResourceForTask(task);
+            if (slotProfileCompletableFuture != null) {
+                newProfile = slotProfileCompletableFuture.join();
+            } else {
+                throw new SeaTunnelEngineException(String.format("The task [%s] state is [%s] and the resource can not be retrieved", task.getTaskFullName(), task.getExecutionState()));
+            }
+
+            log.info(String.format("use new profile: %s to replace not active profile: %s for task %s", newProfile, oldProfile, task.getTaskFullName()));
             return newProfile;
         }
-        log.info(String.format("use active old profile: %s for task %s", oldProfile, task));
+        log.info(String.format("use active old profile: %s for task %s", oldProfile, task.getTaskFullName()));
         task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
         return oldProfile;
     }