You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/24 07:53:37 UTC

[incubator-seatunnel] branch dev updated: [Feature][Zeta][REST-API] Add REST API To Get System Monitoring Information (#4315)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1b69a2ae  [Feature][Zeta][REST-API] Add REST API To Get System Monitoring Information (#4315)
e1b69a2ae is described below

commit e1b69a2ae32d5de585be464dcf307a811e7f53a1
Author: monster <60...@users.noreply.github.com>
AuthorDate: Fri Mar 24 15:53:29 2023 +0800

     [Feature][Zeta][REST-API] Add REST API To Get System Monitoring Information (#4315)
---
 docs/en/seatunnel-engine/rest-api.md               | 65 ++++++++++++++++++++++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 17 ++++++
 .../seatunnel/engine/server/rest/RestConstant.java |  3 +
 .../server/rest/RestHttpGetCommandProcessor.java   | 51 +++++++++++++++++
 4 files changed, 136 insertions(+)

diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md
index bde6e95f9..0a58c3925 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -115,3 +115,68 @@ network:
 
 ------------------------------------------------------------------------------------------
 
+### Returns system monitoring information.
+
+<details>
+ <summary><code>GET</code> <code><b>/hazelcast/rest/maps/system-monitoring-information</b></code> <code>(Returns system monitoring information.)</code></summary>
+
+#### Parameters
+
+#### Responses
+
+```json
+[
+  {
+    "processors":"8",
+    "physical.memory.total":"16.0G",
+    "physical.memory.free":"16.3M",
+    "swap.space.total":"0",
+    "swap.space.free":"0",
+    "heap.memory.used":"135.7M",
+    "heap.memory.free":"440.8M",
+    "heap.memory.total":"576.5M",
+    "heap.memory.max":"3.6G",
+    "heap.memory.used/total":"23.54%",
+    "heap.memory.used/max":"3.73%",
+    "minor.gc.count":"6",
+    "minor.gc.time":"110ms",
+    "major.gc.count":"2",
+    "major.gc.time":"73ms",
+    "load.process":"24.78%",
+    "load.system":"60.00%",
+    "load.systemAverage":"2.07",
+    "thread.count":"117",
+    "thread.peakCount":"118",
+    "cluster.timeDiff":"0",
+    "event.q.size":"0",
+    "executor.q.async.size":"0",
+    "executor.q.client.size":"0",
+    "executor.q.client.query.size":"0",
+    "executor.q.client.blocking.size":"0",
+    "executor.q.query.size":"0",
+    "executor.q.scheduled.size":"0",
+    "executor.q.io.size":"0",
+    "executor.q.system.size":"0",
+    "executor.q.operations.size":"0",
+    "executor.q.priorityOperation.size":"0",
+    "operations.completed.count":"10",
+    "executor.q.mapLoad.size":"0",
+    "executor.q.mapLoadAllKeys.size":"0",
+    "executor.q.cluster.size":"0",
+    "executor.q.response.size":"0",
+    "operations.running.count":"0",
+    "operations.pending.invocations.percentage":"0.00%",
+    "operations.pending.invocations.count":"0",
+    "proxy.count":"8",
+    "clientEndpoint.count":"0",
+    "connection.active.count":"2",
+    "client.connection.count":"0",
+    "connection.count":"0"
+  }
+]
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 16420b7c1..5f4e97ac8 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 
 import static io.restassured.RestAssured.given;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
 
 @Slf4j
 public class RestApiIT {
@@ -114,6 +115,22 @@ public class RestApiIT {
                 .body("[0].jobStatus", equalTo("RUNNING"));
     }
 
+    @Test
+    public void testSystemMonitoringInformation() {
+        given().get(
+                        HOST
+                                + hazelcastInstance
+                                        .getCluster()
+                                        .getLocalMember()
+                                        .getAddress()
+                                        .getPort()
+                                + RestConstant.SYSTEM_MONITORING_INFORMATION)
+                .then()
+                .assertThat()
+                .time(lessThan(5000L))
+                .statusCode(200);
+    }
+
     @AfterAll
     static void afterClass() {
         if (hazelcastInstance != null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e3c3dc50b..0a5d8437b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -21,4 +21,7 @@ public class RestConstant {
 
     public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
     public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
+
+    public static final String SYSTEM_MONITORING_INFORMATION =
+            "/hazelcast/rest/maps/system-monitoring-information";
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 712e44fbf..3a1627959 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -28,7 +28,12 @@ import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
+import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Cluster;
+import com.hazelcast.cluster.Member;
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpGetCommand;
@@ -38,15 +43,20 @@ import com.hazelcast.internal.json.JsonValue;
 import com.hazelcast.internal.util.JsonUtil;
 import com.hazelcast.internal.util.StringUtil;
 import com.hazelcast.map.IMap;
+import com.hazelcast.spi.impl.NodeEngine;
 
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
 
 public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCommand> {
 
@@ -56,6 +66,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
 
     private static final String SINK_WRITE_COUNT = "SinkWriteCount";
 
+    private NodeEngine nodeEngine;
+
     public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
         this(textCommandService, new Log4j2HttpGetCommandProcessor(textCommandService));
     }
@@ -77,6 +89,8 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
                 handleRunningJobsInfo(httpGetCommand);
             } else if (uri.startsWith(RUNNING_JOB_URL)) {
                 handleJobInfoById(httpGetCommand, uri);
+            } else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
+                getSystemMonitoringInformation(httpGetCommand);
             } else {
                 original.handle(httpGetCommand);
             }
@@ -95,6 +109,43 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
         handle(httpGetCommand);
     }
 
+    /**
+     * Serves {@link RestConstant#SYSTEM_MONITORING_INFORMATION}: sends a {@link
+     * GetClusterHealthMetricsOperation} to every cluster member and answers with a JSON array
+     * holding one object per member that replied.
+     *
+     * <p>A member that cannot be queried is logged and left out of the array, so that one
+     * failing member does not turn the whole request into an error. If the calling thread is
+     * interrupted, its interrupt flag is restored and the remaining members are not queried.
+     *
+     * @param command the HTTP GET command the JSON response is written to
+     */
+    private void getSystemMonitoringInformation(HttpGetCommand command) {
+        Cluster cluster = textCommandService.getNode().hazelcastInstance.getCluster();
+        nodeEngine = textCommandService.getNode().hazelcastInstance.node.nodeEngine;
+
+        Set<Member> members = cluster.getMembers();
+        JsonArray jsonValues = new JsonArray();
+        for (Member member : members) {
+            Address address = member.getAddress();
+            String input;
+            try {
+                input =
+                        (String)
+                                NodeEngineUtil.sendOperationToMemberNode(
+                                                nodeEngine,
+                                                new GetClusterHealthMetricsOperation(),
+                                                address)
+                                        .get();
+            } catch (InterruptedException e) {
+                // Restore the flag for the caller and stop: every further blocking get()
+                // on this thread would fail immediately with the same exception.
+                Thread.currentThread().interrupt();
+                logger.severe("get system monitoring information fail", e);
+                break;
+            } catch (ExecutionException e) {
+                logger.severe("get system monitoring information fail", e);
+                continue;
+            }
+            if (input == null) {
+                // Nothing to parse for this member; keep the answers of the others.
+                continue;
+            }
+            jsonValues.add(parseHealthMetrics(input));
+        }
+        this.prepareResponse(command, jsonValues);
+    }
+
+    /**
+     * Converts text of the form {@code key=value, key=value, ...} into a JSON object whose
+     * fields are all strings.
+     *
+     * <p>Only the first {@code '='} of a fragment separates key from value, so values may be
+     * empty or contain {@code '='} themselves. Fragments without any {@code '='} are ignored.
+     *
+     * @param input the comma-separated key/value text, never {@code null}
+     * @return a JSON object with one string field per well-formed fragment
+     */
+    private static JsonObject parseHealthMetrics(String input) {
+        JsonObject memberMetrics = new JsonObject();
+        Arrays.stream(input.split(", "))
+                .map(part -> part.split("=", 2))
+                .filter(keyValue -> keyValue.length == 2)
+                .forEach(keyValue -> memberMetrics.add(keyValue[0], keyValue[1]));
+        return memberMetrics;
+    }
+
     private void handleRunningJobsInfo(HttpGetCommand command) {
         IMap<Long, JobInfo> values =
                 this.textCommandService