You are viewing a plain text version of this content. The canonical link for it is here.
Posted to submarine-dev@hadoop.apache.org by li...@apache.org on 2019/10/23 02:41:51 UTC

[hadoop-submarine] branch master updated: SUBMARINE-256. Interpreter support cluster mode

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ed88ff  SUBMARINE-256. Interpreter support cluster mode
0ed88ff is described below

commit 0ed88ff6ba9940ffc865e4b7e9b9146cf0e1efda
Author: Xun Liu <li...@apache.org>
AuthorDate: Tue Oct 22 17:33:26 2019 +0800

    SUBMARINE-256. Interpreter support cluster mode
    
    ### What is this PR for?
    After the interpreter integrates the cluster module,
    when the interpreter is started, the `interpreter ID`, `IP` and `port` of the interpreter are registered in the metadata of the cluster, so that the interpreter can be found in the entire cluster and used by the notebook and workbench server.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/SUBMARINE-256
    
    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/hadoop-submarine/builds/601138608)
    * InterpreterClusterTest.java
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Xun Liu <li...@apache.org>
    
    Closes #60 from liuxunorg/SUBMARINE-256 and squashes the following commits:
    
    dc1ba6e [Xun Liu] SUBMARINE-256. Interpreter support cluster mode
---
 pom.xml                                            |   3 +-
 submarine-commons/commons-cluster/pom.xml          |   2 +-
 ...lusterManagerClient.java => ClusterClient.java} |  24 ++--
 .../submarine/commons/cluster/ClusterManager.java  |  21 +--
 .../submarine/commons/cluster/ClusterMonitor.java  |   4 +-
 ...lusterManagerServer.java => ClusterServer.java} |  45 +++---
 .../commons/cluster/ClusterMultiNodeTest.java      |  18 +--
 submarine-commons/pom.xml                          |   1 +
 .../interpreter/interpreter-engine/pom.xml         |  14 +-
 .../apache/submarine/interpreter/Interpreter.java  |  29 ++++
 .../submarine/interpreter/InterpreterProcess.java  | 158 ++++++++++++++++----
 .../src/main/resources/log4j.properties            |   0
 .../interpreter/python-interpreter/pom.xml         |  22 +++
 .../submarine/interpreter/PythonInterpreter.java   |   7 +-
 .../src/main/resources/log4j.properties            |   2 +-
 .../interpreter/InterpreterClusterTest.java        | 159 +++++++++++++++++++++
 .../apache/submarine/server/WorkbenchServer.java   |  12 +-
 .../server/WorkbenchClusterServerTest.java         |   8 +-
 18 files changed, 436 insertions(+), 93 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0b95abb..fbc6852 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
 
   <properties>
     <submarine.version>0.3.0-SNAPSHOT</submarine.version>
+
     <!-- language versions -->
     <java.version>1.8</java.version>
 
@@ -92,7 +93,7 @@
     <commons-lang3.version>3.4</commons-lang3.version>
     <commons.io.version>2.5</commons.io.version>
     <junit.version>4.12</junit.version>
-    <jsr305.version>3.0.0</jsr305.version>
+    <jsr305.version>1.3.9</jsr305.version>
     <mockito.version>2.23.4</mockito.version>
     <powermock.version>1.6.4</powermock.version>
     <guava.version>22.0</guava.version>
diff --git a/submarine-commons/commons-cluster/pom.xml b/submarine-commons/commons-cluster/pom.xml
index cbfbc29..8d4f9c7 100644
--- a/submarine-commons/commons-cluster/pom.xml
+++ b/submarine-commons/commons-cluster/pom.xml
@@ -34,7 +34,7 @@
     <dependency>
       <groupId>org.apache.submarine</groupId>
       <artifactId>commons-utils</artifactId>
-      <version>0.3.0-SNAPSHOT</version>
+      <version>${submarine.version}</version>
     </dependency>
 
     <dependency>
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterClient.java
similarity index 78%
rename from submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
rename to submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterClient.java
index 930930e..cdffbf3 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterClient.java
@@ -25,23 +25,23 @@ import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PRO
 /**
  * Cluster management client class instantiated in submarine-interperter
  */
