Posted to submarine-dev@hadoop.apache.org by li...@apache.org on 2019/10/22 13:05:02 UTC

[hadoop-submarine] branch master updated: SUBMARINE-254. Workbench server support cluster mode

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 186a8dc  SUBMARINE-254. Workbench server support cluster mode
186a8dc is described below

commit 186a8dcf291c41eea4d65e05a846957872d8241f
Author: Xun Liu <li...@apache.org>
AuthorDate: Sun Oct 20 22:59:09 2019 +0800

    SUBMARINE-254. Workbench server support cluster mode
    
    ### What is this PR for?
    Workbench Server mainly serves Submarine Workbench WEB, which provides algorithm users with algorithm development, Python/Spark interpreter execution, and other services through notebooks.
    
    By integrating the cluster components, multiple workbench servers can form a service cluster.
    
    The goal of the Submarine project is to provide highly available and highly reliable services for big data processing, algorithm development, job scheduling, online model serving, and batch and incremental model updates. In addition to the high availability of the big data and machine learning frameworks, the high availability of Submarine Server and Workbench Server themselves is a key consideration.
    
    Design Doc: https://docs.google.com/document/d/1Ax6FQ5CAP-jowm2_Mp2r5kc9r7s1bkFLRfzvvbm5Wzc/edit#
    
    ### What type of PR is it?
    [Feature]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/SUBMARINE-254
    
    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/hadoop-submarine/builds/600527624)
    * WorkbenchClusterServerTest.java
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update?  No
    * Are there breaking changes for older versions?  No
    * Does this need documentation? Yes
    
    Author: Xun Liu <li...@apache.org>
    
    Closes #59 from liuxunorg/SUBMARINE-254 and squashes the following commits:
    
    031fd84 [Xun Liu] SUBMARINE-254. Workbench server support cluster mode
---
 docs/design/SubmarineClusterServer.md              | 157 +++++++++++++++++++++
 pom.xml                                            |   2 +-
 .../submarine/commons/cluster/ClusterManager.java  |   4 +-
 .../commons/cluster/ClusterManagerClient.java      |   4 +-
 .../commons/cluster/ClusterManagerServer.java      |   4 +-
 .../commons/cluster/ClusterMultiNodeTest.java      |   6 +-
 submarine-commons/commons-utils/pom.xml            |  12 ++
 .../commons/utils/SubmarineConfiguration.java      |   9 +-
 submarine-workbench/workbench-server/pom.xml       |  15 +-
 .../apache/submarine/server/WorkbenchServer.java   |  11 ++
 .../server/AbstractWorkbenchServerTest.java        |  40 +-----
 .../server/WorkbenchClusterServerTest.java         | 107 ++++++++++++++
 12 files changed, 307 insertions(+), 64 deletions(-)

diff --git a/docs/design/SubmarineClusterServer.md b/docs/design/SubmarineClusterServer.md
new file mode 100644
index 0000000..1c7d126
--- /dev/null
+++ b/docs/design/SubmarineClusterServer.md
@@ -0,0 +1,157 @@
+# Submarine Cluster Server Design
+
+## Introduction
+The Submarine system contains two daemon services: Submarine Server and Workbench Server.
+
+Submarine Server mainly provides job submission, job scheduling, job status monitoring, and online model serving for Submarine.
+
+Workbench Server mainly provides algorithm users with algorithm development, Python/Spark interpreter execution, and other services through notebooks.
+
+The goal of the Submarine project is to provide highly available and highly reliable services for big data processing,
+algorithm development, job scheduling, online model serving, and batch and incremental model updates.
+
+In addition to the high availability of the big data and machine learning frameworks,
+the high availability of Submarine Server and Workbench Server themselves is a key consideration.
+
+## Requirement
+
+### Cluster Metadata Center
+
+Multiple Submarine (or Workbench) Server processes form a Submarine cluster using a Raft consensus library.
+
+The cluster maintains an internal metadata center, and every server can read and modify the metadata.
+
+The Raft algorithm ensures that when multiple processes modify the metadata at the same time,
+the modifications do not overwrite each other or produce dirty data.
+
+The metadata center stores data as key-value pairs and can hold a variety of data.
+Note, however, that it is only suitable for small amounts of metadata
+and cannot replace a general-purpose data store.
+
+### Service discovery
+
+By storing service and process information in the metadata center, any component can find the service or process it needs from anywhere in the cluster.
+For example, the IP address and port of a Python interpreter process are stored in the metadata,
+so other services can look up that process by its process ID and connect to it. This provides service discovery.
+
+### Cluster event
+
+Within the Submarine cluster, servers and their child processes can communicate with each other by sending cluster events.
+
+Each service or process runs the corresponding handler when it receives a cluster event. For example,
+the Workbench Server can send a shutdown event to a Python interpreter process. In this way, cluster events control the operation of services and individual subprocesses throughout the cluster.
+
+Cluster events support both broadcast delivery (to all members) and targeted delivery (to a single member).
+
+### Independence
+
+Submarine's clustering capabilities are implemented with a Raft library and do not rely on any external service (e.g. ZooKeeper or etcd).
+
+### Disadvantages
+
+Raft can only make progress while more than half of the servers are available. Therefore, when clustering is enabled for Submarine (Workbench) Server
+and half or more of the servers become unavailable, some operations may fail.
+The system detects this condition and either degrades its functionality or refuses to provide service.
+
+## System design
+
+### Modular design
+
+Both Submarine Server and Workbench Server need clustering capabilities, so the cluster functionality is abstracted
+into a separate module that both Submarine Server and Workbench Server reuse.
+
+### ClusterConfigure
+
+Add the `submarine.server.addr` and `workbench.server.addr` configuration items to `submarine-site.xml`, for example `submarine.server.addr=ip1, ip2, ip3`.
+Using this address list, the Raft module in each server process can form a cluster with the other server processes.
+
+### ClusterServer
+
++ The ClusterServer module wraps the Raft server module. Based on the `submarine.server.addr` or `workbench.server.addr` configuration item,
+it creates the service cluster and reads and writes metadata.
+
++ The cluster management service runs in each Submarine server.
+
++ The cluster management service establishes the cluster using the Atomix `RaftServer` class, maintains the `ClusterStateMachine`,
+and manages the service state metadata of each Submarine server through the `PutCommand`, `GetQuery`, and `DeleteCommand` operations.
+
+### ClusterClient
+
++ The ClusterClient module wraps the Raft client module. It communicates with the cluster according to the `submarine.server.addr` or `workbench.server.addr` configuration item,
+reads and writes metadata, and writes the IP address and port of the client process into the cluster's metadata center.
+
++ The cluster management client runs in each Submarine server and Submarine Interpreter process.
+
++ The cluster management client manages the state (metadata) of the Submarine server and Submarine Interpreter processes
+in the `ClusterStateMachine`, connecting to the Atomix `RaftServer` through the Atomix `RaftClient` class.
+
++ When the Submarine Server and Submarine Interpreter processes start, they are added to the `ClusterStateMachine`,
+and they are removed from the `ClusterStateMachine` when they are closed.
+
+
+### ClusterMetadata
+The metadata center stores metadata as key-value pairs.
+
+ServerMeta: key = `host:port`, value = `{SERVER_HOST=..., SERVER_PORT=..., ...}`
+
+
+| Name                  | Description                     |
+| --------------------- | ------------------------------- |
+| SUBMARINE_SERVER_HOST | Submarine server IP             |
+| SUBMARINE_SERVER_PORT | Submarine server port           |
+| WORKBENCH_SERVER_HOST | Submarine workbench server IP   |
+| WORKBENCH_SERVER_PORT | Submarine workbench server port |
+
+InterpreterMeta: key = `InterpreterGroupId`, value = `{INTP_TSERVER_HOST=..., ...}`
+
+| Name              | Description                          |
+| ----------------- | ------------------------------------ |
+| INTP_TSERVER_HOST | Submarine Interpreter Thrift IP      |
+| INTP_TSERVER_PORT | Submarine Interpreter Thrift port    |
+| INTP_START_TIME   | Submarine Interpreter start time     |
+| HEARTBEAT         | Submarine Interpreter heartbeat time |
+
+### Network fault tolerance
+
+In a distributed environment, network failures, network delays, or service exceptions may occur. After submitting metadata to the cluster,
+the submitter checks whether the submission succeeded. If it failed, the metadata is saved in a local message queue and a separate commit thread retries it.
+
+### Cluster monitoring
+
+The cluster needs to monitor whether the Submarine Server and Submarine Interpreter processes are working properly.
+
+The Submarine Server and Submarine Interpreter processes periodically send heartbeats to update their own timestamps in the cluster metadata.
+
+The Submarine Server that holds the Leader role periodically checks the timestamps of the Submarine Server and Submarine Interpreter processes and removes services and processes that have timed out.
+
+1. The cluster monitoring module runs in each Submarine Server and Submarine Interpreter process,
+periodically sending heartbeat data of the service or process to the cluster.
+
+2. When the cluster monitoring module runs in Submarine Server, it sends the heartbeat to the cluster's ClusterStateMachine.
+If the cluster receives no heartbeat from a service or process for a long time, that service or process is considered abnormal and unavailable.
+
+3. Resource usage statistics: to smooth out instantaneous peaks and troughs in server load,
+cluster monitoring reports the average resource usage over the most recent period, so that server resources are used as reasonably and effectively as possible.
+
+4. When the cluster monitoring module runs in Submarine Server, it checks the heartbeat data of each Submarine Server and Submarine Interpreter process.
+If a heartbeat has timed out, the corresponding service or process is considered unavailable and is removed from the cluster.
+
+### Atomix Raft algorithm library
+
+To reduce the deployment complexity of distributed mode, Submarine Server does not use ZooKeeper to build a distributed cluster.
+Instead, multiple Submarine Server instances form a distributed cluster using the Raft algorithm built into Submarine Server.
+The Raft implementation is provided by the Atomix library, which has passed Jepsen consistency verification.
+
+### Synchronize workbench notes
+
+In cluster mode, a user may create, modify, or delete a note on any of the servers.
+Every such change must be propagated to all servers in the cluster so that they keep their notebooks in sync;
+otherwise, the user cannot continue working after switching to another server.
+
+### Listen for note update events
+
+Listen for notebook events such as NEW_NOTE, DEL_NOTE, REMOVE_NOTE_TO_TRASH, ... in the `NotebookServer#onMessage()` method.
+
+### Broadcast note update event
+
+Notes are refreshed by broadcasting the event to all Submarine servers in the cluster via `messagingService`.
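
The ClusterMetadata and Cluster monitoring sections of the design document above describe key-value metadata and a leader that removes entries whose heartbeat has timed out. ServerMeta is keyed by `host:port` and InterpreterMeta by interpreter group ID. Below is a minimal single-process illustration of that bookkeeping, assuming an in-memory map and an explicit clock value. It is not Submarine's `ClusterStateMachine`. `ClusterMetaSketch` and its methods are hypothetical, they only mirror the roles of `PutCommand`, `GetQuery`, and `DeleteCommand`, and there is no Raft replication.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical single-process model of the cluster metadata center.
// NOT Submarine's ClusterStateMachine: no Raft replication, illustrative names only.
final class ClusterMetaSketch {
  // Field name taken from the InterpreterMeta table in the design document.
  static final String HEARTBEAT = "HEARTBEAT";

  // metaKey -> metadata fields, e.g. "10.0.0.1:8080" -> {SERVER_HOST=..., SERVER_PORT=...}
  private final Map<String, Map<String, Object>> metas = new ConcurrentHashMap<>();
  private final long timeoutMillis;

  ClusterMetaSketch(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Conceptually like PutCommand: merge the fields and record the heartbeat time.
  void put(String metaKey, Map<String, Object> fields, long nowMillis) {
    Map<String, Object> entry = metas.computeIfAbsent(metaKey, k -> new ConcurrentHashMap<>());
    entry.putAll(fields);
    entry.put(HEARTBEAT, nowMillis);
  }

  // Conceptually like GetQuery: look up a service or process by key (service discovery).
  Map<String, Object> get(String metaKey) {
    Map<String, Object> entry = metas.get(metaKey);
    return entry == null ? null : new HashMap<>(entry);
  }

  // Conceptually like DeleteCommand.
  void delete(String metaKey) {
    metas.remove(metaKey);
  }

  // Run periodically on the leader: remove entries whose last heartbeat is older than
  // the timeout, and return the removed keys in sorted order.
  List<String> evictExpired(long nowMillis) {
    List<String> expired = metas.entrySet().stream()
        .filter(e -> nowMillis - (Long) e.getValue().get(HEARTBEAT) > timeoutMillis)
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
    expired.forEach(metas::remove);
    return expired;
  }

  public static void main(String[] args) {
    ClusterMetaSketch meta = new ClusterMetaSketch(10_000);
    meta.put("10.0.0.1:8080", Map.of("SERVER_HOST", "10.0.0.1", "SERVER_PORT", 8080), 0);
    meta.put("intp-group-1", Map.of("INTP_TSERVER_HOST", "10.0.0.2", "INTP_TSERVER_PORT", 9090), 0);
    // Only the server keeps sending heartbeats.
    meta.put("10.0.0.1:8080", Map.of(), 8_000);
    System.out.println(meta.evictExpired(15_000)); // [intp-group-1]
    System.out.println(meta.get("10.0.0.1:8080").get("SERVER_PORT")); // 8080
  }
}
```

Passing the clock in as `nowMillis` keeps the timeout logic deterministic and easy to test. A real monitor would read the system clock and run `evictExpired` on a schedule, only on the leader.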
diff --git a/pom.xml b/pom.xml
index 3706e90..0b95abb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
     <commons-configuration.version>1.10</commons-configuration.version>
     <commons-httpclient.version>3.1</commons-httpclient.version>
 
-    <cglib.version>2.2.2</cglib.version>
+    <cglib.version>3.2.2</cglib.version>
     <mybatis.version>3.2.8</mybatis.version>
     <mysql-connector-java.version>5.1.39</mysql-connector-java.version>
 
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
index 5745751..71dec98 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -200,7 +200,7 @@ public abstract class ClusterManager {
   }
 
   public void start() {
-    if (!sconf.isClusterMode()) {
+    if (!sconf.workbenchIsClusterMode()) {
       return;
     }
 
@@ -291,7 +291,7 @@ public abstract class ClusterManager {
 
   // cluster shutdown
   public void shutdown() {
-    if (!sconf.isClusterMode()) {
+    if (!sconf.workbenchIsClusterMode()) {
       return;
     }
 
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
index d5bc7a7..930930e 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
@@ -63,7 +63,7 @@ public class ClusterManagerClient extends ClusterManager {
   // In the ClusterManagerClient metaKey equal interperterGroupId
   public void start(String metaKey) {
     LOG.info("ClusterManagerClient::start({})", metaKey);
-    if (!sconf.isClusterMode()) {
+    if (!sconf.workbenchIsClusterMode()) {
       return;
     }
     super.start();
@@ -74,7 +74,7 @@ public class ClusterManagerClient extends ClusterManager {
   }
 
   public void shutdown() {
-    if (!sconf.isClusterMode()) {
+    if (!sconf.workbenchIsClusterMode()) {
       return;
     }
     clusterMonitor.shutdown();
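
The design document's Network fault tolerance section says that a failed metadata submission is saved in a local message queue and retried by a separate commit thread. Below is a minimal sketch of that pattern. The nested `MetaSubmitter` interface is a hypothetical stand-in for the real call into the Raft cluster, and the retry delay and queue type are assumptions. This is not the `ClusterManagerClient` implementation.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: try each write once, and hand failures to a separate commit thread that retries them.
final class RetryingMetaWriter {
  // Hypothetical stand-in for the call that writes metadata into the cluster;
  // returns false when the submission is not confirmed. Not a Submarine API.
  interface MetaSubmitter {
    boolean submit(String metaKey, Map<String, Object> fields);
  }

  private static final class Pending {
    final String key;
    final Map<String, Object> fields;

    Pending(String key, Map<String, Object> fields) {
      this.key = key;
      this.fields = fields;
    }
  }

  private final MetaSubmitter submitter;
  private final long retryDelayMillis;
  private final BlockingQueue<Pending> retryQueue = new LinkedBlockingQueue<>();
  private final Thread commitThread;
  private volatile boolean running = true;

  RetryingMetaWriter(MetaSubmitter submitter, long retryDelayMillis) {
    this.submitter = submitter;
    this.retryDelayMillis = retryDelayMillis;
    this.commitThread = new Thread(this::retryLoop, "meta-commit-retry");
    this.commitThread.setDaemon(true);
    this.commitThread.start();
  }

  // Submit once on the caller's thread; if that fails, queue it for the commit thread.
  void write(String key, Map<String, Object> fields) {
    if (!submitter.submit(key, fields)) {
      retryQueue.add(new Pending(key, fields));
    }
  }

  private void retryLoop() {
    while (running) {
      try {
        Pending p = retryQueue.poll(100, TimeUnit.MILLISECONDS);
        if (p != null && !submitter.submit(p.key, p.fields)) {
          retryQueue.add(p);              // still failing: keep it queued
          Thread.sleep(retryDelayMillis); // back off before the next attempt
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // shutdown() interrupts the thread
      }
    }
  }

  int pendingCount() {
    return retryQueue.size();
  }

  void shutdown() {
    running = false;
    commitThread.interrupt();
    try {
      commitThread.join(1_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger attempts = new AtomicInteger();
    // Simulated cluster that rejects the first two submissions.
    RetryingMetaWriter writer =
        new RetryingMetaWriter((key, fields) -> attempts.incrementAndGet() > 2, 50);
    writer.write("10.0.0.1:8080", Map.of("SERVER_HOST", "10.0.0.1"));
    Thread.sleep(500);
    System.out.println("attempts=" + attempts.get() + ", pending=" + writer.pendingCount());
    writer.shutdown();
  }
}
```

The first attempt stays on the caller's thread, so successful writes add no latency; only failures go through the background retry. A production version would likely also bound the queue.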
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
index fded6e1..be3273d 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
@@ -84,7 +84,7 @@ public class ClusterManagerServer extends ClusterManager {
   }
 
   public void start() {
-    if (!sconf.isClusterMode()) {
+    if (!sconf.workbenchIsClusterMode()) {
       return;
     }
 
@@ -212,7 +212,7 @@ public class ClusterManagerServer extends ClusterManager {
 
   @Override
   public void shutdown() {
-    if (!sconf.isClusterMode()) {
+    if (!sconf.workbenchIsClusterMode()) {
       return;
     }
 
diff --git a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
index cf1c107..6a4c39a 100644
--- a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
+++ b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
@@ -48,11 +48,11 @@ public class ClusterMultiNodeTest {
     LOG.info("ClusterMultiNodeTest::startCluster >>>");
 
     String clusterAddrList = "";
-    String zServerHost = NetUtils.findAvailableHostAddress();
+    String serverHost = NetUtils.findAvailableHostAddress();
     for (int i = 0; i < 3; i++) {
       // Set the cluster IP and port
-      int zServerPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      clusterAddrList += zServerHost + ":" + zServerPort;
+      int serverPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      clusterAddrList += serverHost + ":" + serverPort;
       if (i != 2) {
         clusterAddrList += ",";
       }
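
The test above builds the comma-separated `host:port` cluster address list by concatenation plus an `if (i != 2)` check. As an illustrative alternative, `java.util.StringJoiner` handles the separators, and the same format can be parsed back into addresses. `ClusterAddrList` is hypothetical. Trimming whitespace around entries is an assumption about the accepted format, not something this commit specifies.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper for the "host:port,host:port" cluster address format.
final class ClusterAddrList {
  // Join host:port entries with commas, without a trailing separator.
  static String join(String host, int... ports) {
    StringJoiner joiner = new StringJoiner(",");
    for (int port : ports) {
      joiner.add(host + ":" + port);
    }
    return joiner.toString();
  }

  // Parse a "host:port,host:port" list; blank entries are skipped.
  static List<InetSocketAddress> parse(String clusterAddr) {
    List<InetSocketAddress> result = new ArrayList<>();
    if (clusterAddr == null) {
      return result;
    }
    for (String entry : clusterAddr.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      int colon = trimmed.lastIndexOf(':');
      if (colon <= 0 || colon == trimmed.length() - 1) {
        throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
      }
      String host = trimmed.substring(0, colon);
      int port = Integer.parseInt(trimmed.substring(colon + 1));
      // createUnresolved avoids a DNS lookup while parsing configuration.
      result.add(InetSocketAddress.createUnresolved(host, port));
    }
    return result;
  }

  public static void main(String[] args) {
    String addr = join("127.0.0.1", 6000, 6001, 6002);
    System.out.println(addr); // 127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002
    for (InetSocketAddress a : parse(addr)) {
      System.out.println(a.getHostString() + " -> " + a.getPort());
    }
  }
}
```

Inside the existing loop, calling `joiner.add(serverHost + ":" + serverPort)` on a `StringJoiner` would make the index check unnecessary.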
diff --git a/submarine-commons/commons-utils/pom.xml b/submarine-commons/commons-utils/pom.xml
index e58d6ea..0453160 100644
--- a/submarine-commons/commons-utils/pom.xml
+++ b/submarine-commons/commons-utils/pom.xml
@@ -39,6 +39,18 @@
       <groupId>commons-configuration</groupId>
       <artifactId>commons-configuration</artifactId>
       <version>${commons-configuration.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>${commons-lang.version}</version>
     </dependency>
 
     <dependency>
diff --git a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index 8fe12ea..2ddb50c 100644
--- a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++ b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -42,7 +42,7 @@ public class SubmarineConfiguration extends XMLConfiguration {
 
   private Map<String, String> properties = new HashMap<>();
 
-  public SubmarineConfiguration(URL url) throws ConfigurationException {
+  private SubmarineConfiguration(URL url) throws ConfigurationException {
     setDelimiterParsingDisabled(true);
     load(url);
     initProperties();
@@ -135,6 +135,11 @@ public class SubmarineConfiguration extends XMLConfiguration {
     return getInt(ConfVars.SERVER_PORT);
   }
 
+  @VisibleForTesting
+  public void setServerPort(int port) {
+    properties.put(ConfVars.SERVER_PORT.getVarName(), String.valueOf(port));
+  }
+
   public int getServerSslPort() {
     return getInt(ConfVars.SERVER_SSL_PORT);
   }
@@ -250,7 +255,7 @@ public class SubmarineConfiguration extends XMLConfiguration {
     properties.put(ConfVars.WORKBENCH_CLUSTER_ADDR.getVarName(), clusterAddr);
   }
 
-  public boolean isClusterMode() {
+  public boolean workbenchIsClusterMode() {
     String clusterAddr = getString(ConfVars.WORKBENCH_CLUSTER_ADDR);
     if (StringUtils.isEmpty(clusterAddr)) {
       return false;
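
This hunk shows only the start of `workbenchIsClusterMode()`: an empty `WORKBENCH_CLUSTER_ADDR` means cluster mode is off. The design document also notes that Raft needs more than half of the servers to be available. The sketch below connects the two. It assumes the address is a comma-separated list and that any non-blank entry enables cluster mode; the real method may check more. `ClusterQuorum` is hypothetical. The majority formula floor(n / 2) + 1 is standard Raft behavior.

```java
// Hypothetical helper relating the cluster address list to Raft's majority quorum.
final class ClusterQuorum {
  // Number of non-blank entries in a comma-separated address list.
  static int memberCount(String clusterAddr) {
    if (clusterAddr == null) {
      return 0;
    }
    int count = 0;
    for (String entry : clusterAddr.split(",")) {
      if (!entry.trim().isEmpty()) {
        count++;
      }
    }
    return count;
  }

  // Assumption: any non-blank address list enables cluster mode.
  static boolean isClusterMode(String clusterAddr) {
    return memberCount(clusterAddr) > 0;
  }

  // Raft needs a strict majority: floor(n / 2) + 1 members.
  static int quorumSize(int members) {
    return members / 2 + 1;
  }

  // How many members can fail while the cluster still has a quorum.
  static int toleratedFailures(int members) {
    return members - quorumSize(members);
  }

  public static void main(String[] args) {
    String addr = "10.0.0.1:6000,10.0.0.2:6000,10.0.0.3:6000";
    int n = memberCount(addr);
    System.out.println("cluster mode: " + isClusterMode(addr)); // cluster mode: true
    System.out.println("quorum: " + quorumSize(n) + " of " + n); // quorum: 2 of 3
    System.out.println("tolerated failures: " + toleratedFailures(n)); // tolerated failures: 1
    System.out.println("quorum: " + quorumSize(4) + ", tolerated: " + toleratedFailures(4)); // quorum: 3, tolerated: 1
  }
}
```

The four-member case shows why odd cluster sizes are usually preferred. Going from three to four servers raises the quorum without tolerating any additional failure.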
diff --git a/submarine-workbench/workbench-server/pom.xml b/submarine-workbench/workbench-server/pom.xml
index f1ee9f3..4d2ee67 100644
--- a/submarine-workbench/workbench-server/pom.xml
+++ b/submarine-workbench/workbench-server/pom.xml
@@ -35,10 +35,9 @@
   <name>Submarine: Workbench Server</name>
 
   <dependencies>
-
     <dependency>
       <groupId>org.apache.submarine</groupId>
-      <artifactId>commons-utils</artifactId>
+      <artifactId>commons-cluster</artifactId>
       <version>${submarine.version}</version>
     </dependency>
 
@@ -125,18 +124,6 @@
     </dependency>
 
     <dependency>
-      <groupId>commons-configuration</groupId>
-      <artifactId>commons-configuration</artifactId>
-      <version>${commons-configuration.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-collections</groupId>
-      <artifactId>commons-collections</artifactId>
-      <version>${commons-collections.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>cglib</groupId>
       <artifactId>cglib</artifactId>
       <version>${cglib.version}</version>
diff --git a/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java b/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
index 8e4a381..01c937f 100644
--- a/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
+++ b/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
@@ -20,6 +20,7 @@ package org.apache.submarine.server;
 
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.submarine.websocket.NotebookServer;
+import org.apache.submarine.commons.cluster.ClusterManagerServer;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -91,6 +92,9 @@ public class WorkbenchServer extends ResourceConfig {
     // Notebook server
     setupNotebookServer(webApp, conf, sharedServiceLocator);
 
+    // Cluster Manager Server
+    setupClusterManagerServer();
+
     startServer();
   }
 
@@ -218,6 +222,13 @@ public class WorkbenchServer extends ResourceConfig {
     webapp.addServlet(servletHolder, "/ws/*");
   }
 
+  private static void setupClusterManagerServer() {
+    if (conf.workbenchIsClusterMode()) {
+      ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance();
+      clusterManagerServer.start();
+    }
+  }
+
   private static SslContextFactory getSslContextFactory(SubmarineConfiguration conf) {
     SslContextFactory sslContextFactory = new SslContextFactory();
 
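
The design document describes cluster events that are either broadcast to every member or delivered to a single member. It relies on broadcasting to keep notes in sync across Workbench Servers. The sketch below models only that delivery contract inside one JVM. `ClusterEventBus`, its methods, and the event strings are hypothetical. According to the design document, the real implementation sends events between servers through `messagingService`, which this code does not use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-process model of broadcast vs. targeted cluster event delivery.
final class ClusterEventBus {
  // memberId -> handler that processes events delivered to that member
  private final Map<String, Consumer<String>> members = new ConcurrentHashMap<>();

  void register(String memberId, Consumer<String> handler) {
    members.put(memberId, handler);
  }

  void unregister(String memberId) {
    members.remove(memberId);
  }

  // Broadcast: every member except the sender receives the event.
  // Returns the number of members the event was delivered to.
  int broadcast(String senderId, String event) {
    int delivered = 0;
    for (Map.Entry<String, Consumer<String>> e : members.entrySet()) {
      if (!e.getKey().equals(senderId)) {
        e.getValue().accept(event);
        delivered++;
      }
    }
    return delivered;
  }

  // Targeted delivery: only the named member receives the event.
  boolean send(String targetId, String event) {
    Consumer<String> handler = members.get(targetId);
    if (handler == null) {
      return false;
    }
    handler.accept(event);
    return true;
  }

  public static void main(String[] args) {
    ClusterEventBus bus = new ClusterEventBus();
    List<String> log = new ArrayList<>();
    bus.register("workbench-1", ev -> log.add("workbench-1 got " + ev));
    bus.register("workbench-2", ev -> log.add("workbench-2 got " + ev));
    bus.register("workbench-3", ev -> log.add("workbench-3 got " + ev));

    // A note change handled by workbench-1 is broadcast to the other servers.
    bus.broadcast("workbench-1", "NEW_NOTE");

    // A shutdown event is sent to one interpreter process only.
    bus.register("python-intp-1", ev -> log.add("python-intp-1 got " + ev));
    bus.send("python-intp-1", "SHUTDOWN");
    log.forEach(System.out::println);
  }
}
```

Excluding the sender from a broadcast matches the note-sync case: the server that handled the edit already has the new state.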
diff --git a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/AbstractWorkbenchServerTest.java b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/AbstractWorkbenchServerTest.java
index 681d20a..c3140da 100644
--- a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/AbstractWorkbenchServerTest.java
+++ b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/AbstractWorkbenchServerTest.java
@@ -46,20 +46,6 @@ public abstract class AbstractWorkbenchServerTest {
 
   protected static File workbenchServerHome;
   protected static File confDir;
-  protected static File notebookDir;
-
-  private String getUrl(String path) {
-    String url;
-    if (System.getProperty("url") != null) {
-      url = System.getProperty("url");
-    } else {
-      url = "http://localhost:8080";
-    }
-    if (path != null) {
-      url += path;
-    }
-    return url;
-  }
 
   public static String getWebsocketApiUrlToTest() {
     String websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL;
@@ -91,10 +77,8 @@ public abstract class AbstractWorkbenchServerTest {
     }
   };
 
-  private static void start(boolean withAuth, String testClassName,
-      boolean withKnox, boolean cleanData) throws Exception {
-    LOG.info("Starting WorkbenchServer withAuth: {}, testClassName: {}, withKnox: {}",
-        withAuth, testClassName, withKnox);
+  public static void startUp(String testClassName) throws Exception {
+    LOG.info("Starting WorkbenchServer testClassName: {}", testClassName);
 
     if (!WAS_RUNNING) {
       // copy the resources files to a temp folder
@@ -108,10 +92,6 @@ public abstract class AbstractWorkbenchServerTest {
           new File("../workbench-web/dist").getAbsolutePath());
       System.setProperty(SubmarineConfiguration.ConfVars.SUBMARINE_CONF_DIR.getVarName(),
           confDir.getAbsolutePath());
-      notebookDir = new File(workbenchServerHome.getAbsolutePath() + "/notebook_" + testClassName);
-      if (cleanData) {
-        FileUtils.deleteDirectory(notebookDir);
-      }
 
       // some test profile does not build workbench-web.
       // to prevent submarine workbench server starting up fail,
@@ -138,22 +118,6 @@ public abstract class AbstractWorkbenchServerTest {
     }
   }
 
-  public static void startUpWithKnoxEnable(String testClassName) throws Exception {
-    start(true, testClassName, true, true);
-  }
-
-  public static void startUpWithAuthenticationEnable(String testClassName) throws Exception {
-    start(true, testClassName, false, true);
-  }
-
-  public static void startUp(String testClassName) throws Exception {
-    start(false, testClassName, false, true);
-  }
-
-  public static void startUp(String testClassName, boolean cleanData) throws Exception {
-    start(false, testClassName, false, cleanData);
-  }
-
   private static String getHostname() {
     try {
       return InetAddress.getLocalHost().getHostName();
diff --git a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
new file mode 100644
index 0000000..7742451
--- /dev/null
+++ b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.server;
+
+import org.apache.submarine.commons.cluster.ClusterManagerClient;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
+import org.apache.submarine.commons.utils.NetUtils;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class WorkbenchClusterServerTest {
+  private static final Logger LOG = LoggerFactory.getLogger(WorkbenchClusterServerTest.class);
+
+  private static ClusterManagerClient clusterClient = null;
+
+  @BeforeClass
+  public static void start() throws Exception {
+    LOG.info("WorkbenchClusterServerTest:start()");
+
+    SubmarineConfiguration conf = SubmarineConfiguration.create();
+    String serverHost = NetUtils.findAvailableHostAddress();
+    int serverPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    String clusterAdd = serverHost + ":" + serverPort;
+    conf.setClusterAddress(clusterAdd);
+
+    // Run the workbench service in a thread
+    AbstractWorkbenchServerTest.startUp(WorkbenchClusterServerTest.class.getSimpleName());
+
+    // Mock Cluster client
+    Class clazz = ClusterManagerClient.class;
+    Constructor constructor = null;
+    constructor = clazz.getDeclaredConstructor();
+    constructor.setAccessible(true);
+    clusterClient = (ClusterManagerClient) constructor.newInstance();
+    clusterClient.start("TestWorkbenchClusterServer");
+
+    // Waiting for cluster startup
+    int wait = 0;
+    while (wait++ < 100) {
+      if (clusterClient.raftInitialized()) {
+        LOG.info("TestWorkbenchClusterServer::start {}(ms) found cluster leader", wait * 3000);
+        break;
+      }
+
+      sleep(3000);
+    }
+
+    assertTrue("Can not start Submarine workbench server!", clusterClient.raftInitialized());
+
+    // Waiting for the workbench server to register in the cluster
+    sleep(5000);
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    LOG.info("WorkbenchClusterServerTest::stop >>>");
+    AbstractWorkbenchServerTest.shutDown();
+
+    if (null != clusterClient) {
+      clusterClient.shutdown();
+    }
+    LOG.info("WorkbenchClusterServerTest::stop <<<");
+  }
+
+  @Test
+  public void testGetWorkbenchClusterMeta() {
+    LOG.info("TestWorkbenchClusterServer::testGetWorkbenchClusterMeta >>>");
+    // Get metadata for workbench server
+    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+    LOG.info("testGetWorkbenchClusterMeta = {}", srvMeta.toString());
+
+    assertNotNull(srvMeta);
+    assertEquals(true, (srvMeta instanceof HashMap));
+    HashMap hashMap = (HashMap) srvMeta;
+
+    assertEquals(hashMap.size(), 1);
+    LOG.info("TestWorkbenchClusterServer::testGetWorkbenchClusterMeta <<<");
+  }
+}