You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/01/13 07:10:39 UTC
[incubator-seatunnel] branch dev updated: [Improve] [Zeta] Support Get Error Message From Client When Job Failed (#3928)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f74ed5676 [Improve] [Zeta] Support Get Error Message From Client When Job Failed (#3928)
f74ed5676 is described below
commit f74ed56763a907917229b9433c020d1e09c92668
Author: Hisoka <fa...@qq.com>
AuthorDate: Fri Jan 13 15:10:31 2023 +0800
[Improve] [Zeta] Support Get Error Message From Client When Job Failed (#3928)
* [Improve] [SeaTunnel-Engine] Add Job Execute ERROR Feedback To Client
* [Improve] [Zeta] Support Get Error Message From Client When Job Failed
---
.../src/test/resources/hazelcast.yaml | 2 +-
.../engine/client/job/ClientJobProxy.java | 25 ++++++++++-----
.../org/apache/seatunnel/engine/core/job/Job.java | 2 +-
.../engine/core/job/{Job.java => JobResult.java} | 21 ++++++------
.../job/{Job.java => PipelineExecutionState.java} | 23 ++++++++------
.../codec/SeaTunnelWaitForJobCompleteCodec.java | 27 ++++++++--------
.../SeaTunnelEngine.yaml | 4 +--
.../engine/server/CoordinatorService.java | 10 +++---
.../engine/server/dag/physical/PhysicalPlan.java | 21 ++++++++----
.../engine/server/dag/physical/PhysicalVertex.java | 28 ++--------------
.../engine/server/dag/physical/SubPlan.java | 16 +++++++---
.../server/execution/TaskExecutionState.java | 10 +++---
.../seatunnel/engine/server/master/JobMaster.java | 37 +++++++++++-----------
.../operation/WaitForJobCompleteOperation.java | 3 +-
.../protocol/task/WaitForJobCompleteTask.java | 6 ++--
.../server/task/operation/DeployTaskOperation.java | 2 +-
.../engine/server/master/JobMasterTest.java | 5 +--
17 files changed, 126 insertions(+), 116 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
index a07ea6924..3146ffc69 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
@@ -28,6 +28,6 @@ hazelcast:
port-count: 100
port: 5801
properties:
- hazelcast.invocation.max.retry.count: 20
+ hazelcast.invocation.max.retry.count: 60
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 447f3ae41..06a445470 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
@@ -34,6 +36,7 @@ import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
public class ClientJobProxy implements Job {
private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
@@ -68,13 +71,16 @@ public class ClientJobProxy implements Job {
*/
@Override
public JobStatus waitForJobComplete() {
- JobStatus jobStatus;
+ JobResult jobResult;
try {
- jobStatus = RetryUtils.retryWithException(() -> {
- PassiveCompletableFuture<JobStatus> jobFuture = doWaitForJobComplete();
+ jobResult = RetryUtils.retryWithException(() -> {
+ PassiveCompletableFuture<JobResult> jobFuture = doWaitForJobComplete();
return jobFuture.get();
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
exception -> exception instanceof RuntimeException, Constant.OPERATION_RETRY_SLEEP));
+ if (jobResult == null) {
+ throw new SeaTunnelEngineException("failed to fetch job result");
+ }
} catch (Exception e) {
LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s",
jobImmutableInformation.getJobId(),
@@ -85,15 +91,18 @@ public class ClientJobProxy implements Job {
LOGGER.info(String.format("Job %s (%s) end with state %s",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
- jobStatus));
- return jobStatus;
+ jobResult.getStatus()));
+ if (StringUtils.isNotEmpty(jobResult.getError())) {
+ throw new SeaTunnelEngineException(jobResult.getError());
+ }
+ return jobResult.getStatus();
}
@Override
- public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
- return seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+ public PassiveCompletableFuture<JobResult> doWaitForJobComplete() {
+ return new PassiveCompletableFuture<>(seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
- response -> JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)]);
+ SeaTunnelWaitForJobCompleteCodec::decodeResponse).thenApply(jobResult -> seaTunnelHazelcastClient.getSerializationService().toObject(jobResult)));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index a99053443..db3368961 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
public interface Job {
long getJobId();
- PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+ PassiveCompletableFuture<JobResult> doWaitForJobComplete();
void cancelJob();
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
similarity index 72%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
index a99053443..0eb8b5949 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
@@ -17,20 +17,19 @@
package org.apache.seatunnel.engine.core.job;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
-/**
- * Job interface define the Running job apis
- */
-public interface Job {
- long getJobId();
-
- PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+import java.io.Serializable;
- void cancelJob();
+@Data
+@AllArgsConstructor
+public class JobResult implements Serializable {
- JobStatus getJobStatus();
+ @NonNull
+ private JobStatus status;
- JobStatus waitForJobComplete();
+ private String error;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineExecutionState.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineExecutionState.java
index a99053443..74251fdbf 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineExecutionState.java
@@ -17,20 +17,23 @@
package org.apache.seatunnel.engine.core.job;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import lombok.Getter;
-/**
- * Job interface define the Running job apis
- */
-public interface Job {
- long getJobId();
+import java.io.Serializable;
+
+@Getter
+public class PipelineExecutionState implements Serializable {
- PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+ private final int pipelineId;
- void cancelJob();
+ private final PipelineStatus pipelineStatus;
- JobStatus getJobStatus();
+ private final String throwableMsg;
- JobStatus waitForJobComplete();
+ public PipelineExecutionState(int pipelineId, PipelineStatus pipelineStatus, String throwableMsg) {
+ this.pipelineId = pipelineId;
+ this.pipelineStatus = pipelineStatus;
+ this.throwableMsg = throwableMsg;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
index 06d9f5ffd..395ecc4b9 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
@@ -24,20 +24,20 @@ import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESS
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
-import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeInt;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec;
/*
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
*/
-@Generated("45a79cdd8ea874bd3c99b414d5f7639f")
+@Generated("a3d68a6968b7db8a71ab53d00085a575")
public final class SeaTunnelWaitForJobCompleteCodec {
//hex: 0xDE0300
public static final int REQUEST_MESSAGE_TYPE = 14549760;
@@ -45,8 +45,7 @@ public final class SeaTunnelWaitForJobCompleteCodec {
public static final int RESPONSE_MESSAGE_TYPE = 14549761;
private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
- private static final int RESPONSE_JOB_STATUS_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
- private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_JOB_STATUS_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
private SeaTunnelWaitForJobCompleteCodec() {
}
@@ -55,8 +54,7 @@ public final class SeaTunnelWaitForJobCompleteCodec {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(true);
clientMessage.setOperationName("SeaTunnel.WaitForJobComplete");
- ClientMessage.Frame initialFrame =
- new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
@@ -64,29 +62,30 @@ public final class SeaTunnelWaitForJobCompleteCodec {
return clientMessage;
}
+ /**
+ */
public static long decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
ClientMessage.Frame initialFrame = iterator.next();
return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
}
- public static ClientMessage encodeResponse(int jobStatus) {
+ public static ClientMessage encodeResponse(com.hazelcast.internal.serialization.Data jobResult) {
ClientMessage clientMessage = ClientMessage.createForEncode();
- ClientMessage.Frame initialFrame =
- new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
- encodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET, jobStatus);
clientMessage.add(initialFrame);
+ DataCodec.encode(clientMessage, jobResult);
return clientMessage;
}
/**
- *
*/
- public static int decodeResponse(ClientMessage clientMessage) {
+ public static com.hazelcast.internal.serialization.Data decodeResponse(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
- ClientMessage.Frame initialFrame = iterator.next();
- return decodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET);
+ //empty initial frame
+ iterator.next();
+ return DataCodec.decode(iterator);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 7b809adb4..1c70091c7 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -75,8 +75,8 @@ methods:
doc: ''
response:
params:
- - name: jobStatus
- type: int
+ - name: jobResult
+ type: Data
nullable: false
since: 2.0
doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 6d31bef31..8c708ccfb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
@@ -398,15 +399,16 @@ public class CoordinatorService {
runningJobInfoIMap.remove(jobId);
}
- public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
+ public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
JobStatus jobStatus = jobHistoryService.getJobDetailState(jobId).getJobStatus();
- CompletableFuture<JobStatus> future = new CompletableFuture<>();
- future.complete(jobStatus);
+ CompletableFuture<JobResult> future = new CompletableFuture<>();
+ // TODO: have the job history service record the job's execution error so it can be returned here
+ future.complete(new JobResult(jobStatus, null));
return new PassiveCompletableFuture<>(future);
} else {
- return runningJobMaster.getJobMasterCompleteFuture();
+ return new PassiveCompletableFuture<>(runningJobMaster.getJobMasterCompleteFuture());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 411613d9a..cfbee02ce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -33,6 +35,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class PhysicalPlan {
@@ -67,7 +70,12 @@ public class PhysicalPlan {
* when job status turn to end, complete this future. And then the waitForCompleteByPhysicalPlan
* in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
*/
- private CompletableFuture<JobStatus> jobEndFuture;
+ private CompletableFuture<JobResult> jobEndFuture;
+
+ /**
+ * The error message of the first subPlan that failed; later failures do not overwrite it.
+ */
+ private final AtomicReference<String> errorBySubPlan = new AtomicReference<>();
private final String jobFullName;
@@ -125,20 +133,20 @@ public class PhysicalPlan {
pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
}
- public PassiveCompletableFuture<JobStatus> initStateFuture() {
+ public PassiveCompletableFuture<JobResult> initStateFuture() {
jobEndFuture = new CompletableFuture<>();
pipelineList.forEach(this::addPipelineEndCallback);
return new PassiveCompletableFuture<>(jobEndFuture);
}
public void addPipelineEndCallback(SubPlan subPlan) {
- PassiveCompletableFuture<PipelineStatus> future = subPlan.initStateFuture();
+ PassiveCompletableFuture<PipelineExecutionState> future = subPlan.initStateFuture();
future.thenAcceptAsync(pipelineState -> {
try {
// Notify checkpoint manager when the pipeline end, Whether the pipeline will be restarted or not
jobMaster.getCheckpointManager()
.listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
- if (PipelineStatus.CANCELED.equals(pipelineState)) {
+ if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
if (canRestorePipeline(subPlan)) {
subPlan.restorePipeline();
return;
@@ -149,12 +157,13 @@ public class PhysicalPlan {
cancelJob();
}
LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
- } else if (PipelineStatus.FAILED.equals(pipelineState)) {
+ } else if (PipelineStatus.FAILED.equals(pipelineState.getPipelineStatus())) {
if (canRestorePipeline(subPlan)) {
subPlan.restorePipeline();
return;
}
failedPipelineNum.incrementAndGet();
+ errorBySubPlan.compareAndSet(null, pipelineState.getThrowableMsg());
if (makeJobEndWhenPipelineEnded) {
cancelJob();
}
@@ -170,7 +179,7 @@ public class PhysicalPlan {
} else {
turnToEndState(JobStatus.FINISHED);
}
- jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
+ jobEndFuture.complete(new JobResult((JobStatus) runningJobStateIMap.get(jobId), errorBySubPlan.get()));
}
} catch (Throwable e) {
// Because only cancelJob or releasePipelineResource can throw exception, so we only output log here
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index baf53dd2e..5660925ed 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -43,6 +43,7 @@ import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
import java.net.URL;
import java.util.List;
@@ -65,25 +66,14 @@ public class PhysicalVertex {
private final TaskGroupLocation taskGroupLocation;
- /**
- * the index of PhysicalVertex
- */
- private final int subTaskGroupIndex;
-
private final String taskFullName;
- private final int parallelism;
-
private final TaskGroupDefaultImpl taskGroup;
private final ExecutorService executorService;
private final FlakeIdGenerator flakeIdGenerator;
- private final int pipelineId;
-
- private final int totalPipelineNum;
-
private final Set<URL> pluginJarsUrls;
private final IMap<Object, Object> runningJobStateIMap;
@@ -102,14 +92,8 @@ public class PhysicalVertex {
*/
private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
- private final JobImmutableInformation jobImmutableInformation;
-
- private final long initializationTimestamp;
-
private final NodeEngine nodeEngine;
- private TaskGroupImmutableInformation taskGroupImmutableInformation;
-
private JobMaster jobMaster;
public PhysicalVertex(int subTaskGroupIndex,
@@ -126,16 +110,10 @@ public class PhysicalVertex {
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap) {
this.taskGroupLocation = taskGroup.getTaskGroupLocation();
- this.subTaskGroupIndex = subTaskGroupIndex;
this.executorService = executorService;
- this.parallelism = parallelism;
this.taskGroup = taskGroup;
this.flakeIdGenerator = flakeIdGenerator;
- this.pipelineId = pipelineId;
- this.totalPipelineNum = totalPipelineNum;
this.pluginJarsUrls = pluginJarsUrls;
- this.jobImmutableInformation = jobImmutableInformation;
- this.initializationTimestamp = initializationTimestamp;
Long[] stateTimestamps = new Long[ExecutionState.values().length];
if (runningJobStateTimestampsIMap.get(taskGroup.getTaskGroupLocation()) == null) {
@@ -457,11 +435,11 @@ public class PhysicalVertex {
if (!turnToEndState(taskExecutionState.getExecutionState())) {
return;
}
- if (taskExecutionState.getThrowable() != null) {
+ if (StringUtils.isNotEmpty(taskExecutionState.getThrowableMsg())) {
LOGGER.severe(String.format("%s end with state %s and Exception: %s",
this.taskFullName,
taskExecutionState.getExecutionState(),
- ExceptionUtils.getMessage(taskExecutionState.getThrowable())));
+ taskExecutionState.getThrowableMsg()));
} else {
LOGGER.info(String.format("%s end with state %s",
this.taskFullName,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index aa049cfda..7bfa4eaba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
@@ -35,6 +36,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class SubPlan {
@@ -57,7 +59,7 @@ public class SubPlan {
private final IMap<Object, Object> runningJobStateIMap;
/**
- * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+ * Timestamps (in milliseconds) as returned by {@code System.currentTimeMillis()} when the
* pipeline transitioned into a certain state. The index into this array is the ordinal
* of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
* stateTimestamps[RUNNING.ordinal()]}.
@@ -68,10 +70,15 @@ public class SubPlan {
* Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
* whenComplete method will be called.
*/
- private CompletableFuture<PipelineStatus> pipelineFuture;
+ private CompletableFuture<PipelineExecutionState> pipelineFuture;
private final PipelineLocation pipelineLocation;
+ /**
+ * The error message of the first physicalVertex that failed; later failures do not overwrite it.
+ */
+ private final AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
+
private final ExecutorService executorService;
private JobMaster jobMaster;
@@ -122,7 +129,7 @@ public class SubPlan {
this.executorService = executorService;
}
- public PassiveCompletableFuture<PipelineStatus> initStateFuture() {
+ public PassiveCompletableFuture<PipelineExecutionState> initStateFuture() {
physicalVertexList.forEach(physicalVertex -> {
addPhysicalVertexCallBack(physicalVertex.initStateFuture());
});
@@ -146,6 +153,7 @@ public class SubPlan {
executionState.getTaskGroupLocation(),
this.getPipelineFullName()));
failedTaskNum.incrementAndGet();
+ errorByPhysicalVertex.compareAndSet(null, executionState.getThrowableMsg());
cancelPipeline();
}
@@ -160,7 +168,7 @@ public class SubPlan {
turnToEndState(PipelineStatus.FINISHED);
LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
}
- pipelineFuture.complete((PipelineStatus) runningJobStateIMap.get(pipelineLocation));
+ pipelineFuture.complete(new PipelineExecutionState(pipelineId, (PipelineStatus) runningJobStateIMap.get(pipelineLocation), errorByPhysicalVertex.get()));
}
} catch (Throwable e) {
LOGGER.severe(String.format("Never come here. handle %s %s error",
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index e43ecc35a..ad906b27b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.execution;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
import java.io.Serializable;
public class TaskExecutionState implements Serializable {
@@ -25,20 +27,20 @@ public class TaskExecutionState implements Serializable {
private final ExecutionState executionState;
- private Throwable throwable;
+ private final String throwableMsg;
public TaskExecutionState(TaskGroupLocation taskGroupLocation, ExecutionState executionState, Throwable throwable) {
this.taskGroupLocation = taskGroupLocation;
this.executionState = executionState;
- this.throwable = throwable;
+ this.throwableMsg = throwable == null ? "" : ExceptionUtils.getMessage(throwable);
}
public ExecutionState getExecutionState() {
return executionState;
}
- public Throwable getThrowable() {
- return throwable;
+ public String getThrowableMsg() {
+ return throwableMsg;
}
public TaskGroupLocation getTaskGroupLocation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 35fbe8531..6029e762c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
@@ -66,7 +67,6 @@ import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
@@ -95,7 +95,9 @@ public class JobMaster {
private CheckpointManager checkpointManager;
- private CompletableFuture<JobStatus> jobMasterCompleteFuture;
+ private CompletableFuture<JobResult> jobMasterCompleteFuture;
+
+ private ClassLoader classLoader;
private JobImmutableInformation jobImmutableInformation;
@@ -154,15 +156,9 @@ public class JobMaster {
LOGGER.info(String.format("Job %s (%s) needed jar urls %s", jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(), jobImmutableInformation.getPluginJarsUrls()));
- if (!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
- logicalDag =
- CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
- new SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()),
- jobImmutableInformation.getLogicalDag());
- } else {
- logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
- }
-
+ classLoader = new SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
+ logicalDag = CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
+ classLoader, jobImmutableInformation.getLogicalDag());
CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
jobImmutableInformation.getJobConfig().getEnvOptions());
@@ -205,14 +201,14 @@ public class JobMaster {
public void initStateFuture() {
jobMasterCompleteFuture = new CompletableFuture<>();
- PassiveCompletableFuture<JobStatus> jobStatusFuture = physicalPlan.initStateFuture();
+ PassiveCompletableFuture<JobResult> jobStatusFuture = physicalPlan.initStateFuture();
jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
// We need not handle t, Because we will not return t from physicalPlan
- if (JobStatus.FAILING.equals(v)) {
+ if (JobStatus.FAILING.equals(v.getStatus())) {
cleanJob();
physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
}
- jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
+ jobMasterCompleteFuture.complete(new JobResult(physicalPlan.getJobStatus(), v.getError()));
}));
}
@@ -284,6 +280,10 @@ public class JobMaster {
throw new IllegalArgumentException("can't find task group address from task group id: " + taskGroupId);
}
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
public void cancelJob() {
physicalPlan.neverNeedRestore();
physicalPlan.cancelJob();
@@ -297,7 +297,7 @@ public class JobMaster {
return checkpointManager;
}
- public PassiveCompletableFuture<JobStatus> getJobMasterCompleteFuture() {
+ public PassiveCompletableFuture<JobResult> getJobMasterCompleteFuture() {
return new PassiveCompletableFuture<>(jobMasterCompleteFuture);
}
@@ -400,10 +400,9 @@ public class JobMaster {
@NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
try {
- RetryUtils.retryWithException(() -> {
- return pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation));
- }, new RetryUtils.RetryMaterial(20, true,
- exception -> exception instanceof NullPointerException && isRunning, 1000));
+ RetryUtils.retryWithException(() -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
+ new RetryUtils.RetryMaterial(20, true,
+ exception -> exception instanceof NullPointerException && isRunning, 1000));
} catch (Exception e) {
throw new SeaTunnelEngineException("Can not sync pipeline owned slot profiles with IMap", e);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
index 93df55116..6256a4c80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
@@ -34,7 +34,8 @@ public class WaitForJobCompleteOperation extends AbstractJobAsyncOperation {
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer service = getService();
- return service.getCoordinatorService().waitForJobComplete(jobId);
+ return new PassiveCompletableFuture<>(service.getCoordinatorService().waitForJobComplete(jobId)
+ .thenApply(jobResult -> this.getNodeEngine().getSerializationService().toData(jobResult)));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
index 813063b19..b7463e953 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
@@ -17,20 +17,20 @@
package org.apache.seatunnel.engine.server.protocol.task;
-import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.operationservice.Operation;
-public class WaitForJobCompleteTask extends AbstractSeaTunnelMessageTask<Long, JobStatus> {
+public class WaitForJobCompleteTask extends AbstractSeaTunnelMessageTask<Long, Data> {
protected WaitForJobCompleteTask(ClientMessage clientMessage, Node node, Connection connection) {
super(clientMessage, node, connection,
SeaTunnelWaitForJobCompleteCodec::decodeRequest,
- x -> SeaTunnelWaitForJobCompleteCodec.encodeResponse(x.ordinal()));
+ SeaTunnelWaitForJobCompleteCodec::encodeResponse);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 336ef59a9..40416f480 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -47,7 +47,7 @@ public class DeployTaskOperation extends Operation implements IdentifiedDataSeri
public void run() throws Exception {
SeaTunnelServer server = getService();
server.getSlotService().getSlotContext(slotProfile)
- .getTaskExecutionService().deployTask(taskImmutableInformation);
+ .getTaskExecutionService().deployTask(taskImmutableInformation).get();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 755d6e4b8..8f2919454 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
@@ -132,14 +133,14 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
- PassiveCompletableFuture<JobStatus> jobMasterCompleteFuture = jobMaster.getJobMasterCompleteFuture();
+ PassiveCompletableFuture<JobResult> jobMasterCompleteFuture = jobMaster.getJobMasterCompleteFuture();
// cancel job
jobMaster.cancelJob();
// test job turn to complete
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
- jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get())));
+ jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get().getStatus())));
testIMapRemovedAfterJobComplete(jobMaster);
}