You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2023/01/17 09:34:38 UTC
[incubator-seatunnel] branch dev updated: [Bug] [Seatunnel-Engine] Fix NEP when scheduling sub plan fails. (#3909)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9df009494 [Bug] [Seatunnel-Engine] Fix NEP when scheduling sub plan fails. (#3909)
9df009494 is described below
commit 9df009494bba65a954106906a56f26250ea06e3d
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Tue Jan 17 17:34:31 2023 +0800
[Bug] [Seatunnel-Engine] Fix NEP when scheduling sub plan fails. (#3909)
---
.../engine/common/utils/PassiveCompletableFuture.java | 16 +++++++++-------
.../seatunnel/engine/server/dag/physical/SubPlan.java | 2 ++
.../engine/server/scheduler/PipelineBaseScheduler.java | 18 ++++++++++++++----
3 files changed, 25 insertions(+), 11 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index 0a6c519d7..ce5e6d182 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -27,13 +27,15 @@ public class PassiveCompletableFuture<T> extends CompletableFuture<T> {
}
public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
- chainedFuture.whenComplete((r, t) -> {
- if (t != null) {
- internalCompleteExceptionally(t);
- } else {
- internalComplete(r);
- }
- });
+ if (chainedFuture != null) {
+ chainedFuture.whenComplete((r, t) -> {
+ if (t != null) {
+ internalCompleteExceptionally(t);
+ } else {
+ internalComplete(r);
+ }
+ });
+ }
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 7bfa4eaba..b88224458 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
+import lombok.Data;
import lombok.NonNull;
import java.util.List;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+@Data
public class SubPlan {
private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 7fdfe323f..791feb405 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.engine.server.scheduler;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
@@ -63,7 +65,7 @@ public class PipelineBaseScheduler implements JobScheduler {
List<CompletableFuture<Void>> collect =
physicalPlan.getPipelineList()
.stream()
- .map(pipeline -> schedulerPipeline(pipeline))
+ .map(this::schedulerPipeline)
.filter(Objects::nonNull).collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
@@ -99,6 +101,7 @@ public class PipelineBaseScheduler implements JobScheduler {
deployPipeline(pipeline, slotProfiles);
}, jobMaster.getExecutorService());
} catch (Exception e) {
+ log.error(String.format("scheduler %s error and cancel pipeline. The error is %s", pipeline.getPipelineFullName(), ExceptionUtils.getMessage(e)));
pipeline.cancelPipeline();
return null;
}
@@ -135,11 +138,18 @@ public class PipelineBaseScheduler implements JobScheduler {
oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation());
}
if (oldProfile == null || !resourceManager.slotActiveCheck(oldProfile)) {
- SlotProfile newProfile = applyResourceForTask(task).join();
- log.info(String.format("use new profile: %s to replace not active profile: %s for task %s", newProfile, oldProfile, task));
+ SlotProfile newProfile;
+ CompletableFuture<SlotProfile> slotProfileCompletableFuture = applyResourceForTask(task);
+ if (slotProfileCompletableFuture != null) {
+ newProfile = slotProfileCompletableFuture.join();
+ } else {
+ throw new SeaTunnelEngineException(String.format("The task [%s] state is [%s] and the resource can not be retrieved", task.getTaskFullName(), task.getExecutionState()));
+ }
+
+ log.info(String.format("use new profile: %s to replace not active profile: %s for task %s", newProfile, oldProfile, task.getTaskFullName()));
return newProfile;
}
- log.info(String.format("use active old profile: %s for task %s", oldProfile, task));
+ log.info(String.format("use active old profile: %s for task %s", oldProfile, task.getTaskFullName()));
task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
return oldProfile;
}