You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/24 07:53:37 UTC
[incubator-seatunnel] branch dev updated: [Feature][Zeta][REST-API] Add REST API To Get System Monitoring Information (#4315)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e1b69a2ae [Feature][Zeta][REST-API] Add REST API To Get System Monitoring Information (#4315)
e1b69a2ae is described below
commit e1b69a2ae32d5de585be464dcf307a811e7f53a1
Author: monster <60...@users.noreply.github.com>
AuthorDate: Fri Mar 24 15:53:29 2023 +0800
[Feature][Zeta][REST-API] Add REST API To Get System Monitoring Information (#4315)
---
docs/en/seatunnel-engine/rest-api.md | 65 ++++++++++++++++++++++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 17 ++++++
.../seatunnel/engine/server/rest/RestConstant.java | 3 +
.../server/rest/RestHttpGetCommandProcessor.java | 51 +++++++++++++++++
4 files changed, 136 insertions(+)
diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md
index bde6e95f9..0a58c3925 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -115,3 +115,68 @@ network:
------------------------------------------------------------------------------------------
+### Returns system monitoring information.
+
+<details>
+ <summary><code>GET</code> <code><b>/hazelcast/rest/maps/system-monitoring-information</b></code> <code>(Returns system monitoring information.)</code></summary>
+
+#### Parameters
+
+#### Responses
+
+```json
+[
+ {
+ "processors":"8",
+ "physical.memory.total":"16.0G",
+ "physical.memory.free":"16.3M",
+ "swap.space.total":"0",
+ "swap.space.free":"0",
+ "heap.memory.used":"135.7M",
+ "heap.memory.free":"440.8M",
+ "heap.memory.total":"576.5M",
+ "heap.memory.max":"3.6G",
+ "heap.memory.used/total":"23.54%",
+ "heap.memory.used/max":"3.73%",
+ "minor.gc.count":"6",
+ "minor.gc.time":"110ms",
+ "major.gc.count":"2",
+ "major.gc.time":"73ms",
+ "load.process":"24.78%",
+ "load.system":"60.00%",
+ "load.systemAverage":"2.07",
+ "thread.count":"117",
+ "thread.peakCount":"118",
+ "cluster.timeDiff":"0",
+ "event.q.size":"0",
+ "executor.q.async.size":"0",
+ "executor.q.client.size":"0",
+ "executor.q.client.query.size":"0",
+ "executor.q.client.blocking.size":"0",
+ "executor.q.query.size":"0",
+ "executor.q.scheduled.size":"0",
+ "executor.q.io.size":"0",
+ "executor.q.system.size":"0",
+ "executor.q.operations.size":"0",
+ "executor.q.priorityOperation.size":"0",
+ "operations.completed.count":"10",
+ "executor.q.mapLoad.size":"0",
+ "executor.q.mapLoadAllKeys.size":"0",
+ "executor.q.cluster.size":"0",
+ "executor.q.response.size":"0",
+ "operations.running.count":"0",
+ "operations.pending.invocations.percentage":"0.00%",
+ "operations.pending.invocations.count":"0",
+ "proxy.count":"8",
+ "clientEndpoint.count":"0",
+ "connection.active.count":"2",
+ "client.connection.count":"0",
+ "connection.count":"0"
+ }
+]
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 16420b7c1..5f4e97ac8 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
@Slf4j
public class RestApiIT {
@@ -114,6 +115,22 @@ public class RestApiIT {
.body("[0].jobStatus", equalTo("RUNNING"));
}
+ @Test
+ public void testSystemMonitoringInformation() {
+ given().get(
+ HOST
+ + hazelcastInstance
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.SYSTEM_MONITORING_INFORMATION)
+ .then()
+ .assertThat()
+ .time(lessThan(5000L))
+ .statusCode(200);
+ }
+
@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e3c3dc50b..0a5d8437b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -21,4 +21,7 @@ public class RestConstant {
public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
+
+ public static final String SYSTEM_MONITORING_INFORMATION =
+ "/hazelcast/rest/maps/system-monitoring-information";
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 712e44fbf..3a1627959 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -28,7 +28,12 @@ import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
+import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Cluster;
+import com.hazelcast.cluster.Member;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
@@ -38,15 +43,20 @@ import com.hazelcast.internal.json.JsonValue;
import com.hazelcast.internal.util.JsonUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.map.IMap;
+import com.hazelcast.spi.impl.NodeEngine;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCommand> {
@@ -56,6 +66,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
private static final String SINK_WRITE_COUNT = "SinkWriteCount";
+ private NodeEngine nodeEngine;
+
public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
this(textCommandService, new Log4j2HttpGetCommandProcessor(textCommandService));
}
@@ -77,6 +89,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
handleRunningJobsInfo(httpGetCommand);
} else if (uri.startsWith(RUNNING_JOB_URL)) {
handleJobInfoById(httpGetCommand, uri);
+ } else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
+ getSystemMonitoringInformation(httpGetCommand);
} else {
original.handle(httpGetCommand);
}
@@ -95,6 +109,43 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
handle(httpGetCommand);
}
+ private void getSystemMonitoringInformation(HttpGetCommand command) {
+ Cluster cluster = textCommandService.getNode().hazelcastInstance.getCluster();
+ nodeEngine = textCommandService.getNode().hazelcastInstance.node.nodeEngine;
+
+ Set<Member> members = cluster.getMembers();
+ JsonArray jsonValues =
+ members.stream()
+ .map(
+ member -> {
+ Address address = member.getAddress();
+ String input = null;
+ try {
+ input =
+ (String)
+ NodeEngineUtil.sendOperationToMemberNode(
+ nodeEngine,
+ new GetClusterHealthMetricsOperation(),
+ address)
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.severe("get system monitoring information fail", e);
+ }
+ assert input != null;
+ String[] parts = input.split(", ");
+ JsonObject jobInfo = new JsonObject();
+ Arrays.stream(parts)
+ .forEach(
+ part -> {
+ String[] keyValue = part.split("=");
+ jobInfo.add(keyValue[0], keyValue[1]);
+ });
+ return jobInfo;
+ })
+ .collect(JsonArray::new, JsonArray::add, JsonArray::add);
+ this.prepareResponse(command, jsonValues);
+ }
+
private void handleRunningJobsInfo(HttpGetCommand command) {
IMap<Long, JobInfo> values =
this.textCommandService