You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/01/13 07:10:39 UTC

[incubator-seatunnel] branch dev updated: [Improve] [Zeta] Support Get Error Message From Client When Job Failed (#3928)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f74ed5676 [Improve] [Zeta] Support Get Error Message From Client When Job Failed (#3928)
f74ed5676 is described below

commit f74ed56763a907917229b9433c020d1e09c92668
Author: Hisoka <fa...@qq.com>
AuthorDate: Fri Jan 13 15:10:31 2023 +0800

    [Improve] [Zeta] Support Get Error Message From Client When Job Failed (#3928)
    
    * [Improve] [SeaTunnel-Engine] Add Job Execute ERROR Feedback To Client
    
    * [Improve] [Zeta] Support Get Error Message From Client When Job Failed
---
 .../src/test/resources/hazelcast.yaml              |  2 +-
 .../engine/client/job/ClientJobProxy.java          | 25 ++++++++++-----
 .../org/apache/seatunnel/engine/core/job/Job.java  |  2 +-
 .../engine/core/job/{Job.java => JobResult.java}   | 21 ++++++------
 .../job/{Job.java => PipelineExecutionState.java}  | 23 ++++++++------
 .../codec/SeaTunnelWaitForJobCompleteCodec.java    | 27 ++++++++--------
 .../SeaTunnelEngine.yaml                           |  4 +--
 .../engine/server/CoordinatorService.java          | 10 +++---
 .../engine/server/dag/physical/PhysicalPlan.java   | 21 ++++++++----
 .../engine/server/dag/physical/PhysicalVertex.java | 28 ++--------------
 .../engine/server/dag/physical/SubPlan.java        | 16 +++++++---
 .../server/execution/TaskExecutionState.java       | 10 +++---
 .../seatunnel/engine/server/master/JobMaster.java  | 37 +++++++++++-----------
 .../operation/WaitForJobCompleteOperation.java     |  3 +-
 .../protocol/task/WaitForJobCompleteTask.java      |  6 ++--
 .../server/task/operation/DeployTaskOperation.java |  2 +-
 .../engine/server/master/JobMasterTest.java        |  5 +--
 17 files changed, 126 insertions(+), 116 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
index a07ea6924..3146ffc69 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
@@ -28,6 +28,6 @@ hazelcast:
       port-count: 100
       port: 5801
   properties:
-    hazelcast.invocation.max.retry.count: 20
+    hazelcast.invocation.max.retry.count: 60
     hazelcast.tcp.join.port.try.count: 30
     hazelcast.logging.type: log4j2
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 447f3ae41..06a445470 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.Job;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
@@ -34,6 +36,7 @@ import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
 
 public class ClientJobProxy implements Job {
     private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
@@ -68,13 +71,16 @@ public class ClientJobProxy implements Job {
      */
     @Override
     public JobStatus waitForJobComplete() {
-        JobStatus jobStatus;
+        JobResult jobResult;
         try {
-            jobStatus = RetryUtils.retryWithException(() -> {
-                PassiveCompletableFuture<JobStatus> jobFuture = doWaitForJobComplete();
+            jobResult = RetryUtils.retryWithException(() -> {
+                PassiveCompletableFuture<JobResult> jobFuture = doWaitForJobComplete();
                 return jobFuture.get();
             }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
                 exception -> exception instanceof RuntimeException, Constant.OPERATION_RETRY_SLEEP));
+            if (jobResult == null) {
+                throw new SeaTunnelEngineException("failed to fetch job result");
+            }
         } catch (Exception e) {
             LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s",
                 jobImmutableInformation.getJobId(),
@@ -85,15 +91,18 @@ public class ClientJobProxy implements Job {
         LOGGER.info(String.format("Job %s (%s) end with state %s",
             jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId(),
-            jobStatus));
-        return jobStatus;
+            jobResult.getStatus()));
+        if (StringUtils.isNotEmpty(jobResult.getError())) {
+            throw new SeaTunnelEngineException(jobResult.getError());
+        }
+        return jobResult.getStatus();
     }
 
     @Override
-    public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
-        return seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+    public PassiveCompletableFuture<JobResult> doWaitForJobComplete() {
+        return new PassiveCompletableFuture<>(seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
             SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
-            response -> JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)]);
+            SeaTunnelWaitForJobCompleteCodec::decodeResponse).thenApply(jobResult -> seaTunnelHazelcastClient.getSerializationService().toObject(jobResult)));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index a99053443..db3368961 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 public interface Job {
     long getJobId();
 
-    PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+    PassiveCompletableFuture<JobResult> doWaitForJobComplete();
 
     void cancelJob();
 
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
similarity index 72%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
index a99053443..0eb8b5949 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java
@@ -17,20 +17,19 @@
 
 package org.apache.seatunnel.engine.core.job;
 
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
 
-/**
- * Job interface define the Running job apis
- */
-public interface Job {
-    long getJobId();
-
-    PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+import java.io.Serializable;
 
-    void cancelJob();
+@Data
+@AllArgsConstructor
+public class JobResult implements Serializable {
 
-    JobStatus getJobStatus();
+    @NonNull
+    private JobStatus status;
 
-    JobStatus waitForJobComplete();
+    private String error;
 
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineExecutionState.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineExecutionState.java
index a99053443..74251fdbf 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineExecutionState.java
@@ -17,20 +17,23 @@
 
 package org.apache.seatunnel.engine.core.job;
 
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import lombok.Getter;
 
-/**
- * Job interface define the Running job apis
- */
-public interface Job {
-    long getJobId();
+import java.io.Serializable;
+
+@Getter
+public class PipelineExecutionState implements Serializable {
 
-    PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+    private final int pipelineId;
 
-    void cancelJob();
+    private final PipelineStatus pipelineStatus;
 
-    JobStatus getJobStatus();
+    private final String throwableMsg;
 
-    JobStatus waitForJobComplete();
+    public PipelineExecutionState(int pipelineId, PipelineStatus pipelineStatus, String throwableMsg) {
+        this.pipelineId = pipelineId;
+        this.pipelineStatus = pipelineStatus;
+        this.throwableMsg = throwableMsg;
+    }
 
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
index 06d9f5ffd..395ecc4b9 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
@@ -24,20 +24,20 @@ import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESS
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
-import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeInt;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
 import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec;
 
 /*
  * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
  * to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
  */
 
-@Generated("45a79cdd8ea874bd3c99b414d5f7639f")
+@Generated("a3d68a6968b7db8a71ab53d00085a575")
 public final class SeaTunnelWaitForJobCompleteCodec {
     //hex: 0xDE0300
     public static final int REQUEST_MESSAGE_TYPE = 14549760;
@@ -45,8 +45,7 @@ public final class SeaTunnelWaitForJobCompleteCodec {
     public static final int RESPONSE_MESSAGE_TYPE = 14549761;
     private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
     private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
-    private static final int RESPONSE_JOB_STATUS_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
-    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_JOB_STATUS_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
 
     private SeaTunnelWaitForJobCompleteCodec() {
     }
@@ -55,8 +54,7 @@ public final class SeaTunnelWaitForJobCompleteCodec {
         ClientMessage clientMessage = ClientMessage.createForEncode();
         clientMessage.setRetryable(true);
         clientMessage.setOperationName("SeaTunnel.WaitForJobComplete");
-        ClientMessage.Frame initialFrame =
-            new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
         encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
         encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
@@ -64,29 +62,30 @@ public final class SeaTunnelWaitForJobCompleteCodec {
         return clientMessage;
     }
 
+    /**
+     */
     public static long decodeRequest(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
         ClientMessage.Frame initialFrame = iterator.next();
         return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
     }
 
-    public static ClientMessage encodeResponse(int jobStatus) {
+    public static ClientMessage encodeResponse(com.hazelcast.internal.serialization.Data jobResult) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
-        ClientMessage.Frame initialFrame =
-            new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
-        encodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET, jobStatus);
         clientMessage.add(initialFrame);
 
+        DataCodec.encode(clientMessage, jobResult);
         return clientMessage;
     }
 
     /**
-     *
      */
-    public static int decodeResponse(ClientMessage clientMessage) {
+    public static com.hazelcast.internal.serialization.Data decodeResponse(ClientMessage clientMessage) {
         ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
-        ClientMessage.Frame initialFrame = iterator.next();
-        return decodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET);
+        //empty initial frame
+        iterator.next();
+        return DataCodec.decode(iterator);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 7b809adb4..1c70091c7 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -75,8 +75,8 @@ methods:
           doc: ''
     response:
       params:
-        - name: jobStatus
-          type: int
+        - name: jobResult
+          type: Data
           nullable: false
           since: 2.0
           doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 6d31bef31..8c708ccfb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobInfo;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
@@ -398,15 +399,16 @@ public class CoordinatorService {
         runningJobInfoIMap.remove(jobId);
     }
 
-    public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
+    public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
             JobStatus jobStatus = jobHistoryService.getJobDetailState(jobId).getJobStatus();
-            CompletableFuture<JobStatus> future = new CompletableFuture<>();
-            future.complete(jobStatus);
+            CompletableFuture<JobResult> future = new CompletableFuture<>();
+            // TODO: support recording the job execution error in the history service
+            future.complete(new JobResult(jobStatus, null));
             return new PassiveCompletableFuture<>(future);
         } else {
-            return runningJobMaster.getJobMasterCompleteFuture();
+            return new PassiveCompletableFuture<>(runningJobMaster.getJobMasterCompleteFuture());
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 411613d9a..cfbee02ce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.engine.server.dag.physical;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 
@@ -33,6 +35,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class PhysicalPlan {
@@ -67,7 +70,12 @@ public class PhysicalPlan {
      * when job status turn to end, complete this future. And then the waitForCompleteByPhysicalPlan
      * in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
      */
-    private CompletableFuture<JobStatus> jobEndFuture;
+    private CompletableFuture<JobResult> jobEndFuture;
+
+    /**
+     * The error thrown by a subPlan; should be set when a subPlan throws an error.
+     */
+    private final AtomicReference<String> errorBySubPlan = new AtomicReference<>();
 
     private final String jobFullName;
 
@@ -125,20 +133,20 @@ public class PhysicalPlan {
         pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
     }
 
-    public PassiveCompletableFuture<JobStatus> initStateFuture() {
+    public PassiveCompletableFuture<JobResult> initStateFuture() {
         jobEndFuture = new CompletableFuture<>();
         pipelineList.forEach(this::addPipelineEndCallback);
         return new PassiveCompletableFuture<>(jobEndFuture);
     }
 
     public void addPipelineEndCallback(SubPlan subPlan) {
-        PassiveCompletableFuture<PipelineStatus> future = subPlan.initStateFuture();
+        PassiveCompletableFuture<PipelineExecutionState> future = subPlan.initStateFuture();
         future.thenAcceptAsync(pipelineState -> {
             try {
                 // Notify checkpoint manager when the pipeline end, Whether the pipeline will be restarted or not
                 jobMaster.getCheckpointManager()
                     .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
-                if (PipelineStatus.CANCELED.equals(pipelineState)) {
+                if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
                     if (canRestorePipeline(subPlan)) {
                         subPlan.restorePipeline();
                         return;
@@ -149,12 +157,13 @@ public class PhysicalPlan {
                         cancelJob();
                     }
                     LOGGER.info(String.format("release the pipeline %s resource", subPlan.getPipelineFullName()));
-                } else if (PipelineStatus.FAILED.equals(pipelineState)) {
+                } else if (PipelineStatus.FAILED.equals(pipelineState.getPipelineStatus())) {
                     if (canRestorePipeline(subPlan)) {
                         subPlan.restorePipeline();
                         return;
                     }
                     failedPipelineNum.incrementAndGet();
+                    errorBySubPlan.compareAndSet(null, pipelineState.getThrowableMsg());
                     if (makeJobEndWhenPipelineEnded) {
                         cancelJob();
                     }
@@ -170,7 +179,7 @@ public class PhysicalPlan {
                     } else {
                         turnToEndState(JobStatus.FINISHED);
                     }
-                    jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
+                    jobEndFuture.complete(new JobResult((JobStatus) runningJobStateIMap.get(jobId), errorBySubPlan.get()));
                 }
             } catch (Throwable e) {
                 // Because only cancelJob or releasePipelineResource can throw exception, so we only output log here
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index baf53dd2e..5660925ed 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -43,6 +43,7 @@ import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
 
 import java.net.URL;
 import java.util.List;
@@ -65,25 +66,14 @@ public class PhysicalVertex {
 
     private final TaskGroupLocation taskGroupLocation;
 
-    /**
-     * the index of PhysicalVertex
-     */
-    private final int subTaskGroupIndex;
-
     private final String taskFullName;
 
-    private final int parallelism;
-
     private final TaskGroupDefaultImpl taskGroup;
 
     private final ExecutorService executorService;
 
     private final FlakeIdGenerator flakeIdGenerator;
 
-    private final int pipelineId;
-
-    private final int totalPipelineNum;
-
     private final Set<URL> pluginJarsUrls;
 
     private final IMap<Object, Object> runningJobStateIMap;
@@ -102,14 +92,8 @@ public class PhysicalVertex {
      */
     private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
 
-    private final JobImmutableInformation jobImmutableInformation;
-
-    private final long initializationTimestamp;
-
     private final NodeEngine nodeEngine;
 
-    private TaskGroupImmutableInformation taskGroupImmutableInformation;
-
     private JobMaster jobMaster;
 
     public PhysicalVertex(int subTaskGroupIndex,
@@ -126,16 +110,10 @@ public class PhysicalVertex {
                           @NonNull IMap runningJobStateIMap,
                           @NonNull IMap runningJobStateTimestampsIMap) {
         this.taskGroupLocation = taskGroup.getTaskGroupLocation();
-        this.subTaskGroupIndex = subTaskGroupIndex;
         this.executorService = executorService;
-        this.parallelism = parallelism;
         this.taskGroup = taskGroup;
         this.flakeIdGenerator = flakeIdGenerator;
-        this.pipelineId = pipelineId;
-        this.totalPipelineNum = totalPipelineNum;
         this.pluginJarsUrls = pluginJarsUrls;
-        this.jobImmutableInformation = jobImmutableInformation;
-        this.initializationTimestamp = initializationTimestamp;
 
         Long[] stateTimestamps = new Long[ExecutionState.values().length];
         if (runningJobStateTimestampsIMap.get(taskGroup.getTaskGroupLocation()) == null) {
@@ -457,11 +435,11 @@ public class PhysicalVertex {
         if (!turnToEndState(taskExecutionState.getExecutionState())) {
             return;
         }
-        if (taskExecutionState.getThrowable() != null) {
+        if (StringUtils.isNotEmpty(taskExecutionState.getThrowableMsg())) {
             LOGGER.severe(String.format("%s end with state %s and Exception: %s",
                 this.taskFullName,
                 taskExecutionState.getExecutionState(),
-                ExceptionUtils.getMessage(taskExecutionState.getThrowable())));
+                taskExecutionState.getThrowableMsg()));
         } else {
             LOGGER.info(String.format("%s end with state %s",
                 this.taskFullName,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index aa049cfda..7bfa4eaba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
@@ -35,6 +36,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class SubPlan {
@@ -57,7 +59,7 @@ public class SubPlan {
     private final IMap<Object, Object> runningJobStateIMap;
 
     /**
-     * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+     * Timestamps (in milliseconds) as returned by {@code System.currentTimeMillis()} when the
      * pipeline transitioned into a certain state. The index into this array is the ordinal
      * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
      * stateTimestamps[RUNNING.ordinal()]}.
@@ -68,10 +70,15 @@ public class SubPlan {
      * Complete this future when this sub plan complete. When this future completed, the waitForCompleteBySubPlan in {@link PhysicalPlan }
      * whenComplete method will be called.
      */
-    private CompletableFuture<PipelineStatus> pipelineFuture;
+    private CompletableFuture<PipelineExecutionState> pipelineFuture;
 
     private final PipelineLocation pipelineLocation;
 
+    /**
+     * The error thrown by a physicalVertex; should be set when a physicalVertex throws an error.
+     */
+    private final AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
+
     private final ExecutorService executorService;
 
     private JobMaster jobMaster;
@@ -122,7 +129,7 @@ public class SubPlan {
         this.executorService = executorService;
     }
 
-    public PassiveCompletableFuture<PipelineStatus> initStateFuture() {
+    public PassiveCompletableFuture<PipelineExecutionState> initStateFuture() {
         physicalVertexList.forEach(physicalVertex -> {
             addPhysicalVertexCallBack(physicalVertex.initStateFuture());
         });
@@ -146,6 +153,7 @@ public class SubPlan {
                         executionState.getTaskGroupLocation(),
                         this.getPipelineFullName()));
                     failedTaskNum.incrementAndGet();
+                    errorByPhysicalVertex.compareAndSet(null, executionState.getThrowableMsg());
                     cancelPipeline();
                 }
 
@@ -160,7 +168,7 @@ public class SubPlan {
                         turnToEndState(PipelineStatus.FINISHED);
                         LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
                     }
-                    pipelineFuture.complete((PipelineStatus) runningJobStateIMap.get(pipelineLocation));
+                    pipelineFuture.complete(new PipelineExecutionState(pipelineId, (PipelineStatus) runningJobStateIMap.get(pipelineLocation), errorByPhysicalVertex.get()));
                 }
             } catch (Throwable e) {
                 LOGGER.severe(String.format("Never come here. handle %s %s error",
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
index e43ecc35a..ad906b27b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionState.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
 import java.io.Serializable;
 
 public class TaskExecutionState implements Serializable {
@@ -25,20 +27,20 @@ public class TaskExecutionState implements Serializable {
 
     private final ExecutionState executionState;
 
-    private Throwable throwable;
+    private final String throwableMsg;
 
     public TaskExecutionState(TaskGroupLocation taskGroupLocation, ExecutionState executionState, Throwable throwable) {
         this.taskGroupLocation = taskGroupLocation;
         this.executionState = executionState;
-        this.throwable = throwable;
+        this.throwableMsg = throwable == null ? "" : ExceptionUtils.getMessage(throwable);
     }
 
     public ExecutionState getExecutionState() {
         return executionState;
     }
 
-    public Throwable getThrowable() {
-        return throwable;
+    public String getThrowableMsg() {
+        return throwableMsg;
     }
 
     public TaskGroupLocation getTaskGroupLocation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 35fbe8531..6029e762c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
@@ -66,7 +67,6 @@ import com.hazelcast.logging.Logger;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -95,7 +95,9 @@ public class JobMaster {
 
     private CheckpointManager checkpointManager;
 
-    private CompletableFuture<JobStatus> jobMasterCompleteFuture;
+    private CompletableFuture<JobResult> jobMasterCompleteFuture;
+
+    private ClassLoader classLoader;
 
     private JobImmutableInformation jobImmutableInformation;
 
@@ -154,15 +156,9 @@ public class JobMaster {
         LOGGER.info(String.format("Job %s (%s) needed jar urls %s", jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId(), jobImmutableInformation.getPluginJarsUrls()));
 
-        if (!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
-            logicalDag =
-                CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
-                    new SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()),
-                    jobImmutableInformation.getLogicalDag());
-        } else {
-            logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
-        }
-
+        classLoader = new SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
+        logicalDag = CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
+            classLoader, jobImmutableInformation.getLogicalDag());
         CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
             jobImmutableInformation.getJobConfig().getEnvOptions());
 
@@ -205,14 +201,14 @@ public class JobMaster {
 
     public void initStateFuture() {
         jobMasterCompleteFuture = new CompletableFuture<>();
-        PassiveCompletableFuture<JobStatus> jobStatusFuture = physicalPlan.initStateFuture();
+        PassiveCompletableFuture<JobResult> jobStatusFuture = physicalPlan.initStateFuture();
         jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
             // We need not handle t, Because we will not return t from physicalPlan
-            if (JobStatus.FAILING.equals(v)) {
+            if (JobStatus.FAILING.equals(v.getStatus())) {
                 cleanJob();
                 physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
             }
-            jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
+            jobMasterCompleteFuture.complete(new JobResult(physicalPlan.getJobStatus(), v.getError()));
         }));
     }
 
@@ -284,6 +280,10 @@ public class JobMaster {
         throw new IllegalArgumentException("can't find task group address from task group id: " + taskGroupId);
     }
 
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
     public void cancelJob() {
         physicalPlan.neverNeedRestore();
         physicalPlan.cancelJob();
@@ -297,7 +297,7 @@ public class JobMaster {
         return checkpointManager;
     }
 
-    public PassiveCompletableFuture<JobStatus> getJobMasterCompleteFuture() {
+    public PassiveCompletableFuture<JobResult> getJobMasterCompleteFuture() {
         return new PassiveCompletableFuture<>(jobMasterCompleteFuture);
     }
 
@@ -400,10 +400,9 @@ public class JobMaster {
                                      @NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
         ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
         try {
-            RetryUtils.retryWithException(() -> {
-                return pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation));
-            }, new RetryUtils.RetryMaterial(20, true,
-                exception -> exception instanceof NullPointerException && isRunning, 1000));
+            RetryUtils.retryWithException(() -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
+                new RetryUtils.RetryMaterial(20, true,
+                    exception -> exception instanceof NullPointerException && isRunning, 1000));
         } catch (Exception e) {
             throw new SeaTunnelEngineException("Can not sync pipeline owned slot profiles with IMap", e);
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
index 93df55116..6256a4c80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
@@ -34,7 +34,8 @@ public class WaitForJobCompleteOperation extends AbstractJobAsyncOperation {
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
         SeaTunnelServer service = getService();
-        return service.getCoordinatorService().waitForJobComplete(jobId);
+        return new PassiveCompletableFuture<>(service.getCoordinatorService().waitForJobComplete(jobId)
+            .thenApply(jobResult -> this.getNodeEngine().getSerializationService().toData(jobResult)));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
index 813063b19..b7463e953 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
@@ -17,20 +17,20 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
-import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
 import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class WaitForJobCompleteTask extends AbstractSeaTunnelMessageTask<Long, JobStatus> {
+public class WaitForJobCompleteTask extends AbstractSeaTunnelMessageTask<Long, Data> {
     protected WaitForJobCompleteTask(ClientMessage clientMessage, Node node, Connection connection) {
         super(clientMessage, node, connection,
             SeaTunnelWaitForJobCompleteCodec::decodeRequest,
-            x -> SeaTunnelWaitForJobCompleteCodec.encodeResponse(x.ordinal()));
+            SeaTunnelWaitForJobCompleteCodec::encodeResponse);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 336ef59a9..40416f480 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -47,7 +47,7 @@ public class DeployTaskOperation extends Operation implements IdentifiedDataSeri
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         server.getSlotService().getSlotContext(slotProfile)
-            .getTaskExecutionService().deployTask(taskImmutableInformation);
+            .getTaskExecutionService().deployTask(taskImmutableInformation).get();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 755d6e4b8..8f2919454 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
@@ -132,14 +133,14 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
         await().atMost(120000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
 
-        PassiveCompletableFuture<JobStatus> jobMasterCompleteFuture = jobMaster.getJobMasterCompleteFuture();
+        PassiveCompletableFuture<JobResult> jobMasterCompleteFuture = jobMaster.getJobMasterCompleteFuture();
         // cancel job
         jobMaster.cancelJob();
 
         // test job turn to complete
         await().atMost(120000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> Assertions.assertTrue(
-                jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get())));
+                jobMasterCompleteFuture.isDone() && JobStatus.CANCELED.equals(jobMasterCompleteFuture.get().getStatus())));
 
         testIMapRemovedAfterJobComplete(jobMaster);
     }