You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by zh...@apache.org on 2019/12/27 12:21:36 UTC
[submarine] branch master updated: SUBMARINE-324. Submarine cluster
status RESTful
This is an automated email from the ASF dual-hosted git repository.
zhouquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new cf22db0 SUBMARINE-324. Submarine cluster status RESTful
cf22db0 is described below
commit cf22db0982e11a587af47bf57e3c7a07bca78254
Author: Xun Liu <li...@apache.org>
AuthorDate: Fri Dec 27 19:51:57 2019 +0800
SUBMARINE-324. Submarine cluster status RESTful
### What is this PR for?
Now, the submarine server supports cluster function,
You can get the status of multiple submarine servers and interpreter processes.
Now provides a REST interface that provides external access to cluster status information.
Provides diagnostics of cluster status for submarine k8s operator.
### What type of PR is it?
Feature
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/SUBMARINE-324
### How should this be tested?
* [CI Pass](https://travis-ci.org/liuxunorg/submarine/builds/629923676)
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Xun Liu <li...@apache.org>
Closes #133 from liuxunorg/SUBMARINE-324 and squashes the following commits:
0d08baa [Xun Liu] Fixed some issue.
2d8d42b [Xun Liu] SUBMARINE-324. Submarine cluster status RESTful
---
.../submarine/commons/cluster/ClusterManager.java | 6 +-
.../submarine/commons/cluster/ClusterMonitor.java | 2 +
.../commons/cluster/meta/ClusterMeta.java | 5 +
.../apache/submarine/server/SubmarineServer.java | 4 +-
.../server/jobserver/{rest => }/dao/Component.java | 2 +-
.../jobserver/{rest => }/dao/EnvVaraible.java | 2 +-
.../server/jobserver/{rest => }/dao/MLJobSpec.java | 2 +-
.../{rest => }/provider/YamlEntityProvider.java | 2 +-
.../{api/JobApi.java => JobServerRestApi.java} | 8 +-
.../submarine/server/rest/ClusterRestApi.java | 233 +++++++++++++++++++++
.../MetaStoreRestApi.java} | 10 +-
.../rest/dao => rest}/RestConstants.java | 10 +-
.../server/SubmarineServerClusterTest.java | 97 ++++++++-
.../JobApiTest.java => JobServerRestApiTest.java} | 10 +-
.../MetaStoreRestApiTest.java} | 8 +-
15 files changed, 370 insertions(+), 31 deletions(-)
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
index d45a804..508bd57 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -216,7 +216,7 @@ public abstract class ClusterManager {
try {
raftClientPort = NetworkUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
}
MemberId memberId = MemberId.from(serverHost + ":" + raftClientPort);
@@ -283,7 +283,7 @@ public abstract class ClusterManager {
}
}
} catch (InterruptedException e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
}
}
}).start();
@@ -416,7 +416,7 @@ public abstract class ClusterManager {
mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
}
if (null != mateData) {
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
index afbd3e6..8c987f9 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
@@ -184,6 +184,8 @@ public class ClusterMonitor {
// indicating that the process is still active.
private void sendHeartbeat() {
HashMap<String, Object> mapMonitorUtil = new HashMap<>();
+ mapMonitorUtil.put(ClusterMeta.NODE_NAME, clusterManager.getClusterNodeName());
+ mapMonitorUtil.put(ClusterMeta.INTP_PROCESS_NAME, metaKey);
mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java
index 9ff7123..5014e5a 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java
@@ -57,6 +57,11 @@ public class ClusterMeta implements Serializable {
public static String ONLINE_STATUS = "ONLINE";
public static String OFFLINE_STATUS = "OFFLINE";
+ public static String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT";
+ public static String INTP_PROCESS_LIST = "INTP_PROCESS_LIST";
+
+ public static String PROPERTIES = "properties";
+
// cluster_name = host:port
// Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>();
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index f2c5c9e..3cbe0f6 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -113,8 +113,8 @@ public class SubmarineServer extends ResourceConfig {
@Inject
public SubmarineServer() {
packages("org.apache.submarine.server.workbench.rest",
- "org.apache.submarine.server.jobserver.rest.api",
- "org.apache.submarine.server.metastore.rest"
+ "org.apache.submarine.server.jobserver.rest",
+ "org.apache.submarine.server.rest"
);
}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/Component.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
similarity index 96%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/Component.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
index e5b39ef..93b1279 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/Component.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.rest.dao;
+package org.apache.submarine.server.jobserver.dao;
/**
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/EnvVaraible.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
similarity index 95%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/EnvVaraible.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
index b8388ff..7533907 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/EnvVaraible.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.rest.dao;
+package org.apache.submarine.server.jobserver.dao;
// A process level environment variable.
public class EnvVaraible {
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/MLJobSpec.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
similarity index 98%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/MLJobSpec.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
index 0e64b71..d913a14 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/MLJobSpec.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.rest.dao;
+package org.apache.submarine.server.jobserver.dao;
/**
* The machine learning job spec the submarine job server can accept.
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/provider/YamlEntityProvider.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/provider/YamlEntityProvider.java
similarity index 97%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/provider/YamlEntityProvider.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/provider/YamlEntityProvider.java
index 8250ce0..902dc03 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/provider/YamlEntityProvider.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/provider/YamlEntityProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.rest.provider;
+package org.apache.submarine.server.jobserver.provider;
import org.yaml.snakeyaml.Yaml;
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/api/JobApi.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
similarity index 93%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/api/JobApi.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
index f9776d1..8121241 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/api/JobApi.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
@@ -16,10 +16,10 @@
* limitations under the License.
*/
-package org.apache.submarine.server.jobserver.rest.api;
+package org.apache.submarine.server.jobserver.rest;
-import org.apache.submarine.server.jobserver.rest.dao.MLJobSpec;
-import org.apache.submarine.server.jobserver.rest.dao.RestConstants;
+import org.apache.submarine.server.rest.RestConstants;
+import org.apache.submarine.server.jobserver.dao.MLJobSpec;
import org.apache.submarine.server.response.JsonResponse;
import javax.ws.rs.Consumes;
@@ -48,7 +48,7 @@ import javax.ws.rs.core.Response;
* */
@Path(RestConstants.V1 + "/" + RestConstants.JOBS)
@Produces({MediaType.APPLICATION_JSON + "; " + RestConstants.CHARSET_UTF8})
-public class JobApi {
+public class JobServerRestApi {
// A ping test to verify the job server is up.
@Path(RestConstants.PING)
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/ClusterRestApi.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/ClusterRestApi.java
new file mode 100644
index 0000000..4ce09fe
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/ClusterRestApi.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rest;
+
+import com.google.gson.Gson;
+import org.apache.submarine.commons.cluster.ClusterServer;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.response.JsonResponse;
+import org.apache.submarine.server.workbench.annotation.SubmarineApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * clusters Rest api.
+ */
+@Path(RestConstants.V1 + "/" + RestConstants.CLUSTER)
+@Produces("application/json")
+public class ClusterRestApi {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterRestApi.class);
+ Gson gson = new Gson();
+
+ private ClusterServer clusterServer = ClusterServer.getInstance();
+
+ @GET
+ @Path("/" + RestConstants.ADDRESS)
+ @SubmarineApi
+ public Response getClusterAddress() {
+ SubmarineConfiguration sConf = SubmarineConfiguration.getInstance();
+ String clusterAddr = sConf.getClusterAddress();
+ String[] arrAddr = clusterAddr.split(",");
+ List<String> listAddr = Arrays.asList(arrAddr);
+
+ return new JsonResponse.Builder<List<String>>(Response.Status.OK)
+ .success(true).result(listAddr).build();
+ }
+
+ /**
+ * get all nodes of clusters
+ */
+ @GET
+ @Path("/" + RestConstants.NODES)
+ @SubmarineApi
+ public Response getClusterNodes(){
+ ArrayList<HashMap<String, Object>> nodes = new ArrayList<>();
+
+ Map<String, HashMap<String, Object>> clusterMeta = null;
+ Map<String, HashMap<String, Object>> intpMeta = null;
+ clusterMeta = clusterServer.getClusterMeta(ClusterMetaType.SERVER_META, "");
+ intpMeta = clusterServer.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+
+ // Number of interpreter processes
+ for (Map.Entry<String, HashMap<String, Object>> serverMetaEntity : clusterMeta.entrySet()) {
+ if (!serverMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)) {
+ continue;
+ }
+ String serverNodeName = (String) serverMetaEntity.getValue().get(ClusterMeta.NODE_NAME);
+
+ ArrayList<String> arrIntpProcess = new ArrayList<>();
+ int intpProcCount = 0;
+ for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
+ if (!intpMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)
+ && !intpMetaEntity.getValue().containsKey(ClusterMeta.INTP_PROCESS_NAME)) {
+ continue;
+ }
+ String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME);
+
+ if (serverNodeName.equals(intpNodeName)) {
+ intpProcCount++;
+ String intpName = (String) intpMetaEntity.getValue().get(ClusterMeta.INTP_PROCESS_NAME);
+ arrIntpProcess.add(intpName);
+ }
+ }
+ serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_COUNT, intpProcCount);
+ serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_LIST, arrIntpProcess);
+ }
+
+ for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+ String nodeName = entry.getKey();
+ Map<String, Object> properties = entry.getValue();
+
+ Map<String, Object> sortProperties = new HashMap<>();
+
+ if (properties.containsKey(ClusterMeta.CPU_USED)
+ && properties.containsKey(ClusterMeta.CPU_CAPACITY)) {
+ float cpuUsed = (long) properties.get(ClusterMeta.CPU_USED) / (float) 100.0;
+ float cpuCapacity = (long) properties.get(ClusterMeta.CPU_CAPACITY) / (float) 100.0;
+ float cpuRate = cpuUsed / cpuCapacity * 100;
+
+ String cpuInfo = String.format("%.2f / %.2f = %.2f", cpuUsed, cpuCapacity, cpuRate);
+ sortProperties.put(ClusterMeta.CPU_USED + " / " + ClusterMeta.CPU_CAPACITY, cpuInfo + "%");
+ }
+
+ if (properties.containsKey(ClusterMeta.MEMORY_USED)
+ && properties.containsKey(ClusterMeta.MEMORY_CAPACITY)) {
+ float memoryUsed = (long) properties.get(ClusterMeta.MEMORY_USED) / (float) (1024 * 1024 * 1024);
+ float memoryCapacity
+ = (long) properties.get(ClusterMeta.MEMORY_CAPACITY) / (float) (1024 * 1024 * 1024);
+ float memoryRate = memoryUsed / memoryCapacity * 100;
+
+ String memoryInfo = String.format("%.2fGB / %.2fGB = %.2f",
+ memoryUsed, memoryCapacity, memoryRate);
+ sortProperties.put(ClusterMeta.MEMORY_USED + " / " + ClusterMeta.MEMORY_CAPACITY, memoryInfo + "%");
+ }
+
+ if (properties.containsKey(ClusterMeta.SERVER_START_TIME)) {
+ // format LocalDateTime
+ Object serverStartTime = properties.get(ClusterMeta.SERVER_START_TIME);
+ if (serverStartTime instanceof LocalDateTime) {
+ LocalDateTime localDateTime = (LocalDateTime) serverStartTime;
+ String dateTime = formatLocalDateTime(localDateTime);
+ sortProperties.put(ClusterMeta.SERVER_START_TIME, dateTime);
+ } else {
+ sortProperties.put(ClusterMeta.SERVER_START_TIME, "Wrong time type!");
+ }
+ }
+ if (properties.containsKey(ClusterMeta.STATUS)) {
+ sortProperties.put(ClusterMeta.STATUS, properties.get(ClusterMeta.STATUS));
+ }
+ if (properties.containsKey(ClusterMeta.LATEST_HEARTBEAT)) {
+ // format LocalDateTime
+ Object latestHeartbeat = properties.get(ClusterMeta.LATEST_HEARTBEAT);
+ if (latestHeartbeat instanceof LocalDateTime) {
+ LocalDateTime localDateTime = (LocalDateTime) latestHeartbeat;
+ String dateTime = formatLocalDateTime(localDateTime);
+ sortProperties.put(ClusterMeta.LATEST_HEARTBEAT, dateTime);
+ } else {
+ sortProperties.put(ClusterMeta.LATEST_HEARTBEAT, "Wrong time type!");
+ }
+ }
+ if (properties.containsKey(ClusterMeta.INTP_PROCESS_LIST)) {
+ sortProperties.put(ClusterMeta.INTP_PROCESS_LIST, properties.get(ClusterMeta.INTP_PROCESS_LIST));
+ }
+
+ HashMap<String, Object> node = new HashMap<>();
+ node.put(ClusterMeta.NODE_NAME, nodeName);
+ node.put(ClusterMeta.PROPERTIES, sortProperties);
+
+ nodes.add(node);
+ }
+
+ return new JsonResponse.Builder<ArrayList<HashMap<String, Object>>>(Response.Status.OK)
+ .success(true).result(nodes).build();
+ }
+
+ /**
+ * get node info by id
+ */
+ @GET
+ @Path("/" + RestConstants.NODE + "/{nodeName}/{intpName}")
+ @SubmarineApi
+ public Response getClusterNode(@PathParam("nodeName") String nodeName,
+ @PathParam("intpName") String intpName){
+ ArrayList<HashMap<String, Object>> intpProcesses = new ArrayList<>();
+
+ Map<String, HashMap<String, Object>> intpMeta = null;
+ intpMeta = clusterServer.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+
+ // Number of calculation processes
+ for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
+ String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME);
+
+ if (null != intpNodeName && intpNodeName.equals(nodeName)) {
+ HashMap<String, Object> node = new HashMap<String, Object>();
+ node.put(ClusterMeta.NODE_NAME, intpNodeName);
+ node.put(ClusterMeta.PROPERTIES, intpMetaEntity.getValue());
+
+ // format LocalDateTime
+ HashMap<String, Object> properties = intpMetaEntity.getValue();
+ if (properties.containsKey(ClusterMeta.INTP_START_TIME)) {
+ Object intpStartTime = properties.get(ClusterMeta.INTP_START_TIME);
+ if (intpStartTime instanceof LocalDateTime) {
+ LocalDateTime localDateTime = (LocalDateTime) intpStartTime;
+ String dateTime = formatLocalDateTime(localDateTime);
+ properties.put(ClusterMeta.INTP_START_TIME, dateTime);
+ } else {
+ properties.put(ClusterMeta.INTP_START_TIME, "Wrong time type!");
+ }
+ }
+ if (properties.containsKey(ClusterMeta.LATEST_HEARTBEAT)) {
+ Object latestHeartbeat = properties.get(ClusterMeta.LATEST_HEARTBEAT);
+ if (latestHeartbeat instanceof LocalDateTime) {
+ LocalDateTime localDateTime = (LocalDateTime) latestHeartbeat;
+ String dateTime = formatLocalDateTime(localDateTime);
+ properties.put(ClusterMeta.LATEST_HEARTBEAT, dateTime);
+ } else {
+ properties.put(ClusterMeta.LATEST_HEARTBEAT, "Wrong time type!");
+ }
+ }
+
+ intpProcesses.add(node);
+ }
+ }
+
+ return new JsonResponse.Builder<ArrayList<HashMap<String, Object>>>(Response.Status.OK)
+ .success(true).result(intpProcesses).build();
+ }
+
+ private String formatLocalDateTime(LocalDateTime localDateTime) {
+ DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME;
+ String strDate = localDateTime.format(dtf);
+ return strDate;
+ }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/metastore/rest/MetaStoreApi.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/MetaStoreRestApi.java
similarity index 97%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/metastore/rest/MetaStoreApi.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/MetaStoreRestApi.java
index 32df0e3..9b56a35 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/metastore/rest/MetaStoreApi.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/MetaStoreRestApi.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.submarine.server.metastore.rest;
+package org.apache.submarine.server.rest;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
@@ -44,17 +44,17 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.List;
-@Path("/metaStore")
+@Path(RestConstants.V1 + "/" + RestConstants.METASTORE)
@Produces("application/json")
@Singleton
-public class MetaStoreApi {
- private static final Logger LOG = LoggerFactory.getLogger(MetaStoreApi.class);
+public class MetaStoreRestApi {
+ private static final Logger LOG = LoggerFactory.getLogger(MetaStoreRestApi.class);
private static final Gson gson = new Gson();
private static final SubmarineConfiguration submarineConf = SubmarineConfiguration.getInstance();
private SubmarineMetaStore submarineMetaStore = new SubmarineMetaStore(submarineConf);
@Inject
- public MetaStoreApi() {
+ public MetaStoreRestApi() {
}
@POST
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/RestConstants.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
similarity index 79%
rename from submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/RestConstants.java
rename to submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
index 360d86c..e0b2ede 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/dao/RestConstants.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.rest.dao;
+package org.apache.submarine.server.rest;
public class RestConstants {
public static final String V1 = "v1";
@@ -26,4 +26,12 @@ public class RestConstants {
public static final String PING = "ping";
public static final String MEDIA_TYPE_YAML = "application/yaml";
public static final String CHARSET_UTF8 = "charset=utf-8";
+
+ public static final String METASTORE = "metastore";
+
+ public static final String CLUSTER = "cluster";
+ public static final String ADDRESS = "address";
+
+ public static final String NODES = "nodes";
+ public static final String NODE = "node";
}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerClusterTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerClusterTest.java
index d0475cd..da45676 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerClusterTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerClusterTest.java
@@ -18,18 +18,31 @@
*/
package org.apache.submarine.server;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.internal.LinkedTreeMap;
+import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.submarine.commons.cluster.ClusterClient;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
import org.apache.submarine.commons.utils.NetworkUtils;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.response.JsonResponse;
+import org.apache.submarine.server.rest.RestConstants;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
@@ -60,7 +73,7 @@ public class SubmarineServerClusterTest extends AbstractSubmarineServerTest {
constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
clusterClient = (ClusterClient) constructor.newInstance();
- clusterClient.start("SubmarineServerClusterTest");
+ clusterClient.start(SubmarineServerClusterTest.class.getSimpleName());
// Waiting for cluster startup
int wait = 0;
@@ -75,8 +88,8 @@ public class SubmarineServerClusterTest extends AbstractSubmarineServerTest {
assertTrue("Can not start Submarine server!", clusterClient.raftInitialized());
- // Waiting for the workbench server to register in the cluster
- sleep(5000);
+ // Waiting for the submarine server to register in the cluster and client send heartbeat to cluster
+ sleep(10000);
}
@AfterClass
@@ -104,4 +117,82 @@ public class SubmarineServerClusterTest extends AbstractSubmarineServerTest {
assertEquals(hashMap.size(), 1);
LOG.info("SubmarineServerClusterTest::testGetServerClusterMeta <<<");
}
+
+ @Test
+ public void testGetClusterAddress() throws IOException {
+ GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
+ + RestConstants.CLUSTER + "/" + RestConstants.ADDRESS);
+ LOG.info(response.toString());
+
+ String requestBody = response.getResponseBodyAsString();
+ LOG.info(requestBody);
+
+ Type type = new TypeToken<JsonResponse<List<String>>>() {}.getType();
+ Gson gson = new Gson();
+ JsonResponse<List<String>> jsonResponse = gson.fromJson(requestBody, type);
+ LOG.info(jsonResponse.getResult().toString());
+ assertEquals(jsonResponse.getCode(), Response.Status.OK.getStatusCode());
+
+ List<String> listAddr = jsonResponse.getResult();
+ LOG.info("listAddr.size = {}", listAddr.size());
+ assertEquals(listAddr.size(), 1);
+ }
+
+ private ArrayList<HashMap<String, Object>> getClusterNodes() throws IOException {
+ GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
+ + RestConstants.CLUSTER + "/" + RestConstants.NODES);
+ LOG.info(response.toString());
+
+ String requestBody = response.getResponseBodyAsString();
+ LOG.info(requestBody);
+
+ Type type = new TypeToken<JsonResponse<ArrayList<HashMap<String, Object>>>>() {}.getType();
+ Gson gson = new Gson();
+ JsonResponse<ArrayList<HashMap<String, Object>>> jsonResponse = gson.fromJson(requestBody, type);
+ LOG.info(jsonResponse.getResult().toString());
+ assertEquals(jsonResponse.getCode(), Response.Status.OK.getStatusCode());
+
+ ArrayList<HashMap<String, Object>> listNodes = jsonResponse.getResult();
+ LOG.info("listNodes.size = {}", listNodes.size());
+ assertEquals(listNodes.size(), 1);
+
+ return listNodes;
+ }
+
+ @Test
+ public void testGetClusterNodes() throws IOException {
+ getClusterNodes();
+ }
+
+ @Test
+ public void testGetClusterNode() throws IOException {
+ ArrayList<HashMap<String, Object>> listNodes = getClusterNodes();
+
+ Map<String, Object> properties
+ = (LinkedTreeMap<String, Object>) listNodes.get(0).get(ClusterMeta.PROPERTIES);
+ ArrayList<String> intpList = (ArrayList<String>) properties.get(ClusterMeta.INTP_PROCESS_LIST);
+ String nodeName = listNodes.get(0).get(ClusterMeta.NODE_NAME).toString();
+ String intpName = intpList.get(0);
+ LOG.info("properties = {}", properties);
+ LOG.info("intpList = {}", intpList);
+ LOG.info("nodeName = {}", nodeName);
+ LOG.info("intpName = {}", intpName);
+
+ GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
+ + RestConstants.CLUSTER + "/" + RestConstants.NODE + "/" + nodeName + "/" + intpName);
+ LOG.info(response.toString());
+
+ String requestBody = response.getResponseBodyAsString();
+ LOG.info(requestBody);
+
+ Type type = new TypeToken<JsonResponse<ArrayList<HashMap<String, Object>>>>() {}.getType();
+ Gson gson = new Gson();
+ JsonResponse<ArrayList<HashMap<String, Object>>> jsonResponse = gson.fromJson(requestBody, type);
+ LOG.info(jsonResponse.getResult().toString());
+ assertEquals(jsonResponse.getCode(), Response.Status.OK.getStatusCode());
+
+ ArrayList<HashMap<String, Object>> intpProcesses = jsonResponse.getResult();
+ LOG.info("intpProcesses = {}", intpProcesses);
+ assertEquals(intpProcesses.size(), 1);
+ }
}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/rest/api/JobApiTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
similarity index 94%
rename from submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/rest/api/JobApiTest.java
rename to submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
index dfefca2..448faa7 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/rest/api/JobApiTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.rest.api;
+package org.apache.submarine.server.jobserver;
import com.google.gson.Gson;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.submarine.server.AbstractSubmarineServerTest;
-import org.apache.submarine.server.jobserver.rest.dao.RestConstants;
+import org.apache.submarine.server.rest.RestConstants;
import org.apache.submarine.server.response.JsonResponse;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -37,12 +37,12 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
-public class JobApiTest extends AbstractSubmarineServerTest {
- private static final Logger LOG = LoggerFactory.getLogger(JobApiTest.class);
+public class JobServerRestApiTest extends AbstractSubmarineServerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(JobServerRestApiTest.class);
@BeforeClass
public static void init() throws Exception {
- AbstractSubmarineServerTest.startUp(JobApiTest.class.getSimpleName());
+ AbstractSubmarineServerTest.startUp(JobServerRestApiTest.class.getSimpleName());
}
@AfterClass
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/metastore/rest/MetaStoreApiTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/MetaStoreRestApiTest.java
similarity index 97%
rename from submarine-server/server-core/src/test/java/org/apache/submarine/server/metastore/rest/MetaStoreApiTest.java
rename to submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/MetaStoreRestApiTest.java
index 35910ab..9c904e3 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/metastore/rest/MetaStoreApiTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/MetaStoreRestApiTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.submarine.server.metastore.rest;
+package org.apache.submarine.server.rest;
import com.google.gson.Gson;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -41,8 +41,8 @@ import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class MetaStoreApiTest {
- private static MetaStoreApi metaStoreApi;
+public class MetaStoreRestApiTest {
+ private static MetaStoreRestApi metaStoreApi;
@BeforeClass
public static void init() {
@@ -56,7 +56,7 @@ public class MetaStoreApiTest {
"useSSL=false");
submarineConf.setMetastoreJdbcUserName("metastore_test");
submarineConf.setMetastoreJdbcPassword("password_test");
- metaStoreApi = new MetaStoreApi();
+ metaStoreApi = new MetaStoreRestApi();
}
@Before
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org