You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/12/14 04:06:42 UTC
(seatunnel) branch dev updated: [Feature] [rest-api] get finished jobs info (#5949)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1aff753893 [Feature] [rest-api] get finished jobs info (#5949)
1aff753893 is described below
commit 1aff753893c049ccdfb45c99a6fad909ffc5e60a
Author: Guangdong Liu <80...@qq.com>
AuthorDate: Thu Dec 14 12:06:36 2023 +0800
[Feature] [rest-api] get finished jobs info (#5949)
---------
Co-authored-by: gdliu3 <gd...@iflytek.com>
---
docs/en/seatunnel-engine/rest-api.md | 32 ++++++++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 26 ++++++
.../seatunnel/engine/server/rest/RestConstant.java | 6 ++
.../server/rest/RestHttpGetCommandProcessor.java | 94 +++++++++++++++++++++-
4 files changed, 157 insertions(+), 1 deletion(-)
diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md
index f899b6b711..e62b19b448 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -115,6 +115,38 @@ network:
------------------------------------------------------------------------------------------
+### Return all finished Jobs Info.
+
+<details>
+ <summary><code>GET</code> <code><b>/hazelcast/rest/maps/finished-jobs-info/:state</b></code> <code>(Return all finished Jobs Info.)</code></summary>
+
+#### Parameters
+
+> | name | type | data type | description |
+> |-------|----------|-----------|------------------------------------------------------------------|
+> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
+
+#### Responses
+
+```json
+[
+ {
+ "jobId": "",
+ "jobName": "",
+ "jobStatus": "",
+ "errorMsg": null,
+ "createTime": "",
+ "finishTime": "",
+ "jobDag": "",
+ "metrics": ""
+ }
+]
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### Returns system monitoring information.
<details>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 2f53e9a475..cbd50703a1 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -45,6 +45,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
@@ -151,6 +152,7 @@ public class RestApiIT {
@Test
public void testSubmitJob() {
+ AtomicInteger i = new AtomicInteger();
Arrays.asList(node2, node1)
.forEach(
instance -> {
@@ -187,6 +189,30 @@ public class RestApiIT {
.getJobStatus(
Long.parseLong(
jobId))));
+
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.FINISHED_JOBS_INFO
+ + "/FINISHED")
+ .then()
+ .statusCode(200)
+ .body("[" + i.get() + "].jobName", equalTo("test测试"))
+ .body("[" + i.get() + "].errorMsg", equalTo(null))
+ .body(
+ "[" + i.get() + "].jobDag.jobId",
+ equalTo(Long.parseLong(jobId)))
+ .body(
+ "[" + i.get() + "].metrics.SourceReceivedCount",
+ equalTo("100"))
+ .body(
+ "[" + i.get() + "].metrics.SinkWriteCount",
+ equalTo("100"))
+ .body("[" + i.get() + "].jobStatus", equalTo("FINISHED"));
+ i.getAndIncrement();
});
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 0b9d0dc15d..daa3da6e09 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -31,6 +31,8 @@ public class RestConstant {
public static final String CREATE_TIME = "createTime";
+ public static final String FINISH_TIME = "finishTime";
+
public static final String ENV_OPTIONS = "envOptions";
public static final String JOB_DAG = "jobDag";
@@ -39,9 +41,13 @@ public class RestConstant {
public static final String JAR_PATH = "jarPath";
+ public static final String ERROR_MSG = "errorMsg";
+
public static final String METRICS = "metrics";
public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
+
+ public static final String FINISHED_JOBS_INFO = "/hazelcast/rest/maps/finished-jobs";
public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";
public static final String ENCRYPT_CONFIG = "/hazelcast/rest/maps/encrypt-config";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 6ee5b46e83..892c59b097 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -21,14 +21,18 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
+import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
@@ -41,6 +45,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
+import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
@@ -52,6 +57,7 @@ import com.hazelcast.spi.impl.NodeEngine;
import java.text.SimpleDateFormat;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -59,6 +65,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
@@ -92,6 +99,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
try {
if (uri.startsWith(RUNNING_JOBS_URL)) {
handleRunningJobsInfo(httpGetCommand);
+ } else if (uri.startsWith(FINISHED_JOBS_INFO)) {
+ handleFinishedJobsInfo(httpGetCommand, uri);
} else if (uri.startsWith(RUNNING_JOB_URL)) {
handleJobInfoById(httpGetCommand, uri);
} else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
@@ -167,6 +176,67 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
this.prepareResponse(command, jobs);
}
+ private void handleFinishedJobsInfo(HttpGetCommand command, String uri) {
+
+ uri = StringUtil.stripTrailingSlash(uri);
+ int indexEnd = uri.indexOf('/', URI_MAPS.length());
+ String state = uri.substring(indexEnd + 1);
+
+ IMap<Long, JobState> finishedJob =
+ this.textCommandService
+ .getNode()
+ .getNodeEngine()
+ .getHazelcastInstance()
+ .getMap(Constant.IMAP_FINISHED_JOB_STATE);
+
+ IMap<Long, JobMetrics> finishedJobMetrics =
+ this.textCommandService
+ .getNode()
+ .getNodeEngine()
+ .getHazelcastInstance()
+ .getMap(Constant.IMAP_FINISHED_JOB_METRICS);
+
+ IMap<Long, JobDAGInfo> finishedJobDAGInfo =
+ this.textCommandService
+ .getNode()
+ .getNodeEngine()
+ .getHazelcastInstance()
+ .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);
+
+ JsonArray jobs =
+ finishedJob.values().stream()
+ .filter(
+ jobState -> {
+ if (state.isEmpty()) {
+ return true;
+ }
+ return jobState.getJobStatus()
+ .name()
+ .equals(state.toUpperCase());
+ })
+ .sorted(Comparator.comparing(JobState::getFinishTime))
+ .map(
+ jobState -> {
+ Long jobId = jobState.getJobId();
+ String jobMetrics =
+ getSeaTunnelServer()
+ .getCoordinatorService()
+ .getJobMetrics(jobId)
+ .toJsonString();
+ JobDAGInfo jobDAGInfo = finishedJobDAGInfo.get(jobId);
+
+ return convertToJson(
+ jobState,
+ jobMetrics,
+ Json.parse(JsonUtils.toJsonString(jobDAGInfo))
+ .asObject(),
+ jobId);
+ })
+ .collect(JsonArray::new, JsonArray::add, JsonArray::add);
+
+ this.prepareResponse(command, jobs);
+ }
+
private void handleJobInfoById(HttpGetCommand command, String uri) {
uri = StringUtil.stripTrailingSlash(uri);
int indexEnd = uri.indexOf('/', URI_MAPS.length());
@@ -181,7 +251,7 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
.getMap(Constant.IMAP_RUNNING_JOB_INFO)
.get(Long.valueOf(jobId));
- if (!"".equals(jobId) && jobInfo != null) {
+ if (!jobId.isEmpty() && jobInfo != null) {
this.prepareResponse(command, convertToJson(jobInfo, Long.parseLong(jobId)));
} else {
this.prepareResponse(command, new JsonObject());
@@ -293,4 +363,26 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
return jobInfoJson;
}
+
+ private JsonObject convertToJson(
+ JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
+ JsonObject jobInfoJson = new JsonObject();
+ jobInfoJson
+ .add(RestConstant.JOB_ID, String.valueOf(jobId))
+ .add(RestConstant.JOB_NAME, jobState.getJobName())
+ .add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
+ .add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
+ .add(
+ RestConstant.CREATE_TIME,
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(new Date(jobState.getSubmitTime())))
+ .add(
+ RestConstant.FINISH_TIME,
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(new Date(jobState.getFinishTime())))
+ .add(RestConstant.JOB_DAG, jobDAGInfo)
+ .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+
+ return jobInfoJson;
+ }
}