You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/12/14 04:06:42 UTC

(seatunnel) branch dev updated: [Feature] [rest-api] get finished jobs info (#5949)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1aff753893 [Feature] [rest-api] get finished jobs info (#5949)
1aff753893 is described below

commit 1aff753893c049ccdfb45c99a6fad909ffc5e60a
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Thu Dec 14 12:06:36 2023 +0800

    [Feature] [rest-api] get finished jobs info (#5949)
    
    
    ---------
    
    Co-authored-by: gdliu3 <gd...@iflytek.com>
---
 docs/en/seatunnel-engine/rest-api.md               | 32 ++++++++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 26 ++++++
 .../seatunnel/engine/server/rest/RestConstant.java |  6 ++
 .../server/rest/RestHttpGetCommandProcessor.java   | 94 +++++++++++++++++++++-
 4 files changed, 157 insertions(+), 1 deletion(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md
index f899b6b711..e62b19b448 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -115,6 +115,38 @@ network:
 
 ------------------------------------------------------------------------------------------
 
+### Return all finished Jobs Info.
+
+<details>
+ <summary><code>GET</code> <code><b>/hazelcast/rest/maps/finished-jobs-info/:state</b></code> <code>(Return all finished Jobs Info.)</code></summary>
+
+#### Parameters
+
+> | name  |   type   | data type |                           description                            |
+> |-------|----------|-----------|------------------------------------------------------------------|
+> | state | optional | string    | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
+
+#### Responses
+
+```json
+[
+  {
+    "jobId": "",
+    "jobName": "",
+    "jobStatus": "",
+    "errorMsg": null,
+    "createTime": "",
+    "finishTime": "",
+    "jobDag": "",
+    "metrics": ""
+  }
+]
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
 ### Returns system monitoring information.
 
 <details>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 2f53e9a475..cbd50703a1 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -45,6 +45,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static io.restassured.RestAssured.given;
 import static org.hamcrest.Matchers.equalTo;
@@ -151,6 +152,7 @@ public class RestApiIT {
 
     @Test
     public void testSubmitJob() {
+        AtomicInteger i = new AtomicInteger();
         Arrays.asList(node2, node1)
                 .forEach(
                         instance -> {
@@ -187,6 +189,30 @@ public class RestApiIT {
                                                                     .getJobStatus(
                                                                             Long.parseLong(
                                                                                     jobId))));
+
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.FINISHED_JOBS_INFO
+                                                    + "/FINISHED")
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[" + i.get() + "].jobName", equalTo("test测试"))
+                                    .body("[" + i.get() + "].errorMsg", equalTo(null))
+                                    .body(
+                                            "[" + i.get() + "].jobDag.jobId",
+                                            equalTo(Long.parseLong(jobId)))
+                                    .body(
+                                            "[" + i.get() + "].metrics.SourceReceivedCount",
+                                            equalTo("100"))
+                                    .body(
+                                            "[" + i.get() + "].metrics.SinkWriteCount",
+                                            equalTo("100"))
+                                    .body("[" + i.get() + "].jobStatus", equalTo("FINISHED"));
+                            i.getAndIncrement();
                         });
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 0b9d0dc15d..daa3da6e09 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -31,6 +31,8 @@ public class RestConstant {
 
     public static final String CREATE_TIME = "createTime";
 
+    public static final String FINISH_TIME = "finishTime";
+
     public static final String ENV_OPTIONS = "envOptions";
 
     public static final String JOB_DAG = "jobDag";
@@ -39,9 +41,13 @@ public class RestConstant {
 
     public static final String JAR_PATH = "jarPath";
 
+    public static final String ERROR_MSG = "errorMsg";
+
     public static final String METRICS = "metrics";
     public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
     public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
+
+    public static final String FINISHED_JOBS_INFO = "/hazelcast/rest/maps/finished-jobs";
     public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";
     public static final String ENCRYPT_CONFIG = "/hazelcast/rest/maps/encrypt-config";
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 6ee5b46e83..892c59b097 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -21,14 +21,18 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
+import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
 import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
@@ -41,6 +45,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceProxy;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpGetCommand;
+import com.hazelcast.internal.json.Json;
 import com.hazelcast.internal.json.JsonArray;
 import com.hazelcast.internal.json.JsonObject;
 import com.hazelcast.internal.json.JsonValue;
@@ -52,6 +57,7 @@ import com.hazelcast.spi.impl.NodeEngine;
 
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -59,6 +65,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
@@ -92,6 +99,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
         try {
             if (uri.startsWith(RUNNING_JOBS_URL)) {
                 handleRunningJobsInfo(httpGetCommand);
+            } else if (uri.startsWith(FINISHED_JOBS_INFO)) {
+                handleFinishedJobsInfo(httpGetCommand, uri);
             } else if (uri.startsWith(RUNNING_JOB_URL)) {
                 handleJobInfoById(httpGetCommand, uri);
             } else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
@@ -167,6 +176,67 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
         this.prepareResponse(command, jobs);
     }
 
+    private void handleFinishedJobsInfo(HttpGetCommand command, String uri) {
+
+        uri = StringUtil.stripTrailingSlash(uri);
+        int indexEnd = uri.indexOf('/', URI_MAPS.length());
+        String state = uri.substring(indexEnd + 1);
+
+        IMap<Long, JobState> finishedJob =
+                this.textCommandService
+                        .getNode()
+                        .getNodeEngine()
+                        .getHazelcastInstance()
+                        .getMap(Constant.IMAP_FINISHED_JOB_STATE);
+
+        IMap<Long, JobMetrics> finishedJobMetrics =
+                this.textCommandService
+                        .getNode()
+                        .getNodeEngine()
+                        .getHazelcastInstance()
+                        .getMap(Constant.IMAP_FINISHED_JOB_METRICS);
+
+        IMap<Long, JobDAGInfo> finishedJobDAGInfo =
+                this.textCommandService
+                        .getNode()
+                        .getNodeEngine()
+                        .getHazelcastInstance()
+                        .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);
+
+        JsonArray jobs =
+                finishedJob.values().stream()
+                        .filter(
+                                jobState -> {
+                                    if (state.isEmpty()) {
+                                        return true;
+                                    }
+                                    return jobState.getJobStatus()
+                                            .name()
+                                            .equals(state.toUpperCase());
+                                })
+                        .sorted(Comparator.comparing(JobState::getFinishTime))
+                        .map(
+                                jobState -> {
+                                    Long jobId = jobState.getJobId();
+                                    String jobMetrics =
+                                            getSeaTunnelServer()
+                                                    .getCoordinatorService()
+                                                    .getJobMetrics(jobId)
+                                                    .toJsonString();
+                                    JobDAGInfo jobDAGInfo = finishedJobDAGInfo.get(jobId);
+
+                                    return convertToJson(
+                                            jobState,
+                                            jobMetrics,
+                                            Json.parse(JsonUtils.toJsonString(jobDAGInfo))
+                                                    .asObject(),
+                                            jobId);
+                                })
+                        .collect(JsonArray::new, JsonArray::add, JsonArray::add);
+
+        this.prepareResponse(command, jobs);
+    }
+
     private void handleJobInfoById(HttpGetCommand command, String uri) {
         uri = StringUtil.stripTrailingSlash(uri);
         int indexEnd = uri.indexOf('/', URI_MAPS.length());
@@ -181,7 +251,7 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                                 .getMap(Constant.IMAP_RUNNING_JOB_INFO)
                                 .get(Long.valueOf(jobId));
 
-        if (!"".equals(jobId) && jobInfo != null) {
+        if (!jobId.isEmpty() && jobInfo != null) {
             this.prepareResponse(command, convertToJson(jobInfo, Long.parseLong(jobId)));
         } else {
             this.prepareResponse(command, new JsonObject());
@@ -293,4 +363,26 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
 
         return jobInfoJson;
     }
+
+    private JsonObject convertToJson(
+            JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
+        JsonObject jobInfoJson = new JsonObject();
+        jobInfoJson
+                .add(RestConstant.JOB_ID, String.valueOf(jobId))
+                .add(RestConstant.JOB_NAME, jobState.getJobName())
+                .add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
+                .add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
+                .add(
+                        RestConstant.CREATE_TIME,
+                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                .format(new Date(jobState.getSubmitTime())))
+                .add(
+                        RestConstant.FINISH_TIME,
+                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                .format(new Date(jobState.getFinishTime())))
+                .add(RestConstant.JOB_DAG, jobDAGInfo)
+                .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+
+        return jobInfoJson;
+    }
 }