-public class ClusterManagerClient extends ClusterManager {
-  private static Logger LOG = LoggerFactory.getLogger(ClusterManagerClient.class);
+public class ClusterClient extends ClusterManager {
+  private static Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
 
-  private static ClusterManagerClient instance = null;
+  private static ClusterClient instance = null;
 
   // Do not use the getInstance function in the test case,
   // which will result in an inability to update the instance according to the configuration.
-  public static ClusterManagerClient getInstance() {
-    synchronized (ClusterManagerClient.class) {
+  public static ClusterClient getInstance() {
+    synchronized (ClusterClient.class) {
       if (instance == null) {
-        instance = new ClusterManagerClient();
+        instance = new ClusterClient();
       }
-      return instance;
     }
+    return instance;
   }
 
-  private ClusterManagerClient() {
+  private ClusterClient() {
     super();
   }
 
@@ -60,9 +60,9 @@ public class ClusterManagerClient extends ClusterManager {
     return false;
   }
 
-  // In the ClusterManagerClient metaKey equal interperterGroupId
+  // In the ClusterClient metaKey equal interperterGroupId
   public void start(String metaKey) {
-    LOG.info("ClusterManagerClient::start({})", metaKey);
+    LOG.info("ClusterClient::start({})", metaKey);
     if (!sconf.workbenchIsClusterMode()) {
       return;
     }
@@ -77,7 +77,9 @@ public class ClusterManagerClient extends ClusterManager {
     if (!sconf.workbenchIsClusterMode()) {
       return;
     }
-    clusterMonitor.shutdown();
+    if (null != clusterMonitor) {
+      clusterMonitor.shutdown();
+    }
 
     super.shutdown();
   }
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
index 71dec98..816aad8 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -145,7 +145,7 @@ public abstract class ClusterManager {
       = new ConcurrentLinkedQueue<>();
 
   // submarine server host & port
-  protected String zeplServerHost = "";
+  protected String serverHost = "";
 
   protected ClusterMonitor clusterMonitor = null;
 
@@ -153,7 +153,7 @@ public abstract class ClusterManager {
 
   protected ClusterManager() {
     try {
-      zeplServerHost = NetUtils.findAvailableHostAddress();
+      this.serverHost = NetUtils.findAvailableHostAddress();
       String clusterAddr = sconf.getClusterAddress();
       LOG.info(this.getClass().toString() + "::clusterAddr = {}", clusterAddr);
       if (!StringUtils.isEmpty(clusterAddr)) {
@@ -163,7 +163,7 @@ public abstract class ClusterManager {
           String[] parts = cluster[i].split(":");
           String clusterHost = parts[0];
           int clusterPort = Integer.valueOf(parts[1]);
-          if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
+          if (this.serverHost.equalsIgnoreCase(clusterHost)) {
             raftServerPort = clusterPort;
           }
 
@@ -219,8 +219,8 @@ public abstract class ClusterManager {
           LOG.error(e.getMessage());
         }
 
-        MemberId memberId = MemberId.from(zeplServerHost + ":" + raftClientPort);
-        Address address = Address.from(zeplServerHost, raftClientPort);
+        MemberId memberId = MemberId.from(serverHost + ":" + raftClientPort);
+        Address address = Address.from(serverHost, raftClientPort);
         raftAddressMap.put(memberId, address);
 
         MessagingService messagingManager
@@ -299,21 +299,24 @@ public abstract class ClusterManager {
 
     try {
       if (null != raftSessionClient) {
-        raftSessionClient.close().get(3, TimeUnit.SECONDS);
+        LOG.info("ClusterManager::shutdown(raftSessionClient)");
+        raftSessionClient.close().get(5, TimeUnit.SECONDS);
       }
       if (null != raftClient) {
-        raftClient.close().get(3, TimeUnit.SECONDS);
+        LOG.info("ClusterManager::shutdown(raftClient)");
+        raftClient.close().get(5, TimeUnit.SECONDS);
       }
     } catch (InterruptedException | ExecutionException | TimeoutException e) {
       LOG.error(e.getMessage(), e);
     }
+    LOG.info("ClusterManager::shutdown()");
   }
 
   public String getClusterNodeName() {
     if (isTest) {
       // Start three cluster servers in the test case at the same time,
       // need to avoid duplicate names
-      return this.zeplServerHost + ":" + this.raftServerPort;
+      return this.serverHost + ":" + this.raftServerPort;
     }
 
     String hostName = "";
@@ -342,7 +345,7 @@ public abstract class ClusterManager {
     }
 
     // add cluster name
-    newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+    newMetaValue.put(ClusterMeta.SERVER_HOST, serverHost);
     newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
 
     raftSessionClient.execute(operation(ClusterStateMachine.PUT,
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
index 4521710..ddc5527 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
@@ -72,8 +72,8 @@ public class ClusterMonitor {
   // and the interperterGroupID when monitoring the interperter processes
   private String metaKey;
 
-  public ClusterMonitor(ClusterManager clusterManagerServer) {
-    this.clusterManager = clusterManagerServer;
+  public ClusterMonitor(ClusterManager clusterManager) {
+    this.clusterManager = clusterManager;
 
     SubmarineConfiguration sconf = SubmarineConfiguration.create();
     heartbeatInterval = sconf.getClusterHeartbeatInterval();
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
similarity index 89%
rename from submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
rename to submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
index be3273d..b7ec140 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
@@ -58,29 +58,29 @@ import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_M
  * 1. Create a raft server
  * 2. Remotely create interpreter's thrift service
  */
-public class ClusterManagerServer extends ClusterManager {
-  private static Logger LOG = LoggerFactory.getLogger(ClusterManagerServer.class);
+public class ClusterServer extends ClusterManager {
+  private static Logger LOG = LoggerFactory.getLogger(ClusterServer.class);
 
-  private static ClusterManagerServer instance = null;
+  private static ClusterServer instance = null;
 
   // raft server
   protected RaftServer raftServer = null;
 
   protected MessagingService messagingService = null;
 
-  private ClusterManagerServer() {
+  private ClusterServer() {
     super();
   }
 
   // Do not use the getInstance function in the test case,
   // which will result in an inability to update the instance according to the configuration.
-  public static ClusterManagerServer getInstance() {
-    synchronized (ClusterManagerServer.class) {
+  public static ClusterServer getInstance() {
+    synchronized (ClusterServer.class) {
       if (instance == null) {
-        instance = new ClusterManagerServer();
+        instance = new ClusterServer();
       }
-      return instance;
     }
+    return instance;
   }
 
   public void start() {
@@ -99,9 +99,9 @@ public class ClusterManagerServer extends ClusterManager {
   }
 
   @VisibleForTesting
-  public void initTestCluster(String clusterAddrList, String host, int port) {
+  void initTestCluster(String clusterAddrList, String host, int port) {
     isTest = true;
-    this.zeplServerHost = host;
+    this.serverHost = host;
     this.raftServerPort = port;
 
     // clear
@@ -153,8 +153,8 @@ public class ClusterManagerServer extends ClusterManager {
       public void run() {
         LOG.info("RaftServer run() >>>");
 
-        Address address = Address.from(zeplServerHost, raftServerPort);
-        Member member = Member.builder(MemberId.from(zeplServerHost + ":" + raftServerPort))
+        Address address = Address.from(serverHost, raftServerPort);
+        Member member = Member.builder(MemberId.from(serverHost + ":" + raftServerPort))
             .withAddress(address)
             .build();
         messagingService = NettyMessagingService.builder()
@@ -200,7 +200,7 @@ public class ClusterManagerServer extends ClusterManager {
         HashMap<String, Object> meta = new HashMap<String, Object>();
         String nodeName = getClusterNodeName();
         meta.put(ClusterMeta.NODE_NAME, nodeName);
-        meta.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+        meta.put(ClusterMeta.SERVER_HOST, serverHost);
         meta.put(ClusterMeta.SERVER_PORT, raftServerPort);
         meta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
         putClusterMeta(SERVER_META, nodeName, meta);
@@ -215,27 +215,34 @@ public class ClusterManagerServer extends ClusterManager {
     if (!sconf.workbenchIsClusterMode()) {
       return;
     }
+    LOG.info("ClusterServer::shutdown()");
 
     try {
       // delete local machine meta
       deleteClusterMeta(SERVER_META, getClusterNodeName());
-      Thread.sleep(300);
-      clusterMonitor.shutdown();
+      Thread.sleep(500);
+      if (null != clusterMonitor) {
+        clusterMonitor.shutdown();
+      }
       // wait raft commit metadata
-      Thread.sleep(300);
+      Thread.sleep(500);
     } catch (InterruptedException e) {
       LOG.error(e.getMessage(), e);
     }
 
+    // close raft client
+    super.shutdown();
+
     if (null != raftServer && raftServer.isRunning()) {
       try {
-        raftServer.shutdown().get(3, TimeUnit.SECONDS);
+        LOG.info("ClusterServer::raftServer.shutdown()");
+        raftServer.shutdown().get(5, TimeUnit.SECONDS);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
         LOG.error(e.getMessage(), e);
       }
     }
 
-    super.shutdown();
+    LOG.info("ClusterServer::super.shutdown()");
   }
 
   // Obtain the server node whose resources are idle in the cluster
@@ -284,7 +291,7 @@ public class ClusterManagerServer extends ClusterManager {
       LOG.debug("send broadcastClusterEvent message {}", msg);
     }
     for (Node node : clusterNodes) {
-      if (StringUtils.equals(node.address().host(), zeplServerHost)
+      if (StringUtils.equals(node.address().host(), serverHost)
           && node.address().port() == raftServerPort) {
         // skip myself
         continue;
diff --git a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
index 6a4c39a..b726323 100644
--- a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
+++ b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
@@ -38,8 +38,8 @@ import static org.junit.Assert.assertNotNull;
 public class ClusterMultiNodeTest {
   private static Logger LOG = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
 
-  private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
-  private static ClusterManagerClient clusterClient = null;
+  private static List<ClusterServer> clusterServers = new ArrayList<>();
+  private static ClusterClient clusterClient = null;
 
   static final String metaKey = "ClusterMultiNodeTestKey";
 
@@ -69,10 +69,10 @@ public class ClusterMultiNodeTest {
         String clusterHost = parts[0];
         int clusterPort = Integer.valueOf(parts[1]);
 
-        Class clazz = ClusterManagerServer.class;
+        Class clazz = ClusterServer.class;
         Constructor constructor = clazz.getDeclaredConstructor();
         constructor.setAccessible(true);
-        ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+        ClusterServer clusterServer = (ClusterServer) constructor.newInstance();
         clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
 
         clusterServers.add(clusterServer);
@@ -81,17 +81,17 @@ public class ClusterMultiNodeTest {
       LOG.error(e.getMessage(), e);
     }
 
-    for (ClusterManagerServer clusterServer : clusterServers) {
+    for (ClusterServer clusterServer : clusterServers) {
       clusterServer.start();
     }
 
     // mock cluster manager client
     try {
-      Class clazz = ClusterManagerClient.class;
+      Class clazz = ClusterClient.class;
       Constructor constructor = null;
       constructor = clazz.getDeclaredConstructor();
       constructor.setAccessible(true);
-      clusterClient = (ClusterManagerClient) constructor.newInstance();
+      clusterClient = (ClusterClient) constructor.newInstance();
       clusterClient.start(metaKey);
     } catch (NoSuchMethodException | InstantiationException
         | IllegalAccessException | InvocationTargetException e) {
@@ -127,7 +127,7 @@ public class ClusterMultiNodeTest {
     if (null != clusterClient) {
       clusterClient.shutdown();
     }
-    for (ClusterManagerServer clusterServer : clusterServers) {
+    for (ClusterServer clusterServer : clusterServers) {
       clusterServer.shutdown();
     }
     LOG.info("ClusterMultiNodeTest::stopCluster <<<");
@@ -135,7 +135,7 @@ public class ClusterMultiNodeTest {
 
   static boolean clusterIsStartup() {
     boolean foundLeader = false;
-    for (ClusterManagerServer clusterServer : clusterServers) {
+    for (ClusterServer clusterServer : clusterServers) {
       if (!clusterServer.raftInitialized()) {
         LOG.warn("clusterServer not Initialized!");
         return false;
diff --git a/submarine-commons/pom.xml b/submarine-commons/pom.xml
index e306c3e..b985d4c 100644
--- a/submarine-commons/pom.xml
+++ b/submarine-commons/pom.xml
@@ -24,6 +24,7 @@
     <groupId>org.apache.submarine</groupId>
     <artifactId>submarine</artifactId>
     <version>0.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/submarine-workbench/interpreter/interpreter-engine/pom.xml b/submarine-workbench/interpreter/interpreter-engine/pom.xml
index 29a0344..70344d7 100644
--- a/submarine-workbench/interpreter/interpreter-engine/pom.xml
+++ b/submarine-workbench/interpreter/interpreter-engine/pom.xml
@@ -37,6 +37,18 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.submarine</groupId>
+      <artifactId>commons-utils</artifactId>
+      <version>${submarine.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.submarine</groupId>
+      <artifactId>commons-cluster</artifactId>
+      <version>${submarine.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-interpreter</artifactId>
       <version>${zeppelin.version}</version>
@@ -135,7 +147,7 @@
       <plugin>
         <artifactId>maven-enforcer-plugin</artifactId>
         <configuration>
-          <skip>false</skip>
+          <skip>true</skip>
         </configuration>
       </plugin>
     </plugins>
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
new file mode 100644
index 0000000..9681ef4
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.interpreter;
+
+public interface Interpreter {
+  void shutdown();
+  boolean isRunning();
+  void open();
+  InterpreterResult interpret(String code);
+  void close();
+  void cancel();
+  int getProgress();
+  boolean test();
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
index b60582b..67969fe 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
@@ -19,8 +19,12 @@
 package org.apache.submarine.interpreter;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.submarine.commons.cluster.ClusterClient;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.misc.Signal;
@@ -31,44 +35,47 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.SocketException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.net.UnknownHostException;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
 /**
  * Entry point for Submarine Interpreter process.
  * Accepting thrift connections from Submarine Workbench Server.
  */
-public abstract class InterpreterProcess {
-  public abstract void open();
-  public abstract InterpreterResult interpret(String code);
-  public abstract void close();
-  public abstract void cancel();
-  public abstract int getProgress();
-  public abstract boolean test();
-
+public class InterpreterProcess extends Thread implements Interpreter {
   private static final Logger LOG = LoggerFactory.getLogger(InterpreterProcess.class);
 
-  protected static String interpreterId;
+  // cluster manager client
+  private ClusterClient clusterClient = ClusterClient.getInstance();
+
+  private SubmarineConfiguration sconf = SubmarineConfiguration.create();
+
+  protected String interpreterId;
+
+  private InterpreterProcess interpreterProcess;
+
+  private AtomicBoolean isRunning = new AtomicBoolean(false);
 
   public static void main(String[] args) throws InterruptedException, IOException {
-    String interpreterName = args[0];
+    String interpreterType = args[0];
     String interpreterId = args[1];
-    String onlyTest = "";
-    if (args.length == 3) {
-      onlyTest = args[2];
+    Boolean onlyTest = false;
+    if (args.length == 3 && StringUtils.equals(args[2], "test")) {
+      onlyTest = true;
     }
 
-    InterpreterProcess interpreterProcess = loadInterpreterPlugin(interpreterName);
-
-    if (StringUtils.equals(onlyTest, "test")) {
-      boolean testResult = interpreterProcess.test();
-      LOG.info("Interpreter test result: {}", testResult);
-      System.exit(0);
-      return;
-    }
+    InterpreterProcess interpreterProcess = new InterpreterProcess(interpreterType, interpreterId, onlyTest);
+    interpreterProcess.start();
 
     // add signal handler
     Signal.handle(new Signal("TERM"), new SignalHandler() {
@@ -79,22 +86,83 @@ public abstract class InterpreterProcess {
       }
     });
 
+    interpreterProcess.join();
     System.exit(0);
   }
 
+  @Override
+  public void run() {
+    isRunning.set(true);
+    while (isRunning.get()) {
+      try {
+        // TODO(Xun Liu): Next PR will add Thrift Server in here
+        LOG.info("Mock TServer run ...");
+        sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+  }
+
+  public boolean isRunning() {
+    return isRunning.get();
+  }
+
+  public InterpreterProcess() { }
+
+  public InterpreterProcess(String interpreterType, String interpreterId, Boolean onlyTest)
+      throws IOException {
+    this.interpreterId = interpreterId;
+    this.interpreterProcess = loadInterpreterPlugin(interpreterType);
+
+    if (true == onlyTest) {
+      boolean testResult = interpreterProcess.test();
+      LOG.info("Interpreter test result: {}", testResult);
+      System.exit(0);
+      return;
+    }
+
+    this.clusterClient.start(interpreterId);
+    putClusterMeta();
+  }
+
+  // Submit interpreter process metadata information to cluster metadata
+  private void putClusterMeta() {
+    if (!sconf.workbenchIsClusterMode()){
+      return;
+    }
+    String nodeName = clusterClient.getClusterNodeName();
+    String host = null;
+    try {
+      host = RemoteInterpreterUtils.findAvailableHostAddress();
+    } catch (UnknownHostException | SocketException e) {
+      LOG.error(e.getMessage(), e);
+    }
+    // commit interpreter meta
+    HashMap<String, Object> meta = new HashMap<>();
+    meta.put(ClusterMeta.NODE_NAME, nodeName);
+    meta.put(ClusterMeta.INTP_PROCESS_NAME, this.interpreterId);
+    meta.put(ClusterMeta.INTP_TSERVER_HOST, host);
+    meta.put(ClusterMeta.INTP_START_TIME, LocalDateTime.now());
+    meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
+    meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+    clusterClient.putClusterMeta(INTP_PROCESS_META, this.interpreterId, meta);
+  }
+
   // get super interpreter class name
-  private static String getSuperInterpreterClassName(String intpName) {
+  private String getSuperInterpreterClassName(String intpName) {
     String superIntpClassName = "";
     if (StringUtils.equals(intpName, "python")) {
       superIntpClassName = "org.apache.submarine.interpreter.PythonInterpreter";
     } else {
-      LOG.error("Error interpreter name : {}!", intpName);
+      superIntpClassName = "org.apache.submarine.interpreter.InterpreterProcess";
     }
 
     return superIntpClassName;
   }
 
-  public static synchronized InterpreterProcess loadInterpreterPlugin(String pluginName)
+  public synchronized InterpreterProcess loadInterpreterPlugin(String pluginName)
       throws IOException {
 
     LOG.info("Loading Plug name: {}", pluginName);
@@ -110,7 +178,7 @@ public abstract class InterpreterProcess {
     return intpProcess;
   }
 
-  private static InterpreterProcess loadPluginFromClassPath(
+  private InterpreterProcess loadPluginFromClassPath(
       String pluginClassName, URLClassLoader pluginClassLoader) {
     InterpreterProcess intpProcess = null;
     try {
@@ -143,7 +211,7 @@ public abstract class InterpreterProcess {
   }
 
   // Get the class load from the specified path
-  private static URLClassLoader getPluginClassLoader(String pluginsDir, String pluginName)
+  private URLClassLoader getPluginClassLoader(String pluginsDir, String pluginName)
       throws IOException {
     File pluginFolder = new File(pluginsDir + "/" + pluginName);
     if (!pluginFolder.exists() || pluginFolder.isFile()) {
@@ -183,4 +251,42 @@ public abstract class InterpreterProcess {
         .setInterpreterOut(new InterpreterOutput(null))
         .build();
   }
+
+  @Override
+  public void shutdown() {
+    isRunning.set(false);
+  }
+
+  @Override
+  public void open() {
+    LOG.error("Please implement the open() method of the child class!");
+  }
+
+  @Override
+  public InterpreterResult interpret(String code) {
+    LOG.error("Please implement the interpret() method of the child class!");
+    return null;
+  }
+
+  @Override
+  public void close() {
+    LOG.error("Please implement the close() method of the child class!");
+  }
+
+  @Override
+  public void cancel() {
+    LOG.error("Please implement the cancel() method of the child class!");
+  }
+
+  @Override
+  public int getProgress() {
+    LOG.error("Please implement the getProgress() method of the child class!");
+    return 0;
+  }
+
+  @Override
+  public boolean test() {
+    LOG.error("Please implement the test() method of the child class!");
+    return false;
+  }
 }
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties b/submarine-workbench/interpreter/interpreter-engine/src/main/resources/log4j.properties
similarity index 100%
copy from submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
copy to submarine-workbench/interpreter/interpreter-engine/src/main/resources/log4j.properties
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 2949c21..0ba09a2 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -82,6 +82,10 @@
           <groupId>org.codehaus.mojo</groupId>
           <artifactId>animal-sniffer-annotations</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -129,6 +133,18 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>commons-configuration</groupId>
+          <artifactId>commons-configuration</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
@@ -151,6 +167,12 @@
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
       <version>${httpclient.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.google.code.gson</groupId>
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
index 1a19e83..e413684 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
@@ -108,10 +108,11 @@ public class PythonInterpreter extends InterpreterProcess {
     open();
     String code = "1 + 1";
     InterpreterResult result = interpret(code);
-    LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}", code, result);
+    LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}",
+        code, result.message().get(0).getData());
 
-    if (result.code() == InterpreterResult.Code.SUCCESS) {
-      return true;
+    if (result.code() != InterpreterResult.Code.SUCCESS) {
+      return false;
     }
     if (StringUtils.equals(result.message().get(0).getData(), "2\n")) {
       return true;
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties b/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
index feb1e99..4725615 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
+++ b/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
@@ -16,7 +16,7 @@
 #
 
 # Root logger option
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=INFO, stdout
 
 # Direct log messages to stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
diff --git a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
new file mode 100644
index 0000000..247cb09
--- /dev/null
+++ b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.apache.submarine.commons.cluster.ClusterClient;
+import org.apache.submarine.commons.cluster.ClusterServer;
+import org.apache.submarine.commons.utils.NetUtils;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_META;
+import static org.junit.Assert.assertEquals;
+
+/** Checks cluster metadata lookups after an {@link InterpreterProcess} has been started. */
+public class InterpreterClusterTest {
+  private static final Logger LOG = LoggerFactory.getLogger(InterpreterClusterTest.class);
+  private static SubmarineConfiguration sconf;
+
+  private static ClusterServer clusterServer = null;
+  private static ClusterClient clusterClient = null;
+
+  static String serverHost;
+  static int serverPort;
+  static final String clientMetaKey = "InterpreterProcessTest";
+
+  /** Starts the cluster server and client, then waits until the cluster reports ready. */
+  @BeforeClass
+  public static void startCluster() throws IOException, InterruptedException {
+    LOG.info("startCluster >>>");
+    sconf = SubmarineConfiguration.create();
+
+    // Set the cluster IP and port
+    serverHost = NetUtils.findAvailableHostAddress();
+    serverPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    sconf.setClusterAddress(serverHost + ":" + serverPort);
+
+    // mock cluster manager server
+    clusterServer = ClusterServer.getInstance();
+    clusterServer.start();
+
+    // mock cluster manager client (created reflectively via its declared no-arg constructor)
+    try {
+      Constructor<ClusterClient> constructor = ClusterClient.class.getDeclaredConstructor();
+      constructor.setAccessible(true);
+      clusterClient = constructor.newInstance();
+      clusterClient.start(clientMetaKey);
+    } catch (NoSuchMethodException | InstantiationException
+        | IllegalAccessException | InvocationTargetException e) {
+      // Fail fast: a null client would otherwise only show up as an NPE in the wait loop.
+      throw new IllegalStateException("Failed to create ClusterClient by reflection", e);
+    }
+
+    // Waiting for cluster startup
+    for (int polls = 0; polls < 100; polls++) {
+      if (isClusterReady()) {
+        LOG.info("wait {}(ms) found cluster leader", polls * 3000);
+        break;
+      }
+      Thread.sleep(3000);
+    }
+    Thread.sleep(3000);
+    assertEquals(true, isClusterReady());
+    LOG.info("startCluster <<<");
+  }
+
+  /** Returns true once the server is cluster leader and raft is initialized on both ends. */
+  private static boolean isClusterReady() {
+    return clusterServer.isClusterLeader() && clusterServer.raftInitialized()
+        && clusterClient.raftInitialized();
+  }
+
+  /** Shuts down the client and the server, each guarded by a null check on its own field. */
+  @AfterClass
+  public static void stopCluster() {
+    if (null != clusterClient) {
+      clusterClient.shutdown();
+    }
+    if (null != clusterServer) {
+      clusterServer.shutdown();
+    }
+    LOG.info("stopCluster");
+  }
+
+  /** Starts an interpreter process and checks the size of each cluster metadata lookup. */
+  @Test
+  public void testInterpreterProcess() throws IOException, InterruptedException {
+    InterpreterProcess interpreterProcess = new InterpreterProcess("python", "testInterpreterProcess", false);
+    startInterpreterProcess(interpreterProcess, 5000);
+
+    // Get metadata for all services
+    HashMap<String, HashMap<String, Object>> serverMeta
+        = clusterClient.getClusterMeta(SERVER_META, clusterClient.getClusterNodeName());
+    LOG.info("serverMeta.size = {}", serverMeta.size());
+    assertEquals(1, serverMeta.size());
+
+    // get IntpProcess Meta
+    HashMap<String, HashMap<String, Object>> intpMeta
+        = clusterClient.getClusterMeta(INTP_PROCESS_META, "testInterpreterProcess");
+    LOG.info("intpMeta.size = {}", intpMeta.size());
+    assertEquals(1, intpMeta.size());
+
+    HashMap<String, HashMap<String, Object>> intpMeta2
+        = clusterClient.getClusterMeta(INTP_PROCESS_META, clientMetaKey);
+    LOG.info("intpMeta2.size = {}", intpMeta2.size());
+    assertEquals(1, intpMeta2.size());
+
+    stopInterpreterProcess(interpreterProcess, 5000);
+  }
+
+  /** Starts the process and asserts it reports running within {@code timeout} ms. */
+  private void startInterpreterProcess(InterpreterProcess interpreterProcess, int timeout)
+      throws InterruptedException, IOException {
+    interpreterProcess.start();
+    awaitRunningState(interpreterProcess, true, timeout);
+  }
+
+  /** Shuts the process down and asserts it stops running within {@code timeout} ms. */
+  private void stopInterpreterProcess(InterpreterProcess interpreterProcess, int timeout)
+      throws InterruptedException {
+    interpreterProcess.shutdown();
+    awaitRunningState(interpreterProcess, false, timeout);
+  }
+
+  /** Polls every 200 ms until isRunning() matches {@code expected} or the timeout elapses. */
+  private static void awaitRunningState(InterpreterProcess process, boolean expected, int timeout)
+      throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < timeout && process.isRunning() != expected) {
+      Thread.sleep(200);
+    }
+    assertEquals(expected, process.isRunning());
+  }
+}
diff --git a/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java b/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
index 01c937f..998d835 100644
--- a/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
+++ b/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
@@ -20,7 +20,7 @@ package org.apache.submarine.server;
 
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.submarine.websocket.NotebookServer;
-import org.apache.submarine.commons.cluster.ClusterManagerServer;
+import org.apache.submarine.commons.cluster.ClusterServer;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -92,8 +92,8 @@ public class WorkbenchServer extends ResourceConfig {
     // Notebook server
     setupNotebookServer(webApp, conf, sharedServiceLocator);
 
-    // Cluster Manager Server
-    setupClusterManagerServer();
+    // Cluster Server
+    setupClusterServer();
 
     startServer();
   }
@@ -222,10 +222,10 @@ public class WorkbenchServer extends ResourceConfig {
     webapp.addServlet(servletHolder, "/ws/*");
   }
 
-  private static void setupClusterManagerServer() {
+  private static void setupClusterServer() {
     if (conf.workbenchIsClusterMode()) {
-      ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance();
-      clusterManagerServer.start();
+      ClusterServer clusterServer = ClusterServer.getInstance();
+      clusterServer.start();
     }
   }
 
diff --git a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
index 7742451..d4c748d 100644
--- a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
+++ b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.submarine.server;
 
-import org.apache.submarine.commons.cluster.ClusterManagerClient;
+import org.apache.submarine.commons.cluster.ClusterClient;
 import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
 import org.apache.submarine.commons.utils.NetUtils;
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
@@ -39,7 +39,7 @@ import static org.junit.Assert.assertTrue;
 public class WorkbenchClusterServerTest {
   private static final Logger LOG = LoggerFactory.getLogger(WorkbenchClusterServerTest.class);
 
-  private static ClusterManagerClient clusterClient = null;
+  private static ClusterClient clusterClient = null;
 
   @BeforeClass
   public static void start() throws Exception {
@@ -55,11 +55,11 @@ public class WorkbenchClusterServerTest {
     AbstractWorkbenchServerTest.startUp(WorkbenchClusterServerTest.class.getSimpleName());
 
     // Mock Cluster client
-    Class clazz = ClusterManagerClient.class;
+    Class clazz = ClusterClient.class;
     Constructor constructor = null;
     constructor = clazz.getDeclaredConstructor();
     constructor.setAccessible(true);
-    clusterClient = (ClusterManagerClient) constructor.newInstance();
+    clusterClient = (ClusterClient) constructor.newInstance();
     clusterClient.start("TestWorkbenchClusterServer");
 
     // Waiting for cluster startup