Posted to commits@zeppelin.apache.org by li...@apache.org on 2019/06/13 07:42:11 UTC

[zeppelin] branch master updated: [ZEPPELIN-3623] Create interpreter process in the cluster mode

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new fd6e80c  [ZEPPELIN-3623] Create interpreter process in the cluster mode
fd6e80c is described below

commit fd6e80cfc49647ac57ca53e1dff9cdfbae164865
Author: Xun Liu <li...@apache.org>
AuthorDate: Thu Jun 13 11:22:45 2019 +0800

    [ZEPPELIN-3623] Create interpreter process in the cluster mode
    
    ### What is this PR for?
    In distributed cluster deployment mode, Zeppelin looks for a server with idle resources in the cluster and creates the interpreter process there. In single-server deployment mode, the interpreter process is created directly on the local machine.
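    
    For illustration only, a minimal, self-contained sketch of the idle-node selection idea described above; the `MEMORY_CAPACITY` and `MEMORY_USED` metadata keys are hypothetical placeholders for this sketch, not necessarily the keys used by the patch:
    ```java
    // Sketch only: choose the cluster node with the most idle memory from the
    // per-server metadata map. The "MEMORY_*" keys are hypothetical placeholders.
    import java.util.HashMap;
    import java.util.Map;
    
    public class IdleNodeSelector {
      public static Map<String, Object> pickIdleNode(
          HashMap<String, HashMap<String, Object>> serverMeta) {
        Map<String, Object> idleNode = null;
        long maxIdle = -1;
        for (Map.Entry<String, HashMap<String, Object>> entry : serverMeta.entrySet()) {
          Map<String, Object> meta = entry.getValue();
          long capacity = ((Number) meta.getOrDefault("MEMORY_CAPACITY", 0L)).longValue();
          long used = ((Number) meta.getOrDefault("MEMORY_USED", 0L)).longValue();
          if (capacity - used > maxIdle) {
            maxIdle = capacity - used;
            idleNode = meta;
          }
        }
        return idleNode;
      }
    }
    ```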
    
    Just set the cluster server list in `zeppelin-site.xml`
    ```
    <property>
      <name>zeppelin.cluster.addr</name>
      <value>10.120.196.234:6000,10.120.196.235:6000,10.120.196.236:6000</value>
    </property>
    ```
    The interpreter process can then be created on any server in the cluster, from any of the Zeppelin servers.
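    
    For reference, a small stand-alone sketch of how such a `host:port,host:port,...` value can be split into cluster nodes; it mirrors the parsing done in `ClusterManagerServer#initTestCluster()` in the diff below and performs no validation:
    ```java
    // Sketch only: split a "host1:port1,host2:port2" address list into (host, port) pairs.
    import java.util.ArrayList;
    import java.util.List;
    
    public class ClusterAddrParser {
      public static List<String[]> parse(String clusterAddrList) {
        List<String[]> nodes = new ArrayList<>();
        for (String entry : clusterAddrList.split(",")) {
          String[] parts = entry.split(":");   // parts[0] = host, parts[1] = port
          nodes.add(parts);
        }
        return nodes;
      }
    
      public static void main(String[] args) {
        for (String[] node : parse("10.120.196.234:6000,10.120.196.235:6000")) {
          System.out.println(node[0] + " port " + node[1]);
        }
      }
    }
    ```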
    
    [Cluster Design Document](https://docs.google.com/document/d/1a8QLSyR3M5AhlG1GIYuDTj6bwazeuVDKCRRBm-Qa3Bw/edit#heading=h.s41ckl271z8s)
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3623
    
    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/537437337)
    
    ### Screenshots (if appropriate)
    
    #### The note is executed on the zeppelin-server-234 host and the interpreter process is created on the zeppelin-server-236 host.
    
    ![ClusterCreateIntpProcess](https://user-images.githubusercontent.com/3677382/58382741-7dc61680-8000-11e9-8379-78b8e6743b63.gif)
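    
    The cross-server coordination shown above relies on the cluster-event messaging added by this patch. As an illustrative, non-authoritative usage sketch, a component could register a listener as follows; the listener body is hypothetical:
    ```java
    // Sketch only: subscribe to cluster events. ClusterEventListener and
    // addClusterEventListeners() are introduced by this patch; the listener logic is illustrative.
    import org.apache.zeppelin.cluster.ClusterManagerServer;
    import org.apache.zeppelin.cluster.event.ClusterEventListener;
    
    public class LoggingClusterEventListener implements ClusterEventListener {
      @Override
      public void onClusterEvent(String msg) {
        // Handle an event message sent by another Zeppelin server via unicast/broadcast.
        System.out.println("Received cluster event: " + msg);
      }
    
      public static void main(String[] args) {
        ClusterManagerServer server = ClusterManagerServer.getInstance();
        server.addClusterEventListeners(new LoggingClusterEventListener());
      }
    }
    ```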
    
    ### Questions:
    * Do the license files need to be updated?
    * Are there breaking changes for older versions?
    * Does this need documentation?
    
    Author: Xun Liu <li...@apache.org>
    
    Closes #3372 from liuxunorg/ZEPPELIN-3623 and squashes the following commits:
    
    ea4ded954 [Xun Liu] Delete unused cluster thrift interfaces
    712bba50f [Xun Liu] Add license header.
    3c8922cb6 [Xun Liu] Get the conf path through the `ZEPPELIN_HOME` or `ZEPPELIN_CONF_DIR` environment variable.
    b88c09942 [Xun Liu] The ClusterInterpreterLauncherTest is tested; turn off the cluster.
    83f540622 [Xun Liu] [ZEPPELIN-3623] Create interpreter process in the cluster mode
---
 zeppelin-interpreter-api/pom.xml                   |   6 +-
 .../apache/zeppelin/cluster/ClusterManager.java    |   3 +-
 .../zeppelin/cluster/ClusterManagerClient.java     |   4 +-
 .../zeppelin/cluster/ClusterManagerServer.java     | 228 +++--
 .../apache/zeppelin/cluster/ClusterMonitor.java    |  16 +-
 .../zeppelin/cluster/ClusterStateMachine.java      |  18 +-
 .../ClusterEvent.java}                             |   9 +-
 .../ClusterEventListener.java}                     |  13 +-
 .../apache/zeppelin/cluster/meta/ClusterMeta.java  |  14 +-
 .../zeppelin/cluster/meta/ClusterMetaType.java     |   4 +-
 .../zeppelin/conf/ZeppelinConfiguration.java       |  34 +-
 .../remote/RemoteInterpreterServer.java            |  57 +-
 .../thrift/ClusterIntpProcParameters.java          | 914 -------------------
 .../interpreter/thrift/ClusterManagerService.java  | 995 ---------------------
 .../src/main/thrift/ClusterManagerService.thrift   |  32 -
 zeppelin-interpreter/src/main/thrift/genthrift.sh  |   1 -
 .../zeppelin/cluster/ClusterMultiNodeTest.java     | 159 ++++
 ...ManagerTest.java => ClusterSingleNodeTest.java} |  66 +-
 zeppelin-plugins/launcher/cluster/pom.xml          |  90 ++
 .../launcher/ClusterInterpreterLauncher.java       | 200 +++++
 .../launcher/ClusterInterpreterProcess.java        | 141 +++
 .../launcher/ClusterInterpreterLauncherTest.java   | 101 +++
 .../interpreter/launcher/ClusterMockTest.java      |  79 +-
 zeppelin-plugins/pom.xml                           |   1 +
 .../org/apache/zeppelin/server/ZeppelinServer.java |  18 +-
 .../zeppelin/interpreter/InterpreterSetting.java   |   6 +
 .../remote/RemoteInterpreterManagedProcess.java    |   4 +
 27 files changed, 1023 insertions(+), 2190 deletions(-)

diff --git a/zeppelin-interpreter-api/pom.xml b/zeppelin-interpreter-api/pom.xml
index a43ee89..c1f0ae8 100644
--- a/zeppelin-interpreter-api/pom.xml
+++ b/zeppelin-interpreter-api/pom.xml
@@ -68,8 +68,6 @@
               <exclude>org.apache.commons:commons-exec</exclude>
               <!-- Leave log4j unshaded so downstream users can configure logging. -->
               <exclude>log4j:log4j</exclude>
-              <exclude>com.esotericsoftware:kryo</exclude>
-              <exclude>com.esotericsoftware:reflectasm</exclude>
             </excludes>
           </artifactSet>
           <filters>
@@ -138,6 +136,10 @@
               <pattern>io</pattern>
               <shadedPattern>${shaded.dependency.prefix}.io</shadedPattern>
             </relocation>
+            <relocation>
+              <pattern>com.esotericsoftware</pattern>
+              <shadedPattern>${shaded.dependency.prefix}.com.esotericsoftware</shadedPattern>
+            </relocation>
           </relocations>
         </configuration>
         <executions>
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 9c70a2e..2b2cd50 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -252,7 +252,7 @@ public abstract class ClusterManager {
               while (!raftInitialized()) {
                 retry++;
                 if (0 == retry % 30) {
-                  LOGGER.error("Raft incomplete initialization! retry[{}]", retry);
+                  LOGGER.warn("Raft incomplete initialization! retry[{}]", retry);
                 }
                 Thread.sleep(100);
               }
@@ -268,6 +268,7 @@ public abstract class ClusterManager {
               if (true == success) {
                 // The operation was successfully deleted
                 clusterMetaQueue.remove(metaEntity);
+                LOGGER.info("Cluster Meta Consume success! {}", metaEntity);
               } else {
                 LOGGER.error("Cluster Meta Consume faild!");
               }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
index c969bd6..57d51e3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
@@ -18,7 +18,7 @@ package org.apache.zeppelin.cluster;
 
 import io.atomix.primitive.PrimitiveState;
 
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
 /**
  * Cluster management client class instantiated in zeppelin-interperter
@@ -63,7 +63,7 @@ public class ClusterManagerClient extends ClusterManager {
 
     // Instantiated cluster monitoring class
     clusterMonitor = new ClusterMonitor(this);
-    clusterMonitor.start(IntpProcessMeta, metaKey);
+    clusterMonitor.start(INTP_PROCESS_META, metaKey);
   }
 
   public void shutdown() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 41e670a..eb64393 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.zeppelin.cluster;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.atomix.cluster.*;
 import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
 import io.atomix.cluster.impl.DefaultClusterMembershipService;
@@ -30,20 +32,10 @@ import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
 import io.atomix.utils.net.Address;
 import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
 import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterFactoryInterface;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.ClusterIntpProcParameters;
-import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,20 +48,22 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
 
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META;
 
 /**
  * Cluster management server class instantiated in zeppelin-server
  * 1. Create a raft server
  * 2. Remotely create interpreter's thrift service
  */
-public class ClusterManagerServer extends ClusterManager
-    implements ClusterManagerService.Iface {
+public class ClusterManagerServer extends ClusterManager {
   private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerServer.class);
 
   private static ClusterManagerServer instance = null;
@@ -79,22 +73,15 @@ public class ClusterManagerServer extends ClusterManager
 
   protected MessagingService messagingService = null;
 
-  // zeppelin cluster manager thrift service
-  private TThreadPoolServer clusterManagerTserver = null;
-  private ClusterManagerService.Processor<ClusterManagerServer> clusterManagerProcessor = null;
-
-  // Find interpreter by note
-  private InterpreterFactoryInterface interpreterFactory = null;
-
   // Connect to the interpreter process that has been created
   public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";
 
+  private List<ClusterEventListener> clusterEventListeners = new ArrayList<>();
+  // zeppelin cluster event
+  public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC";
+
   private ClusterManagerServer() {
     super();
-
-    clusterManagerProcessor = new ClusterManagerService.Processor<>(this);
-
-    deleteRaftSystemData();
   }
 
   public static ClusterManagerServer getInstance() {
@@ -106,23 +93,46 @@ public class ClusterManagerServer extends ClusterManager
     }
   }
 
-  public void start(InterpreterFactoryInterface interpreterFactory) {
+  public void start() {
     if (!zconf.isClusterMode()) {
       return;
     }
 
-    this.interpreterFactory = interpreterFactory;
-
     initThread();
 
     // Instantiated raftServer monitoring class
     String clusterName = getClusterNodeName();
     clusterMonitor = new ClusterMonitor(this);
-    clusterMonitor.start(ServerMeta, clusterName);
+    clusterMonitor.start(SERVER_META, clusterName);
 
     super.start();
   }
 
+  @VisibleForTesting
+  public void initTestCluster(String clusterAddrList, String host, int port) {
+    this.zeplServerHost = host;
+    this.raftServerPort = port;
+
+    // clear
+    clusterNodes.clear();
+    raftAddressMap.clear();
+    clusterMemberIds.clear();
+
+    String cluster[] = clusterAddrList.split(",");
+    for (int i = 0; i < cluster.length; i++) {
+      String[] parts = cluster[i].split(":");
+      String clusterHost = parts[0];
+      int clusterPort = Integer.valueOf(parts[1]);
+
+      String memberId = clusterHost + ":" + clusterPort;
+      Address address = Address.from(clusterHost, clusterPort);
+      Node node = Node.builder().withId(memberId).withAddress(address).build();
+      clusterNodes.add(node);
+      raftAddressMap.put(MemberId.from(memberId), address);
+      clusterMemberIds.add(MemberId.from(memberId));
+    }
+  }
+
   @Override
   public boolean raftInitialized() {
     if (null != raftServer && raftServer.isRunning()
@@ -145,32 +155,6 @@ public class ClusterManagerServer extends ClusterManager
     return true;
   }
 
-  protected void deleteRaftSystemData() {
-    String zeppelinHome = zconf.getZeppelinHome();
-    Path directory = new File(zeppelinHome, ".data").toPath();
-    if (Files.exists(directory)) {
-      try {
-        Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
-          @Override
-          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
-              throws IOException {
-            Files.delete(file);
-            return FileVisitResult.CONTINUE;
-          }
-
-          @Override
-          public FileVisitResult postVisitDirectory(Path dir, IOException exc)
-              throws IOException {
-            Files.delete(dir);
-            return FileVisitResult.CONTINUE;
-          }
-        });
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
   private void initThread() {
     // RaftServer Thread
     new Thread(new Runnable() {
@@ -206,11 +190,15 @@ public class ClusterManagerServer extends ClusterManager
             bootstrapService,
             new MembershipConfig());
 
+        File atomixDataDir = com.google.common.io.Files.createTempDir();
+        atomixDataDir.deleteOnExit();
+
         RaftServer.Builder builder = RaftServer.builder(member.id())
             .withMembershipService(clusterService)
             .withProtocol(protocol)
             .withStorage(RaftStorage.builder()
                 .withStorageLevel(StorageLevel.MEMORY)
+                .withDirectory(atomixDataDir)
                 .withSerializer(storageSerializer)
                 .withMaxSegmentSize(1024 * 1024)
                 .build());
@@ -218,46 +206,10 @@ public class ClusterManagerServer extends ClusterManager
         raftServer = builder.build();
         raftServer.bootstrap(clusterMemberIds);
 
-        LOGGER.info("RaftServer run() <<<");
-      }
-    }).start();
-
-    // Cluster manager thrift thread
-    new Thread(new Runnable() {
-      @Override
-      public void run() {
-        LOGGER.info("TServerThread run() >>>");
-
-        ZeppelinConfiguration zconf = new ZeppelinConfiguration();
-        String portRange = zconf.getZeppelinServerRPCPortRange();
-
-        try {
-          TServerSocket serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
-          int tserverPort = serverTransport.getServerSocket().getLocalPort();
-
-          clusterManagerTserver = new TThreadPoolServer(
-              new TThreadPoolServer.Args(serverTransport).processor(clusterManagerProcessor));
-          LOGGER.info("Starting raftServer manager Tserver on port {}", tserverPort);
-
-          String nodeName = getClusterNodeName();
-          HashMap<String, Object> meta = new HashMap<String, Object>();
-          meta.put(ClusterMeta.NODE_NAME, nodeName);
-          meta.put(ClusterMeta.SERVER_TSERVER_HOST, zeplServerHost);
-          meta.put(ClusterMeta.SERVER_TSERVER_PORT, tserverPort);
-          meta.put(ClusterMeta.SERVER_START_TIME, new Date());
-
-          putClusterMeta(ServerMeta, nodeName, meta);
-        } catch (UnknownHostException e) {
-          LOGGER.error(e.getMessage());
-        } catch (SocketException e) {
-          LOGGER.error(e.getMessage());
-        } catch (IOException e) {
-          LOGGER.error(e.getMessage());
-        }
-
-        clusterManagerTserver.serve();
+        messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC,
+            subscribeClusterEvent, MoreExecutors.directExecutor());
 
-        LOGGER.info("TServerThread run() <<<");
+        LOGGER.info("RaftServer run() <<<");
       }
     }).start();
   }
@@ -270,58 +222,34 @@ public class ClusterManagerServer extends ClusterManager
 
     try {
       // delete local machine meta
-      deleteClusterMeta(ServerMeta, getClusterNodeName());
+      deleteClusterMeta(SERVER_META, getClusterNodeName());
       Thread.sleep(300);
       clusterMonitor.shutdown();
       // wait raft commit metadata
       Thread.sleep(300);
     } catch (InterruptedException e) {
-      LOGGER.error(e.getMessage());
+      LOGGER.error(e.getMessage(), e);
     }
 
     if (null != raftServer && raftServer.isRunning()) {
       try {
         raftServer.shutdown().get(3, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e.getMessage(), e);
       } catch (ExecutionException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e.getMessage(), e);
       } catch (TimeoutException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e.getMessage(), e);
       }
     }
 
-    clusterManagerTserver.stop();
-
     super.shutdown();
   }
 
-  public boolean openRemoteInterpreterProcess(
-      String host, int port, final ClusterIntpProcParameters clusterIntpProcParameters)
-      throws TException {
-    LOGGER.info("host: {}, port: {}, clusterIntpProcParameters: {}",
-        host, port, clusterIntpProcParameters);
-
-    try (TTransport transport = new TSocket(host, port)) {
-      transport.open();
-      TProtocol protocol = new TBinaryProtocol(transport);
-      ClusterManagerService.Client client = new ClusterManagerService.Client(protocol);
-
-      return client.createClusterInterpreterProcess(clusterIntpProcParameters);
-    }
-  }
-
-  @Override
-  public boolean createClusterInterpreterProcess(ClusterIntpProcParameters clusterIntpProcParameters) {
-    // TODO: ZEPPELIN-3623
-
-    return true;
-  }
-
   // Obtain the server node whose resources are idle in the cluster
   public HashMap<String, Object> getIdleNodeMeta() {
     HashMap<String, Object> idleNodeMeta = null;
-    HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(ServerMeta, "");
+    HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
 
     long memoryIdle = 0;
     for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
@@ -344,4 +272,56 @@ public class ClusterManagerServer extends ClusterManager
 
     return idleNodeMeta;
   }
+
+  public void unicastClusterEvent(String host, int port,  String msg) {
+    LOGGER.info("send unicastClusterEvent message {}", msg);
+
+    Address address = Address.from(host, port);
+    CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
+        ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+    response.whenComplete((r, e) -> {
+      if (null != e) {
+        LOGGER.error(e.getMessage(), e);
+      } else {
+        LOGGER.info("unicastClusterEvent success! {}", msg);
+      }
+    });
+  }
+
+  public void broadcastClusterEvent(String msg) {
+    LOGGER.info("send broadcastClusterEvent message {}", msg);
+
+    for (Node node : clusterNodes) {
+      if (StringUtils.equals(node.address().host(), zeplServerHost)
+          && node.address().port() == raftServerPort) {
+        // skip myself
+        continue;
+      }
+
+      CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(),
+          ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+      response.whenComplete((r, e) -> {
+        if (null != e) {
+          LOGGER.error(e.getMessage(), e);
+        } else {
+          LOGGER.info("broadcastClusterNoteEvent success! {}", msg);
+        }
+      });
+    }
+  }
+
+  private BiFunction<Address, byte[], byte[]> subscribeClusterEvent = (address, data) -> {
+    String message = new String(data);
+    LOGGER.info("subscribeClusterEvent() {}", message);
+
+    for (ClusterEventListener eventListener : clusterEventListeners) {
+      eventListener.onClusterEvent(message);
+    }
+
+    return null;
+  };
+
+  public void addClusterEventListeners(ClusterEventListener listener) {
+    clusterEventListeners.add(listener);
+  }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
index 86fa6a7..4fe8d98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
@@ -27,8 +27,8 @@ import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META;
 
 /**
  * cluster monitoring
@@ -94,11 +94,11 @@ public class ClusterMonitor {
       public void run() {
         while (running.get()) {
           switch (clusterMetaType) {
-            case ServerMeta:
+            case SERVER_META:
               sendMachineUsage();
               checkHealthy();
               break;
-            case IntpProcessMeta:
+            case INTP_PROCESS_META:
               sendHeartbeat();
               break;
             default:
@@ -136,6 +136,10 @@ public class ClusterMonitor {
       Map<String, HashMap<String, Object>> clusterMeta
           = clusterManager.getClusterMeta(metaType, "");
 
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("clusterMeta : {}", clusterMeta);
+      }
+
       for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
         String key = entry.getKey();
         Map<String, Object> meta = entry.getValue();
@@ -172,7 +176,7 @@ public class ClusterMonitor {
     mapMonitorUtil.put(ClusterMeta.HEARTBEAT, new Date());
     mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
 
-    clusterManager.putClusterMeta(IntpProcessMeta, metaKey, mapMonitorUtil);
+    clusterManager.putClusterMeta(INTP_PROCESS_META, metaKey, mapMonitorUtil);
   }
 
   // send the usage of each service
@@ -212,7 +216,7 @@ public class ClusterMonitor {
     mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
 
     String clusterName = clusterManager.getClusterNodeName();
-    clusterManager.putClusterMeta(ServerMeta, clusterName, mapMonitorUtil);
+    clusterManager.putClusterMeta(SERVER_META, clusterName, mapMonitorUtil);
   }
 
   private UsageUtil getMachineUsage() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
index 460f6ac..dc07daa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -90,12 +90,12 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
       logger.debug("ClusterStateMachine.backup()");
     }
 
-    // backup ServerMeta
+    // backup SERVER_META
     // cluster meta map struct
     // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
     Map<String, Map<String, Object>> mapServerMeta
-        = clusterMeta.get(ClusterMetaType.ServerMeta, "");
-    // write all ServerMeta size
+        = clusterMeta.get(ClusterMetaType.SERVER_META, "");
+    // write all SERVER_META size
     writer.writeInt(mapServerMeta.size());
     for (Map.Entry<String, Map<String, Object>> entry : mapServerMeta.entrySet()) {
       // write cluster_name
@@ -111,11 +111,11 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
       }
     }
 
-    // backup IntpProcessMeta
+    // backup INTP_PROCESS_META
     // Interpreter meta map struct
     // IntpGroupId -> {server_tserver_host,server_tserver_port,...}
     Map<String, Map<String, Object>> mapIntpProcMeta
-        = clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
+        = clusterMeta.get(ClusterMetaType.INTP_PROCESS_META, "");
     // write interpreter size
     writer.writeInt(mapIntpProcMeta.size());
     for (Map.Entry<String, Map<String, Object>> entry : mapIntpProcMeta.entrySet()) {
@@ -140,7 +140,7 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
     }
 
     clusterMeta = new ClusterMeta();
-    // read all ServerMeta size
+    // read all SERVER_META size
     int nServerMeta = reader.readInt();
     for (int i = 0; i < nServerMeta; i++) {
       // read cluster_name
@@ -153,12 +153,12 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
         String key = reader.readString();
         Object value = reader.readObject();
 
-        clusterMeta.put(ClusterMetaType.ServerMeta,
+        clusterMeta.put(ClusterMetaType.SERVER_META,
             clusterName, Maps.immutableEntry(key, value));
       }
     }
 
-    // read all IntpProcessMeta size
+    // read all INTP_PROCESS_META size
     int nIntpMeta = reader.readInt();
     for (int i = 0; i < nIntpMeta; i++) {
       // read interpreter name
@@ -171,7 +171,7 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
         String key = reader.readString();
         Object value = reader.readObject();
 
-        clusterMeta.put(ClusterMetaType.IntpProcessMeta,
+        clusterMeta.put(ClusterMetaType.INTP_PROCESS_META,
             intpName, Maps.immutableEntry(key, value));
       }
     }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
similarity index 86%
copy from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
index c6229bd..0e1120c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.cluster.meta;
+package org.apache.zeppelin.cluster.event;
 
 /**
- * Type of cluster metadata
+ * Cluster Event
  */
-public enum ClusterMetaType {
-  ServerMeta,
-  IntpProcessMeta
+public enum ClusterEvent {
+  CREATE_INTP_PROCESS
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
similarity index 67%
copy from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
index c6229bd..00c0b3d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
@@ -14,12 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.cluster.meta;
+package org.apache.zeppelin.cluster.event;
 
 /**
- * Type of cluster metadata
+ * Listens for notebook events such as NEW_NOTE, DEL_NOTE, REMOVE_NOTE_TO_TRASH, ...
+ * raised in the NotebookServer#onMessage() function.
  */
-public enum ClusterMetaType {
-  ServerMeta,
-  IntpProcessMeta
+public interface ClusterEventListener {
+  public static final String CLUSTER_EVENT = "CLUSTER_EVENT";
+  public static final String CLUSTER_EVENT_MSG = "CLUSTER_EVENT_MSG";
+  
+  void onClusterEvent(String msg);
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
index a26007c..e862635 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -37,8 +37,6 @@ public class ClusterMeta implements Serializable {
   // zeppelin-server meta
   public static String SERVER_HOST          = "SERVER_HOST";
   public static String SERVER_PORT          = "SERVER_PORT";
-  public static String SERVER_TSERVER_HOST  = "SERVER_TSERVER_HOST";
-  public static String SERVER_TSERVER_PORT  = "SERVER_TSERVER_PORT";
   public static String SERVER_START_TIME    = "SERVER_START_TIME";
 
   // interperter-process meta
@@ -73,7 +71,7 @@ public class ClusterMeta implements Serializable {
     Map<String, Object> mapValue = (Map<String, Object>) value;
 
     switch (type) {
-      case ServerMeta:
+      case SERVER_META:
         // Because it may be partially updated metadata information
         if (mapServerMeta.containsKey(key)) {
           Map<String, Object> values = mapServerMeta.get(key);
@@ -82,7 +80,7 @@ public class ClusterMeta implements Serializable {
           mapServerMeta.put(key, mapValue);
         }
         break;
-      case IntpProcessMeta:
+      case INTP_PROCESS_META:
         if (mapInterpreterMeta.containsKey(key)) {
           Map<String, Object> values = mapInterpreterMeta.get(key);
           values.putAll(mapValue);
@@ -97,7 +95,7 @@ public class ClusterMeta implements Serializable {
     Map<String, Object> values = null;
 
     switch (type) {
-      case ServerMeta:
+      case SERVER_META:
         if (null == key || StringUtils.isEmpty(key)) {
           return mapServerMeta;
         }
@@ -107,7 +105,7 @@ public class ClusterMeta implements Serializable {
           logger.warn("can not find key : {}", key);
         }
         break;
-      case IntpProcessMeta:
+      case INTP_PROCESS_META:
         if (null == key || StringUtils.isEmpty(key)) {
           return mapInterpreterMeta;
         }
@@ -127,14 +125,14 @@ public class ClusterMeta implements Serializable {
 
   public Map<String, Object> remove(ClusterMetaType type, String key) {
     switch (type) {
-      case ServerMeta:
+      case SERVER_META:
         if (mapServerMeta.containsKey(key)) {
           return mapServerMeta.remove(key);
         } else {
           logger.warn("can not find key : {}", key);
         }
         break;
-      case IntpProcessMeta:
+      case INTP_PROCESS_META:
         if (mapInterpreterMeta.containsKey(key)) {
           return mapInterpreterMeta.remove(key);
         } else {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
index c6229bd..cf995f5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -20,6 +20,6 @@ package org.apache.zeppelin.cluster.meta;
  * Type of cluster metadata
  */
 public enum ClusterMetaType {
-  ServerMeta,
-  IntpProcessMeta
+  SERVER_META,
+  INTP_PROCESS_META
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index be792c7..d4824dc 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.conf;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -28,6 +29,7 @@ import java.util.function.Predicate;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.util.Util;
 import org.slf4j.Logger;
@@ -120,6 +122,37 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     }
 
     if (url == null) {
+      try {
+        Map procEnv = EnvironmentUtils.getProcEnvironment();
+        if (procEnv.containsKey("ZEPPELIN_HOME")) {
+          String zconfDir = (String) procEnv.get("ZEPPELIN_HOME");
+          File file = new File(zconfDir + File.separator
+              + "conf" + File.separator + ZEPPELIN_SITE_XML);
+          if (file.exists()) {
+            url = file.toURL();
+          }
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (url == null) {
+      try {
+        Map procEnv = EnvironmentUtils.getProcEnvironment();
+        if (procEnv.containsKey("ZEPPELIN_CONF_DIR")) {
+          String zconfDir = (String) procEnv.get("ZEPPELIN_CONF_DIR");
+          File file = new File(zconfDir + File.separator + ZEPPELIN_SITE_XML);
+          if (file.exists()) {
+            url = file.toURL();
+          }
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (url == null) {
       LOG.warn("Failed to load configuration, proceeding with a default");
       conf = new ZeppelinConfiguration();
     } else {
@@ -144,7 +177,6 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return conf;
   }
 
-
   private String getStringValue(String name, String d) {
     String value = this.properties.get(name);
     if (value != null) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 1d4d231..3dc41f4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -29,6 +29,10 @@ import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.cluster.ClusterManagerClient;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -85,6 +89,7 @@ import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -132,6 +137,10 @@ public class RemoteInterpreterServer extends Thread
 
   private boolean isTest;
 
+  // cluster manager client
+  ClusterManagerClient clusterManagerClient = ClusterManagerClient.getInstance();
+  ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+
   public RemoteInterpreterServer(String intpEventServerHost,
                                  int intpEventServerPort,
                                  String interpreterGroupId,
@@ -178,6 +187,8 @@ public class RemoteInterpreterServer extends Thread
     server = new TThreadPoolServer(
         new TThreadPoolServer.Args(serverTransport).processor(processor));
     remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
+
+    clusterManagerClient.start(interpreterGroupId);
   }
 
   @Override
@@ -196,16 +207,24 @@ public class RemoteInterpreterServer extends Thread
             }
           }
 
-          if (!interrupted) {
-            RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
-            try {
-              intpEventServiceClient.registerInterpreterProcess(registerInfo);
-            } catch (TException e) {
-              logger.error("Error while registering interpreter: {}", registerInfo, e);
+          if (zconf.isClusterMode()) {
+            // In cluster mode, interpreter processes are discovered through metadata registration
+            // TODO (Xun): unify process discovery via cluster metadata for all operating modes:
+            // 1. it would simplify the process startup logic
+            // 2. it would avoid registering a virtual IP when the interpreter runs inside Docker
+            putClusterMeta();
+          } else {
+            if (!interrupted) {
+              RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
               try {
-                shutdown();
-              } catch (TException e1) {
-                logger.warn("Exception occurs while shutting down", e1);
+                intpEventServiceClient.registerInterpreterProcess(registerInfo);
+              } catch (TException e) {
+                logger.error("Error while registering interpreter: {}", registerInfo, e);
+                try {
+                  shutdown();
+                } catch (TException e1) {
+                  logger.warn("Exception occurs while shutting down", e1);
+                }
               }
             }
           }
@@ -303,6 +322,26 @@ public class RemoteInterpreterServer extends Thread
     System.exit(0);
   }
 
+  // Submit interpreter process metadata information to cluster metadata
+  private void putClusterMeta() {
+    if (!zconf.isClusterMode()){
+      return;
+    }
+    String nodeName = clusterManagerClient.getClusterNodeName();
+
+    // commit interpreter meta
+    HashMap<String, Object> meta = new HashMap<>();
+    meta.put(ClusterMeta.NODE_NAME, nodeName);
+    meta.put(ClusterMeta.INTP_PROCESS_ID, interpreterGroupId);
+    meta.put(ClusterMeta.INTP_TSERVER_HOST, host);
+    meta.put(ClusterMeta.INTP_TSERVER_PORT, port);
+    meta.put(ClusterMeta.INTP_START_TIME, new Date());
+    meta.put(ClusterMeta.HEARTBEAT, new Date());
+    meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+    clusterManagerClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, interpreterGroupId, meta);
+  }
+
   @Override
   public void createInterpreter(String interpreterGroupId, String sessionId, String
       className, Map<String, String> properties, String userName) throws TException {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
deleted file mode 100644
index 4cd82f0..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.12.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.zeppelin.interpreter.thrift;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-06-10")
-public class ClusterIntpProcParameters implements org.apache.thrift.TBase<ClusterIntpProcParameters, ClusterIntpProcParameters._Fields>, java.io.Serializable, Cloneable, Comparable<ClusterIntpProcParameters> {
-  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterIntpProcParameters");
-
-  private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
-  private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
-  private static final org.apache.thrift.protocol.TField USER_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("userName", org.apache.thrift.protocol.TType.STRING, (short)3);
-  private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)4);
-  private static final org.apache.thrift.protocol.TField REPL_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("replName", org.apache.thrift.protocol.TType.STRING, (short)5);
-  private static final org.apache.thrift.protocol.TField DEFAULT_INTERPRETER_SETTING_FIELD_DESC = new org.apache.thrift.protocol.TField("defaultInterpreterSetting", org.apache.thrift.protocol.TType.STRING, (short)6);
-
-  private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ClusterIntpProcParametersStandardSchemeFactory();
-  private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ClusterIntpProcParametersTupleSchemeFactory();
-
-  public @org.apache.thrift.annotation.Nullable java.lang.String host; // required
-  public int port; // required
-  public @org.apache.thrift.annotation.Nullable java.lang.String userName; // required
-  public @org.apache.thrift.annotation.Nullable java.lang.String noteId; // required
-  public @org.apache.thrift.annotation.Nullable java.lang.String replName; // required
-  public @org.apache.thrift.annotation.Nullable java.lang.String defaultInterpreterSetting; // required
-
-  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-    HOST((short)1, "host"),
-    PORT((short)2, "port"),
-    USER_NAME((short)3, "userName"),
-    NOTE_ID((short)4, "noteId"),
-    REPL_NAME((short)5, "replName"),
-    DEFAULT_INTERPRETER_SETTING((short)6, "defaultInterpreterSetting");
-
-    private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
-
-    static {
-      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
-        byName.put(field.getFieldName(), field);
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, or null if its not found.
-     */
-    @org.apache.thrift.annotation.Nullable
-    public static _Fields findByThriftId(int fieldId) {
-      switch(fieldId) {
-        case 1: // HOST
-          return HOST;
-        case 2: // PORT
-          return PORT;
-        case 3: // USER_NAME
-          return USER_NAME;
-        case 4: // NOTE_ID
-          return NOTE_ID;
-        case 5: // REPL_NAME
-          return REPL_NAME;
-        case 6: // DEFAULT_INTERPRETER_SETTING
-          return DEFAULT_INTERPRETER_SETTING;
-        default:
-          return null;
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, throwing an exception
-     * if it is not found.
-     */
-    public static _Fields findByThriftIdOrThrow(int fieldId) {
-      _Fields fields = findByThriftId(fieldId);
-      if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
-      return fields;
-    }
-
-    /**
-     * Find the _Fields constant that matches name, or null if its not found.
-     */
-    @org.apache.thrift.annotation.Nullable
-    public static _Fields findByName(java.lang.String name) {
-      return byName.get(name);
-    }
-
-    private final short _thriftId;
-    private final java.lang.String _fieldName;
-
-    _Fields(short thriftId, java.lang.String fieldName) {
-      _thriftId = thriftId;
-      _fieldName = fieldName;
-    }
-
-    public short getThriftFieldId() {
-      return _thriftId;
-    }
-
-    public java.lang.String getFieldName() {
-      return _fieldName;
-    }
-  }
-
-  // isset id assignments
-  private static final int __PORT_ISSET_ID = 0;
-  private byte __isset_bitfield = 0;
-  public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
-  static {
-    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-    tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
-    tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
-    tmpMap.put(_Fields.USER_NAME, new org.apache.thrift.meta_data.FieldMetaData("userName", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
-    tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
-    tmpMap.put(_Fields.REPL_NAME, new org.apache.thrift.meta_data.FieldMetaData("replName", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
-    tmpMap.put(_Fields.DEFAULT_INTERPRETER_SETTING, new org.apache.thrift.meta_data.FieldMetaData("defaultInterpreterSetting", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
-    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
-    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterIntpProcParameters.class, metaDataMap);
-  }
-
-  public ClusterIntpProcParameters() {
-  }
-
-  public ClusterIntpProcParameters(
-    java.lang.String host,
-    int port,
-    java.lang.String userName,
-    java.lang.String noteId,
-    java.lang.String replName,
-    java.lang.String defaultInterpreterSetting)
-  {
-    this();
-    this.host = host;
-    this.port = port;
-    setPortIsSet(true);
-    this.userName = userName;
-    this.noteId = noteId;
-    this.replName = replName;
-    this.defaultInterpreterSetting = defaultInterpreterSetting;
-  }
-
-  /**
-   * Performs a deep copy on <i>other</i>.
-   */
-  public ClusterIntpProcParameters(ClusterIntpProcParameters other) {
-    __isset_bitfield = other.__isset_bitfield;
-    if (other.isSetHost()) {
-      this.host = other.host;
-    }
-    this.port = other.port;
-    if (other.isSetUserName()) {
-      this.userName = other.userName;
-    }
-    if (other.isSetNoteId()) {
-      this.noteId = other.noteId;
-    }
-    if (other.isSetReplName()) {
-      this.replName = other.replName;
-    }
-    if (other.isSetDefaultInterpreterSetting()) {
-      this.defaultInterpreterSetting = other.defaultInterpreterSetting;
-    }
-  }
-
-  public ClusterIntpProcParameters deepCopy() {
-    return new ClusterIntpProcParameters(this);
-  }
-
-  @Override
-  public void clear() {
-    this.host = null;
-    setPortIsSet(false);
-    this.port = 0;
-    this.userName = null;
-    this.noteId = null;
-    this.replName = null;
-    this.defaultInterpreterSetting = null;
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public java.lang.String getHost() {
-    return this.host;
-  }
-
-  public ClusterIntpProcParameters setHost(@org.apache.thrift.annotation.Nullable java.lang.String host) {
-    this.host = host;
-    return this;
-  }
-
-  public void unsetHost() {
-    this.host = null;
-  }
-
-  /** Returns true if field host is set (has been assigned a value) and false otherwise */
-  public boolean isSetHost() {
-    return this.host != null;
-  }
-
-  public void setHostIsSet(boolean value) {
-    if (!value) {
-      this.host = null;
-    }
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-
-  public ClusterIntpProcParameters setPort(int port) {
-    this.port = port;
-    setPortIsSet(true);
-    return this;
-  }
-
-  public void unsetPort() {
-    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
-  }
-
-  /** Returns true if field port is set (has been assigned a value) and false otherwise */
-  public boolean isSetPort() {
-    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
-  }
-
-  public void setPortIsSet(boolean value) {
-    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public java.lang.String getUserName() {
-    return this.userName;
-  }
-
-  public ClusterIntpProcParameters setUserName(@org.apache.thrift.annotation.Nullable java.lang.String userName) {
-    this.userName = userName;
-    return this;
-  }
-
-  public void unsetUserName() {
-    this.userName = null;
-  }
-
-  /** Returns true if field userName is set (has been assigned a value) and false otherwise */
-  public boolean isSetUserName() {
-    return this.userName != null;
-  }
-
-  public void setUserNameIsSet(boolean value) {
-    if (!value) {
-      this.userName = null;
-    }
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public java.lang.String getNoteId() {
-    return this.noteId;
-  }
-
-  public ClusterIntpProcParameters setNoteId(@org.apache.thrift.annotation.Nullable java.lang.String noteId) {
-    this.noteId = noteId;
-    return this;
-  }
-
-  public void unsetNoteId() {
-    this.noteId = null;
-  }
-
-  /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
-  public boolean isSetNoteId() {
-    return this.noteId != null;
-  }
-
-  public void setNoteIdIsSet(boolean value) {
-    if (!value) {
-      this.noteId = null;
-    }
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public java.lang.String getReplName() {
-    return this.replName;
-  }
-
-  public ClusterIntpProcParameters setReplName(@org.apache.thrift.annotation.Nullable java.lang.String replName) {
-    this.replName = replName;
-    return this;
-  }
-
-  public void unsetReplName() {
-    this.replName = null;
-  }
-
-  /** Returns true if field replName is set (has been assigned a value) and false otherwise */
-  public boolean isSetReplName() {
-    return this.replName != null;
-  }
-
-  public void setReplNameIsSet(boolean value) {
-    if (!value) {
-      this.replName = null;
-    }
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public java.lang.String getDefaultInterpreterSetting() {
-    return this.defaultInterpreterSetting;
-  }
-
-  public ClusterIntpProcParameters setDefaultInterpreterSetting(@org.apache.thrift.annotation.Nullable java.lang.String defaultInterpreterSetting) {
-    this.defaultInterpreterSetting = defaultInterpreterSetting;
-    return this;
-  }
-
-  public void unsetDefaultInterpreterSetting() {
-    this.defaultInterpreterSetting = null;
-  }
-
-  /** Returns true if field defaultInterpreterSetting is set (has been assigned a value) and false otherwise */
-  public boolean isSetDefaultInterpreterSetting() {
-    return this.defaultInterpreterSetting != null;
-  }
-
-  public void setDefaultInterpreterSettingIsSet(boolean value) {
-    if (!value) {
-      this.defaultInterpreterSetting = null;
-    }
-  }
-
-  public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
-    switch (field) {
-    case HOST:
-      if (value == null) {
-        unsetHost();
-      } else {
-        setHost((java.lang.String)value);
-      }
-      break;
-
-    case PORT:
-      if (value == null) {
-        unsetPort();
-      } else {
-        setPort((java.lang.Integer)value);
-      }
-      break;
-
-    case USER_NAME:
-      if (value == null) {
-        unsetUserName();
-      } else {
-        setUserName((java.lang.String)value);
-      }
-      break;
-
-    case NOTE_ID:
-      if (value == null) {
-        unsetNoteId();
-      } else {
-        setNoteId((java.lang.String)value);
-      }
-      break;
-
-    case REPL_NAME:
-      if (value == null) {
-        unsetReplName();
-      } else {
-        setReplName((java.lang.String)value);
-      }
-      break;
-
-    case DEFAULT_INTERPRETER_SETTING:
-      if (value == null) {
-        unsetDefaultInterpreterSetting();
-      } else {
-        setDefaultInterpreterSetting((java.lang.String)value);
-      }
-      break;
-
-    }
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public java.lang.Object getFieldValue(_Fields field) {
-    switch (field) {
-    case HOST:
-      return getHost();
-
-    case PORT:
-      return getPort();
-
-    case USER_NAME:
-      return getUserName();
-
-    case NOTE_ID:
-      return getNoteId();
-
-    case REPL_NAME:
-      return getReplName();
-
-    case DEFAULT_INTERPRETER_SETTING:
-      return getDefaultInterpreterSetting();
-
-    }
-    throw new java.lang.IllegalStateException();
-  }
-
-  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
-  public boolean isSet(_Fields field) {
-    if (field == null) {
-      throw new java.lang.IllegalArgumentException();
-    }
-
-    switch (field) {
-    case HOST:
-      return isSetHost();
-    case PORT:
-      return isSetPort();
-    case USER_NAME:
-      return isSetUserName();
-    case NOTE_ID:
-      return isSetNoteId();
-    case REPL_NAME:
-      return isSetReplName();
-    case DEFAULT_INTERPRETER_SETTING:
-      return isSetDefaultInterpreterSetting();
-    }
-    throw new java.lang.IllegalStateException();
-  }
-
-  @Override
-  public boolean equals(java.lang.Object that) {
-    if (that == null)
-      return false;
-    if (that instanceof ClusterIntpProcParameters)
-      return this.equals((ClusterIntpProcParameters)that);
-    return false;
-  }
-
-  public boolean equals(ClusterIntpProcParameters that) {
-    if (that == null)
-      return false;
-    if (this == that)
-      return true;
-
-    boolean this_present_host = true && this.isSetHost();
-    boolean that_present_host = true && that.isSetHost();
-    if (this_present_host || that_present_host) {
-      if (!(this_present_host && that_present_host))
-        return false;
-      if (!this.host.equals(that.host))
-        return false;
-    }
-
-    boolean this_present_port = true;
-    boolean that_present_port = true;
-    if (this_present_port || that_present_port) {
-      if (!(this_present_port && that_present_port))
-        return false;
-      if (this.port != that.port)
-        return false;
-    }
-
-    boolean this_present_userName = true && this.isSetUserName();
-    boolean that_present_userName = true && that.isSetUserName();
-    if (this_present_userName || that_present_userName) {
-      if (!(this_present_userName && that_present_userName))
-        return false;
-      if (!this.userName.equals(that.userName))
-        return false;
-    }
-
-    boolean this_present_noteId = true && this.isSetNoteId();
-    boolean that_present_noteId = true && that.isSetNoteId();
-    if (this_present_noteId || that_present_noteId) {
-      if (!(this_present_noteId && that_present_noteId))
-        return false;
-      if (!this.noteId.equals(that.noteId))
-        return false;
-    }
-
-    boolean this_present_replName = true && this.isSetReplName();
-    boolean that_present_replName = true && that.isSetReplName();
-    if (this_present_replName || that_present_replName) {
-      if (!(this_present_replName && that_present_replName))
-        return false;
-      if (!this.replName.equals(that.replName))
-        return false;
-    }
-
-    boolean this_present_defaultInterpreterSetting = true && this.isSetDefaultInterpreterSetting();
-    boolean that_present_defaultInterpreterSetting = true && that.isSetDefaultInterpreterSetting();
-    if (this_present_defaultInterpreterSetting || that_present_defaultInterpreterSetting) {
-      if (!(this_present_defaultInterpreterSetting && that_present_defaultInterpreterSetting))
-        return false;
-      if (!this.defaultInterpreterSetting.equals(that.defaultInterpreterSetting))
-        return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int hashCode = 1;
-
-    hashCode = hashCode * 8191 + ((isSetHost()) ? 131071 : 524287);
-    if (isSetHost())
-      hashCode = hashCode * 8191 + host.hashCode();
-
-    hashCode = hashCode * 8191 + port;
-
-    hashCode = hashCode * 8191 + ((isSetUserName()) ? 131071 : 524287);
-    if (isSetUserName())
-      hashCode = hashCode * 8191 + userName.hashCode();
-
-    hashCode = hashCode * 8191 + ((isSetNoteId()) ? 131071 : 524287);
-    if (isSetNoteId())
-      hashCode = hashCode * 8191 + noteId.hashCode();
-
-    hashCode = hashCode * 8191 + ((isSetReplName()) ? 131071 : 524287);
-    if (isSetReplName())
-      hashCode = hashCode * 8191 + replName.hashCode();
-
-    hashCode = hashCode * 8191 + ((isSetDefaultInterpreterSetting()) ? 131071 : 524287);
-    if (isSetDefaultInterpreterSetting())
-      hashCode = hashCode * 8191 + defaultInterpreterSetting.hashCode();
-
-    return hashCode;
-  }
-
-  @Override
-  public int compareTo(ClusterIntpProcParameters other) {
-    if (!getClass().equals(other.getClass())) {
-      return getClass().getName().compareTo(other.getClass().getName());
-    }
-
-    int lastComparison = 0;
-
-    lastComparison = java.lang.Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetHost()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = java.lang.Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetPort()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = java.lang.Boolean.valueOf(isSetUserName()).compareTo(other.isSetUserName());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetUserName()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userName, other.userName);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = java.lang.Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetNoteId()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = java.lang.Boolean.valueOf(isSetReplName()).compareTo(other.isSetReplName());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetReplName()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replName, other.replName);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = java.lang.Boolean.valueOf(isSetDefaultInterpreterSetting()).compareTo(other.isSetDefaultInterpreterSetting());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetDefaultInterpreterSetting()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.defaultInterpreterSetting, other.defaultInterpreterSetting);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    return 0;
-  }
-
-  @org.apache.thrift.annotation.Nullable
-  public _Fields fieldForId(int fieldId) {
-    return _Fields.findByThriftId(fieldId);
-  }
-
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-    scheme(iprot).read(iprot, this);
-  }
-
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-    scheme(oprot).write(oprot, this);
-  }
-
-  @Override
-  public java.lang.String toString() {
-    java.lang.StringBuilder sb = new java.lang.StringBuilder("ClusterIntpProcParameters(");
-    boolean first = true;
-
-    sb.append("host:");
-    if (this.host == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.host);
-    }
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("port:");
-    sb.append(this.port);
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("userName:");
-    if (this.userName == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.userName);
-    }
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("noteId:");
-    if (this.noteId == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.noteId);
-    }
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("replName:");
-    if (this.replName == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.replName);
-    }
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("defaultInterpreterSetting:");
-    if (this.defaultInterpreterSetting == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.defaultInterpreterSetting);
-    }
-    first = false;
-    sb.append(")");
-    return sb.toString();
-  }
-
-  public void validate() throws org.apache.thrift.TException {
-    // check for required fields
-    // check for sub-struct validity
-  }
-
-  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-    try {
-      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
-    try {
-      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-      __isset_bitfield = 0;
-      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private static class ClusterIntpProcParametersStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
-    public ClusterIntpProcParametersStandardScheme getScheme() {
-      return new ClusterIntpProcParametersStandardScheme();
-    }
-  }
-
-  private static class ClusterIntpProcParametersStandardScheme extends org.apache.thrift.scheme.StandardScheme<ClusterIntpProcParameters> {
-
-    public void read(org.apache.thrift.protocol.TProtocol iprot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TField schemeField;
-      iprot.readStructBegin();
-      while (true)
-      {
-        schemeField = iprot.readFieldBegin();
-        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
-          break;
-        }
-        switch (schemeField.id) {
-          case 1: // HOST
-            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-              struct.host = iprot.readString();
-              struct.setHostIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          case 2: // PORT
-            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
-              struct.port = iprot.readI32();
-              struct.setPortIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          case 3: // USER_NAME
-            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-              struct.userName = iprot.readString();
-              struct.setUserNameIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          case 4: // NOTE_ID
-            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-              struct.noteId = iprot.readString();
-              struct.setNoteIdIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          case 5: // REPL_NAME
-            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-              struct.replName = iprot.readString();
-              struct.setReplNameIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          case 6: // DEFAULT_INTERPRETER_SETTING
-            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-              struct.defaultInterpreterSetting = iprot.readString();
-              struct.setDefaultInterpreterSettingIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          default:
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-        }
-        iprot.readFieldEnd();
-      }
-      iprot.readStructEnd();
-
-      // check for required fields of primitive type, which can't be checked in the validate method
-      struct.validate();
-    }
-
-    public void write(org.apache.thrift.protocol.TProtocol oprot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
-      struct.validate();
-
-      oprot.writeStructBegin(STRUCT_DESC);
-      if (struct.host != null) {
-        oprot.writeFieldBegin(HOST_FIELD_DESC);
-        oprot.writeString(struct.host);
-        oprot.writeFieldEnd();
-      }
-      oprot.writeFieldBegin(PORT_FIELD_DESC);
-      oprot.writeI32(struct.port);
-      oprot.writeFieldEnd();
-      if (struct.userName != null) {
-        oprot.writeFieldBegin(USER_NAME_FIELD_DESC);
-        oprot.writeString(struct.userName);
-        oprot.writeFieldEnd();
-      }
-      if (struct.noteId != null) {
-        oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
-        oprot.writeString(struct.noteId);
-        oprot.writeFieldEnd();
-      }
-      if (struct.replName != null) {
-        oprot.writeFieldBegin(REPL_NAME_FIELD_DESC);
-        oprot.writeString(struct.replName);
-        oprot.writeFieldEnd();
-      }
-      if (struct.defaultInterpreterSetting != null) {
-        oprot.writeFieldBegin(DEFAULT_INTERPRETER_SETTING_FIELD_DESC);
-        oprot.writeString(struct.defaultInterpreterSetting);
-        oprot.writeFieldEnd();
-      }
-      oprot.writeFieldStop();
-      oprot.writeStructEnd();
-    }
-
-  }
-
-  private static class ClusterIntpProcParametersTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
-    public ClusterIntpProcParametersTupleScheme getScheme() {
-      return new ClusterIntpProcParametersTupleScheme();
-    }
-  }
-
-  private static class ClusterIntpProcParametersTupleScheme extends org.apache.thrift.scheme.TupleScheme<ClusterIntpProcParameters> {
-
-    @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet optionals = new java.util.BitSet();
-      if (struct.isSetHost()) {
-        optionals.set(0);
-      }
-      if (struct.isSetPort()) {
-        optionals.set(1);
-      }
-      if (struct.isSetUserName()) {
-        optionals.set(2);
-      }
-      if (struct.isSetNoteId()) {
-        optionals.set(3);
-      }
-      if (struct.isSetReplName()) {
-        optionals.set(4);
-      }
-      if (struct.isSetDefaultInterpreterSetting()) {
-        optionals.set(5);
-      }
-      oprot.writeBitSet(optionals, 6);
-      if (struct.isSetHost()) {
-        oprot.writeString(struct.host);
-      }
-      if (struct.isSetPort()) {
-        oprot.writeI32(struct.port);
-      }
-      if (struct.isSetUserName()) {
-        oprot.writeString(struct.userName);
-      }
-      if (struct.isSetNoteId()) {
-        oprot.writeString(struct.noteId);
-      }
-      if (struct.isSetReplName()) {
-        oprot.writeString(struct.replName);
-      }
-      if (struct.isSetDefaultInterpreterSetting()) {
-        oprot.writeString(struct.defaultInterpreterSetting);
-      }
-    }
-
-    @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(6);
-      if (incoming.get(0)) {
-        struct.host = iprot.readString();
-        struct.setHostIsSet(true);
-      }
-      if (incoming.get(1)) {
-        struct.port = iprot.readI32();
-        struct.setPortIsSet(true);
-      }
-      if (incoming.get(2)) {
-        struct.userName = iprot.readString();
-        struct.setUserNameIsSet(true);
-      }
-      if (incoming.get(3)) {
-        struct.noteId = iprot.readString();
-        struct.setNoteIdIsSet(true);
-      }
-      if (incoming.get(4)) {
-        struct.replName = iprot.readString();
-        struct.setReplNameIsSet(true);
-      }
-      if (incoming.get(5)) {
-        struct.defaultInterpreterSetting = iprot.readString();
-        struct.setDefaultInterpreterSettingIsSet(true);
-      }
-    }
-  }
-
-  private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
-    return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
-  }
-}
-
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
deleted file mode 100644
index 5c46b0e..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
+++ /dev/null
@@ -1,995 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.12.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.zeppelin.interpreter.thrift;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-06-10")
-public class ClusterManagerService {
-
-  public interface Iface {
-
-    public boolean createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException;
-
-  }
-
-  public interface AsyncIface {
-
-    public void createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException;
-
-  }
-
-  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
-    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
-      public Factory() {}
-      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
-        return new Client(prot);
-      }
-      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
-        return new Client(iprot, oprot);
-      }
-    }
-
-    public Client(org.apache.thrift.protocol.TProtocol prot)
-    {
-      super(prot, prot);
-    }
-
-    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
-      super(iprot, oprot);
-    }
-
-    public boolean createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException
-    {
-      send_createClusterInterpreterProcess(intpProcParameters);
-      return recv_createClusterInterpreterProcess();
-    }
-
-    public void send_createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException
-    {
-      createClusterInterpreterProcess_args args = new createClusterInterpreterProcess_args();
-      args.setIntpProcParameters(intpProcParameters);
-      sendBase("createClusterInterpreterProcess", args);
-    }
-
-    public boolean recv_createClusterInterpreterProcess() throws org.apache.thrift.TException
-    {
-      createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
-      receiveBase(result, "createClusterInterpreterProcess");
-      if (result.isSetSuccess()) {
-        return result.success;
-      }
-      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "createClusterInterpreterProcess failed: unknown result");
-    }
-
-  }
-  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
-    public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
-      private org.apache.thrift.async.TAsyncClientManager clientManager;
-      private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
-      public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
-        this.clientManager = clientManager;
-        this.protocolFactory = protocolFactory;
-      }
-      public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
-        return new AsyncClient(protocolFactory, clientManager, transport);
-      }
-    }
-
-    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
-      super(protocolFactory, clientManager, transport);
-    }
-
-    public void createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException {
-      checkReady();
-      createClusterInterpreterProcess_call method_call = new createClusterInterpreterProcess_call(intpProcParameters, resultHandler, this, ___protocolFactory, ___transport);
-      this.___currentMethod = method_call;
-      ___manager.call(method_call);
-    }
-
-    public static class createClusterInterpreterProcess_call extends org.apache.thrift.async.TAsyncMethodCall<java.lang.Boolean> {
-      private ClusterIntpProcParameters intpProcParameters;
-      public createClusterInterpreterProcess_call(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
-        super(client, protocolFactory, transport, resultHandler, false);
-        this.intpProcParameters = intpProcParameters;
-      }
-
-      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
-        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("createClusterInterpreterProcess", org.apache.thrift.protocol.TMessageType.CALL, 0));
-        createClusterInterpreterProcess_args args = new createClusterInterpreterProcess_args();
-        args.setIntpProcParameters(intpProcParameters);
-        args.write(prot);
-        prot.writeMessageEnd();
-      }
-
-      public java.lang.Boolean getResult() throws org.apache.thrift.TException {
-        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
-          throw new java.lang.IllegalStateException("Method call not finished!");
-        }
-        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
-        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
-        return (new Client(prot)).recv_createClusterInterpreterProcess();
-      }
-    }
-
-  }
-
-  public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
-    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
-    public Processor(I iface) {
-      super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
-    }
-
-    protected Processor(I iface, java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
-      super(iface, getProcessMap(processMap));
-    }
-
-    private static <I extends Iface> java.util.Map<java.lang.String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
-      processMap.put("createClusterInterpreterProcess", new createClusterInterpreterProcess());
-      return processMap;
-    }
-
-    public static class createClusterInterpreterProcess<I extends Iface> extends org.apache.thrift.ProcessFunction<I, createClusterInterpreterProcess_args> {
-      public createClusterInterpreterProcess() {
-        super("createClusterInterpreterProcess");
-      }
-
-      public createClusterInterpreterProcess_args getEmptyArgsInstance() {
-        return new createClusterInterpreterProcess_args();
-      }
-
-      protected boolean isOneway() {
-        return false;
-      }
-
-      @Override
-      protected boolean rethrowUnhandledExceptions() {
-        return false;
-      }
-
-      public createClusterInterpreterProcess_result getResult(I iface, createClusterInterpreterProcess_args args) throws org.apache.thrift.TException {
-        createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
-        result.success = iface.createClusterInterpreterProcess(args.intpProcParameters);
-        result.setSuccessIsSet(true);
-        return result;
-      }
-    }
-
-  }
-
-  public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
-    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName());
-    public AsyncProcessor(I iface) {
-      super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
-    }
-
-    protected AsyncProcessor(I iface, java.util.Map<java.lang.String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, ?>> processMap) {
-      super(iface, getProcessMap(processMap));
-    }
-
-    private static <I extends AsyncIface> java.util.Map<java.lang.String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, ?>> processMap) {
-      processMap.put("createClusterInterpreterProcess", new createClusterInterpreterProcess());
-      return processMap;
-    }
-
-    public static class createClusterInterpreterProcess<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, createClusterInterpreterProcess_args, java.lang.Boolean> {
-      public createClusterInterpreterProcess() {
-        super("createClusterInterpreterProcess");
-      }
-
-      public createClusterInterpreterProcess_args getEmptyArgsInstance() {
-        return new createClusterInterpreterProcess_args();
-      }
-
-      public org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
-        final org.apache.thrift.AsyncProcessFunction fcall = this;
-        return new org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>() { 
-          public void onComplete(java.lang.Boolean o) {
-            createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
-            result.success = o;
-            result.setSuccessIsSet(true);
-            try {
-              fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
-            } catch (org.apache.thrift.transport.TTransportException e) {
-              _LOGGER.error("TTransportException writing to internal frame buffer", e);
-              fb.close();
-            } catch (java.lang.Exception e) {
-              _LOGGER.error("Exception writing to internal frame buffer", e);
-              onError(e);
-            }
-          }
-          public void onError(java.lang.Exception e) {
-            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
-            org.apache.thrift.TSerializable msg;
-            createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
-            if (e instanceof org.apache.thrift.transport.TTransportException) {
-              _LOGGER.error("TTransportException inside handler", e);
-              fb.close();
-              return;
-            } else if (e instanceof org.apache.thrift.TApplicationException) {
-              _LOGGER.error("TApplicationException inside handler", e);
-              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
-              msg = (org.apache.thrift.TApplicationException)e;
-            } else {
-              _LOGGER.error("Exception inside handler", e);
-              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
-              msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
-            }
-            try {
-              fcall.sendResponse(fb,msg,msgType,seqid);
-            } catch (java.lang.Exception ex) {
-              _LOGGER.error("Exception writing to internal frame buffer", ex);
-              fb.close();
-            }
-          }
-        };
-      }
-
-      protected boolean isOneway() {
-        return false;
-      }
-
-      public void start(I iface, createClusterInterpreterProcess_args args, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException {
-        iface.createClusterInterpreterProcess(args.intpProcParameters,resultHandler);
-      }
-    }
-
-  }
-
-  public static class createClusterInterpreterProcess_args implements org.apache.thrift.TBase<createClusterInterpreterProcess_args, createClusterInterpreterProcess_args._Fields>, java.io.Serializable, Cloneable, Comparable<createClusterInterpreterProcess_args>   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_args");
-
-    private static final org.apache.thrift.protocol.TField INTP_PROC_PARAMETERS_FIELD_DESC = new org.apache.thrift.protocol.TField("intpProcParameters", org.apache.thrift.protocol.TType.STRUCT, (short)1);
-
-    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new createClusterInterpreterProcess_argsStandardSchemeFactory();
-    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new createClusterInterpreterProcess_argsTupleSchemeFactory();
-
-    public @org.apache.thrift.annotation.Nullable ClusterIntpProcParameters intpProcParameters; // required
-
-    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-      INTP_PROC_PARAMETERS((short)1, "intpProcParameters");
-
-      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
-
-      static {
-        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
-          byName.put(field.getFieldName(), field);
-        }
-      }
-
-      /**
-       * Find the _Fields constant that matches fieldId, or null if its not found.
-       */
-      @org.apache.thrift.annotation.Nullable
-      public static _Fields findByThriftId(int fieldId) {
-        switch(fieldId) {
-          case 1: // INTP_PROC_PARAMETERS
-            return INTP_PROC_PARAMETERS;
-          default:
-            return null;
-        }
-      }
-
-      /**
-       * Find the _Fields constant that matches fieldId, throwing an exception
-       * if it is not found.
-       */
-      public static _Fields findByThriftIdOrThrow(int fieldId) {
-        _Fields fields = findByThriftId(fieldId);
-        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
-        return fields;
-      }
-
-      /**
-       * Find the _Fields constant that matches name, or null if its not found.
-       */
-      @org.apache.thrift.annotation.Nullable
-      public static _Fields findByName(java.lang.String name) {
-        return byName.get(name);
-      }
-
-      private final short _thriftId;
-      private final java.lang.String _fieldName;
-
-      _Fields(short thriftId, java.lang.String fieldName) {
-        _thriftId = thriftId;
-        _fieldName = fieldName;
-      }
-
-      public short getThriftFieldId() {
-        return _thriftId;
-      }
-
-      public java.lang.String getFieldName() {
-        return _fieldName;
-      }
-    }
-
-    // isset id assignments
-    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
-    static {
-      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-      tmpMap.put(_Fields.INTP_PROC_PARAMETERS, new org.apache.thrift.meta_data.FieldMetaData("intpProcParameters", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ClusterIntpProcParameters.class)));
-      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_args.class, metaDataMap);
-    }
-
-    public createClusterInterpreterProcess_args() {
-    }
-
-    public createClusterInterpreterProcess_args(
-      ClusterIntpProcParameters intpProcParameters)
-    {
-      this();
-      this.intpProcParameters = intpProcParameters;
-    }
-
-    /**
-     * Performs a deep copy on <i>other</i>.
-     */
-    public createClusterInterpreterProcess_args(createClusterInterpreterProcess_args other) {
-      if (other.isSetIntpProcParameters()) {
-        this.intpProcParameters = new ClusterIntpProcParameters(other.intpProcParameters);
-      }
-    }
-
-    public createClusterInterpreterProcess_args deepCopy() {
-      return new createClusterInterpreterProcess_args(this);
-    }
-
-    @Override
-    public void clear() {
-      this.intpProcParameters = null;
-    }
-
-    @org.apache.thrift.annotation.Nullable
-    public ClusterIntpProcParameters getIntpProcParameters() {
-      return this.intpProcParameters;
-    }
-
-    public createClusterInterpreterProcess_args setIntpProcParameters(@org.apache.thrift.annotation.Nullable ClusterIntpProcParameters intpProcParameters) {
-      this.intpProcParameters = intpProcParameters;
-      return this;
-    }
-
-    public void unsetIntpProcParameters() {
-      this.intpProcParameters = null;
-    }
-
-    /** Returns true if field intpProcParameters is set (has been assigned a value) and false otherwise */
-    public boolean isSetIntpProcParameters() {
-      return this.intpProcParameters != null;
-    }
-
-    public void setIntpProcParametersIsSet(boolean value) {
-      if (!value) {
-        this.intpProcParameters = null;
-      }
-    }
-
-    public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
-      switch (field) {
-      case INTP_PROC_PARAMETERS:
-        if (value == null) {
-          unsetIntpProcParameters();
-        } else {
-          setIntpProcParameters((ClusterIntpProcParameters)value);
-        }
-        break;
-
-      }
-    }
-
-    @org.apache.thrift.annotation.Nullable
-    public java.lang.Object getFieldValue(_Fields field) {
-      switch (field) {
-      case INTP_PROC_PARAMETERS:
-        return getIntpProcParameters();
-
-      }
-      throw new java.lang.IllegalStateException();
-    }
-
-    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
-    public boolean isSet(_Fields field) {
-      if (field == null) {
-        throw new java.lang.IllegalArgumentException();
-      }
-
-      switch (field) {
-      case INTP_PROC_PARAMETERS:
-        return isSetIntpProcParameters();
-      }
-      throw new java.lang.IllegalStateException();
-    }
-
-    @Override
-    public boolean equals(java.lang.Object that) {
-      if (that == null)
-        return false;
-      if (that instanceof createClusterInterpreterProcess_args)
-        return this.equals((createClusterInterpreterProcess_args)that);
-      return false;
-    }
-
-    public boolean equals(createClusterInterpreterProcess_args that) {
-      if (that == null)
-        return false;
-      if (this == that)
-        return true;
-
-      boolean this_present_intpProcParameters = true && this.isSetIntpProcParameters();
-      boolean that_present_intpProcParameters = true && that.isSetIntpProcParameters();
-      if (this_present_intpProcParameters || that_present_intpProcParameters) {
-        if (!(this_present_intpProcParameters && that_present_intpProcParameters))
-          return false;
-        if (!this.intpProcParameters.equals(that.intpProcParameters))
-          return false;
-      }
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      int hashCode = 1;
-
-      hashCode = hashCode * 8191 + ((isSetIntpProcParameters()) ? 131071 : 524287);
-      if (isSetIntpProcParameters())
-        hashCode = hashCode * 8191 + intpProcParameters.hashCode();
-
-      return hashCode;
-    }
-
-    @Override
-    public int compareTo(createClusterInterpreterProcess_args other) {
-      if (!getClass().equals(other.getClass())) {
-        return getClass().getName().compareTo(other.getClass().getName());
-      }
-
-      int lastComparison = 0;
-
-      lastComparison = java.lang.Boolean.valueOf(isSetIntpProcParameters()).compareTo(other.isSetIntpProcParameters());
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-      if (isSetIntpProcParameters()) {
-        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.intpProcParameters, other.intpProcParameters);
-        if (lastComparison != 0) {
-          return lastComparison;
-        }
-      }
-      return 0;
-    }
-
-    @org.apache.thrift.annotation.Nullable
-    public _Fields fieldForId(int fieldId) {
-      return _Fields.findByThriftId(fieldId);
-    }
-
-    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-      scheme(iprot).read(iprot, this);
-    }
-
-    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-      scheme(oprot).write(oprot, this);
-    }
-
-    @Override
-    public java.lang.String toString() {
-      java.lang.StringBuilder sb = new java.lang.StringBuilder("createClusterInterpreterProcess_args(");
-      boolean first = true;
-
-      sb.append("intpProcParameters:");
-      if (this.intpProcParameters == null) {
-        sb.append("null");
-      } else {
-        sb.append(this.intpProcParameters);
-      }
-      first = false;
-      sb.append(")");
-      return sb.toString();
-    }
-
-    public void validate() throws org.apache.thrift.TException {
-      // check for required fields
-      // check for sub-struct validity
-      if (intpProcParameters != null) {
-        intpProcParameters.validate();
-      }
-    }
-
-    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-      try {
-        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-      } catch (org.apache.thrift.TException te) {
-        throw new java.io.IOException(te);
-      }
-    }
-
-    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
-      try {
-        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-      } catch (org.apache.thrift.TException te) {
-        throw new java.io.IOException(te);
-      }
-    }
-
-    private static class createClusterInterpreterProcess_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
-      public createClusterInterpreterProcess_argsStandardScheme getScheme() {
-        return new createClusterInterpreterProcess_argsStandardScheme();
-      }
-    }
-
-    private static class createClusterInterpreterProcess_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<createClusterInterpreterProcess_args> {
-
-      public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
-        org.apache.thrift.protocol.TField schemeField;
-        iprot.readStructBegin();
-        while (true)
-        {
-          schemeField = iprot.readFieldBegin();
-          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
-            break;
-          }
-          switch (schemeField.id) {
-            case 1: // INTP_PROC_PARAMETERS
-              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
-                struct.intpProcParameters = new ClusterIntpProcParameters();
-                struct.intpProcParameters.read(iprot);
-                struct.setIntpProcParametersIsSet(true);
-              } else { 
-                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-              }
-              break;
-            default:
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-          }
-          iprot.readFieldEnd();
-        }
-        iprot.readStructEnd();
-
-        // check for required fields of primitive type, which can't be checked in the validate method
-        struct.validate();
-      }
-
-      public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
-        struct.validate();
-
-        oprot.writeStructBegin(STRUCT_DESC);
-        if (struct.intpProcParameters != null) {
-          oprot.writeFieldBegin(INTP_PROC_PARAMETERS_FIELD_DESC);
-          struct.intpProcParameters.write(oprot);
-          oprot.writeFieldEnd();
-        }
-        oprot.writeFieldStop();
-        oprot.writeStructEnd();
-      }
-
-    }
-
-    private static class createClusterInterpreterProcess_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
-      public createClusterInterpreterProcess_argsTupleScheme getScheme() {
-        return new createClusterInterpreterProcess_argsTupleScheme();
-      }
-    }
-
-    private static class createClusterInterpreterProcess_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<createClusterInterpreterProcess_args> {
-
-      @Override
-      public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
-        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-        java.util.BitSet optionals = new java.util.BitSet();
-        if (struct.isSetIntpProcParameters()) {
-          optionals.set(0);
-        }
-        oprot.writeBitSet(optionals, 1);
-        if (struct.isSetIntpProcParameters()) {
-          struct.intpProcParameters.write(oprot);
-        }
-      }
-
-      @Override
-      public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
-        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-        java.util.BitSet incoming = iprot.readBitSet(1);
-        if (incoming.get(0)) {
-          struct.intpProcParameters = new ClusterIntpProcParameters();
-          struct.intpProcParameters.read(iprot);
-          struct.setIntpProcParametersIsSet(true);
-        }
-      }
-    }
-
-    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
-      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
-    }
-  }
-
-  public static class createClusterInterpreterProcess_result implements org.apache.thrift.TBase<createClusterInterpreterProcess_result, createClusterInterpreterProcess_result._Fields>, java.io.Serializable, Cloneable, Comparable<createClusterInterpreterProcess_result>   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_result");
-
-    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)0);
-
-    private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new createClusterInterpreterProcess_resultStandardSchemeFactory();
-    private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new createClusterInterpreterProcess_resultTupleSchemeFactory();
-
-    public boolean success; // required
-
-    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-      SUCCESS((short)0, "success");
-
-      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
-
-      static {
-        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
-          byName.put(field.getFieldName(), field);
-        }
-      }
-
-      /**
-       * Find the _Fields constant that matches fieldId, or null if its not found.
-       */
-      @org.apache.thrift.annotation.Nullable
-      public static _Fields findByThriftId(int fieldId) {
-        switch(fieldId) {
-          case 0: // SUCCESS
-            return SUCCESS;
-          default:
-            return null;
-        }
-      }
-
-      /**
-       * Find the _Fields constant that matches fieldId, throwing an exception
-       * if it is not found.
-       */
-      public static _Fields findByThriftIdOrThrow(int fieldId) {
-        _Fields fields = findByThriftId(fieldId);
-        if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
-        return fields;
-      }
-
-      /**
-       * Find the _Fields constant that matches name, or null if its not found.
-       */
-      @org.apache.thrift.annotation.Nullable
-      public static _Fields findByName(java.lang.String name) {
-        return byName.get(name);
-      }
-
-      private final short _thriftId;
-      private final java.lang.String _fieldName;
-
-      _Fields(short thriftId, java.lang.String fieldName) {
-        _thriftId = thriftId;
-        _fieldName = fieldName;
-      }
-
-      public short getThriftFieldId() {
-        return _thriftId;
-      }
-
-      public java.lang.String getFieldName() {
-        return _fieldName;
-      }
-    }
-
-    // isset id assignments
-    private static final int __SUCCESS_ISSET_ID = 0;
-    private byte __isset_bitfield = 0;
-    public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
-    static {
-      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
-      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
-      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_result.class, metaDataMap);
-    }
-
-    public createClusterInterpreterProcess_result() {
-    }
-
-    public createClusterInterpreterProcess_result(
-      boolean success)
-    {
-      this();
-      this.success = success;
-      setSuccessIsSet(true);
-    }
-
-    /**
-     * Performs a deep copy on <i>other</i>.
-     */
-    public createClusterInterpreterProcess_result(createClusterInterpreterProcess_result other) {
-      __isset_bitfield = other.__isset_bitfield;
-      this.success = other.success;
-    }
-
-    public createClusterInterpreterProcess_result deepCopy() {
-      return new createClusterInterpreterProcess_result(this);
-    }
-
-    @Override
-    public void clear() {
-      setSuccessIsSet(false);
-      this.success = false;
-    }
-
-    public boolean isSuccess() {
-      return this.success;
-    }
-
-    public createClusterInterpreterProcess_result setSuccess(boolean success) {
-      this.success = success;
-      setSuccessIsSet(true);
-      return this;
-    }
-
-    public void unsetSuccess() {
-      __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID);
-    }
-
-    /** Returns true if field success is set (has been assigned a value) and false otherwise */
-    public boolean isSetSuccess() {
-      return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID);
-    }
-
-    public void setSuccessIsSet(boolean value) {
-      __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value);
-    }
-
-    public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
-      switch (field) {
-      case SUCCESS:
-        if (value == null) {
-          unsetSuccess();
-        } else {
-          setSuccess((java.lang.Boolean)value);
-        }
-        break;
-
-      }
-    }
-
-    @org.apache.thrift.annotation.Nullable
-    public java.lang.Object getFieldValue(_Fields field) {
-      switch (field) {
-      case SUCCESS:
-        return isSuccess();
-
-      }
-      throw new java.lang.IllegalStateException();
-    }
-
-    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
-    public boolean isSet(_Fields field) {
-      if (field == null) {
-        throw new java.lang.IllegalArgumentException();
-      }
-
-      switch (field) {
-      case SUCCESS:
-        return isSetSuccess();
-      }
-      throw new java.lang.IllegalStateException();
-    }
-
-    @Override
-    public boolean equals(java.lang.Object that) {
-      if (that == null)
-        return false;
-      if (that instanceof createClusterInterpreterProcess_result)
-        return this.equals((createClusterInterpreterProcess_result)that);
-      return false;
-    }
-
-    public boolean equals(createClusterInterpreterProcess_result that) {
-      if (that == null)
-        return false;
-      if (this == that)
-        return true;
-
-      boolean this_present_success = true;
-      boolean that_present_success = true;
-      if (this_present_success || that_present_success) {
-        if (!(this_present_success && that_present_success))
-          return false;
-        if (this.success != that.success)
-          return false;
-      }
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      int hashCode = 1;
-
-      hashCode = hashCode * 8191 + ((success) ? 131071 : 524287);
-
-      return hashCode;
-    }
-
-    @Override
-    public int compareTo(createClusterInterpreterProcess_result other) {
-      if (!getClass().equals(other.getClass())) {
-        return getClass().getName().compareTo(other.getClass().getName());
-      }
-
-      int lastComparison = 0;
-
-      lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-      if (isSetSuccess()) {
-        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
-        if (lastComparison != 0) {
-          return lastComparison;
-        }
-      }
-      return 0;
-    }
-
-    @org.apache.thrift.annotation.Nullable
-    public _Fields fieldForId(int fieldId) {
-      return _Fields.findByThriftId(fieldId);
-    }
-
-    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-      scheme(iprot).read(iprot, this);
-    }
-
-    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-      scheme(oprot).write(oprot, this);
-    }
-
-    @Override
-    public java.lang.String toString() {
-      java.lang.StringBuilder sb = new java.lang.StringBuilder("createClusterInterpreterProcess_result(");
-      boolean first = true;
-
-      sb.append("success:");
-      sb.append(this.success);
-      first = false;
-      sb.append(")");
-      return sb.toString();
-    }
-
-    public void validate() throws org.apache.thrift.TException {
-      // check for required fields
-      // check for sub-struct validity
-    }
-
-    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-      try {
-        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-      } catch (org.apache.thrift.TException te) {
-        throw new java.io.IOException(te);
-      }
-    }
-
-    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
-      try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bitfield = 0;
-        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-      } catch (org.apache.thrift.TException te) {
-        throw new java.io.IOException(te);
-      }
-    }
-
-    private static class createClusterInterpreterProcess_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
-      public createClusterInterpreterProcess_resultStandardScheme getScheme() {
-        return new createClusterInterpreterProcess_resultStandardScheme();
-      }
-    }
-
-    private static class createClusterInterpreterProcess_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<createClusterInterpreterProcess_result> {
-
-      public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
-        org.apache.thrift.protocol.TField schemeField;
-        iprot.readStructBegin();
-        while (true)
-        {
-          schemeField = iprot.readFieldBegin();
-          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
-            break;
-          }
-          switch (schemeField.id) {
-            case 0: // SUCCESS
-              if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
-                struct.success = iprot.readBool();
-                struct.setSuccessIsSet(true);
-              } else { 
-                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-              }
-              break;
-            default:
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-          }
-          iprot.readFieldEnd();
-        }
-        iprot.readStructEnd();
-
-        // check for required fields of primitive type, which can't be checked in the validate method
-        struct.validate();
-      }
-
-      public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
-        struct.validate();
-
-        oprot.writeStructBegin(STRUCT_DESC);
-        if (struct.isSetSuccess()) {
-          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
-          oprot.writeBool(struct.success);
-          oprot.writeFieldEnd();
-        }
-        oprot.writeFieldStop();
-        oprot.writeStructEnd();
-      }
-
-    }
-
-    private static class createClusterInterpreterProcess_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
-      public createClusterInterpreterProcess_resultTupleScheme getScheme() {
-        return new createClusterInterpreterProcess_resultTupleScheme();
-      }
-    }
-
-    private static class createClusterInterpreterProcess_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<createClusterInterpreterProcess_result> {
-
-      @Override
-      public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
-        org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-        java.util.BitSet optionals = new java.util.BitSet();
-        if (struct.isSetSuccess()) {
-          optionals.set(0);
-        }
-        oprot.writeBitSet(optionals, 1);
-        if (struct.isSetSuccess()) {
-          oprot.writeBool(struct.success);
-        }
-      }
-
-      @Override
-      public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
-        org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-        java.util.BitSet incoming = iprot.readBitSet(1);
-        if (incoming.get(0)) {
-          struct.success = iprot.readBool();
-          struct.setSuccessIsSet(true);
-        }
-      }
-    }
-
-    private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
-      return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
-    }
-  }
-
-}
diff --git a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift b/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift
deleted file mode 100644
index c6f208e..0000000
--- a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace java org.apache.zeppelin.interpreter.thrift
-
-struct ClusterIntpProcParameters {
-  1: string host,
-  2: i32 port,
-  3: string userName,
-  4: string noteId,
-  5: string replName,
-  6: string defaultInterpreterSetting
-}
-
-service ClusterManagerService {
-  bool createClusterInterpreterProcess(1: ClusterIntpProcParameters intpProcParameters);
-}
diff --git a/zeppelin-interpreter/src/main/thrift/genthrift.sh b/zeppelin-interpreter/src/main/thrift/genthrift.sh
index 31efeae..23a295a 100755
--- a/zeppelin-interpreter/src/main/thrift/genthrift.sh
+++ b/zeppelin-interpreter/src/main/thrift/genthrift.sh
@@ -21,7 +21,6 @@ rm -rf gen-java
 rm -rf ../java/org/apache/zeppelin/interpreter/thrift
 thrift --gen java RemoteInterpreterService.thrift
 thrift --gen java RemoteInterpreterEventService.thrift
-thrift --gen java ClusterManagerService.thrift
 for file in gen-java/org/apache/zeppelin/interpreter/thrift/* ; do
   cat java_license_header.txt ${file} > ${file}.tmp
   mv -f ${file}.tmp ${file}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
new file mode 100644
index 0000000..e2ce781
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ClusterMultiNodeTest {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
+
+  private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
+  private static ClusterManagerClient clusterClient = null;
+
+  static final String metaKey = "ClusterMultiNodeTestKey";
+
+  @BeforeClass
+  public static void startCluster() throws IOException, InterruptedException {
+    LOGGER.info("startCluster >>>");
+
+    String clusterAddrList = "";
+    String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+    for (int i = 0; i < 3; i ++) {
+      // Set the cluster IP and port
+      int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      clusterAddrList += zServerHost + ":" + zServerPort;
+      if (i != 2) {
+        clusterAddrList += ",";
+      }
+    }
+    ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+    zconf.setClusterAddress(clusterAddrList);
+
+    // mock cluster manager server
+    String[] cluster = clusterAddrList.split(",");
+    try {
+      for (int i = 0; i < 3; i ++) {
+        String[] parts = cluster[i].split(":");
+        String clusterHost = parts[0];
+        int clusterPort = Integer.valueOf(parts[1]);
+
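+        // ClusterManagerServer is a singleton with a private constructor, so reflection is
+        // used here to create three independent server instances within a single test JVM.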
+        Class clazz = ClusterManagerServer.class;
+        Constructor constructor = clazz.getDeclaredConstructor();
+        constructor.setAccessible(true);
+        ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+        clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
+
+        clusterServers.add(clusterServer);
+      }
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+
+    for (ClusterManagerServer clusterServer : clusterServers) {
+      clusterServer.start();
+    }
+
+    // mock cluster manager client
+    clusterClient = ClusterManagerClient.getInstance();
+    clusterClient.start(metaKey);
+
+    // Waiting for cluster startup
+    int wait = 0;
+    while (wait++ < 100) {
+      if (clusterIsStartup() && clusterClient.raftInitialized()) {
+        LOGGER.info("wait {}(ms) found cluster leader", wait * 500);
+        break;
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        LOGGER.error(e.getMessage(), e);
+      }
+    }
+
+    Thread.sleep(3000);
+    assertEquals(true, clusterIsStartup());
+    LOGGER.info("startCluster <<<");
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    LOGGER.info("stopCluster >>>");
+    if (null != clusterClient) {
+      clusterClient.shutdown();
+    }
+    for (ClusterManagerServer clusterServer : clusterServers) {
+      clusterServer.shutdown();
+    }
+    LOGGER.info("stopCluster <<<");
+  }
+
+  static boolean clusterIsStartup() {
+    boolean foundLeader = false;
+    for (ClusterManagerServer clusterServer : clusterServers) {
+      if (!clusterServer.raftInitialized()) {
+        LOGGER.warn("clusterServer not Initialized!");
+        return false;
+      }
+      if (clusterServer.isClusterLeader()) {
+        foundLeader = true;
+      }
+    }
+
+    if (!foundLeader) {
+      LOGGER.warn("Can not found leader!");
+      return false;
+    }
+
+    return true;
+  }
+
+  @Test
+  public void getClusterServerMeta() {
+    LOGGER.info("getClusterServerMeta >>>");
+    // Get metadata for all services
+    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+    LOGGER.info(srvMeta.toString());
+
+    Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+    LOGGER.info(intpMeta.toString());
+
+    assertNotNull(srvMeta);
+    assertEquals(true, (srvMeta instanceof HashMap));
+    HashMap hashMap = (HashMap) srvMeta;
+
+    assertEquals(3, hashMap.size());
+    LOGGER.info("getClusterServerMeta <<<");
+  }
+}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
similarity index 69%
copy from zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
copy to zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
index ef4d7fd..59853d4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
@@ -1,4 +1,3 @@
-package org.apache.zeppelin.cluster;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,11 +14,13 @@ package org.apache.zeppelin.cluster;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.zeppelin.cluster;
 
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
 import org.apache.zeppelin.cluster.meta.ClusterMetaType;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -31,21 +32,20 @@ import java.util.HashMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-public class ClusterManagerTest {
-  private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerTest.class);
-
-  private static ClusterManagerServer clusterManagerServer = null;
-  private static ClusterManagerClient clusterManagerClient = null;
+public class ClusterSingleNodeTest {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class);
+  private static ZeppelinConfiguration zconf;
 
-  private static ZeppelinConfiguration zconf = null;
+  private static ClusterManagerServer clusterServer = null;
+  private static ClusterManagerClient clusterClient = null;
 
   static String zServerHost;
   static int zServerPort;
-  static final String metaKey = "ClusterManagerTestKey";
+  static final String metaKey = "ClusterSingleNodeTestKey";
 
   @BeforeClass
-  public static void initClusterEnv() throws IOException, InterruptedException {
-    LOGGER.info("initClusterEnv >>>");
+  public static void startCluster() throws IOException, InterruptedException {
+    LOGGER.info("startCluster >>>");
 
     zconf = ZeppelinConfiguration.create();
 
@@ -55,19 +55,19 @@ public class ClusterManagerTest {
     zconf.setClusterAddress(zServerHost + ":" + zServerPort);
 
     // mock cluster manager server
-    clusterManagerServer = ClusterManagerServer.getInstance();
-    clusterManagerServer.start(null);
+    clusterServer = ClusterManagerServer.getInstance();
+    clusterServer.start();
 
     // mock cluster manager client
-    clusterManagerClient = ClusterManagerClient.getInstance();
-    clusterManagerClient.start(metaKey);
+    clusterClient = ClusterManagerClient.getInstance();
+    clusterClient.start(metaKey);
 
     // Waiting for cluster startup
     int wait = 0;
     while(wait++ < 100) {
-      if (clusterManagerServer.isClusterLeader()
-          && clusterManagerServer.raftInitialized()
-          && clusterManagerClient.raftInitialized()) {
+      if (clusterServer.isClusterLeader()
+          && clusterServer.raftInitialized()
+          && clusterClient.raftInitialized()) {
         LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
         break;
       }
@@ -77,19 +77,33 @@ public class ClusterManagerTest {
         e.printStackTrace();
       }
     }
-    assertEquals(true, clusterManagerServer.isClusterLeader());
-    LOGGER.info("initClusterEnv <<<");
+    Thread.sleep(3000);
+    assertEquals(true, clusterServer.isClusterLeader());
+    LOGGER.info("startCluster <<<");
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    if (null != clusterClient) {
+      clusterClient.shutdown();
+    }
+    if (null != clusterServer) {
+      clusterServer.shutdown();
+    }
+    LOGGER.info("stopCluster <<<");
   }
 
   @Test
   public void getServerMeta() {
-    LOGGER.info("serverMeta >>>");
+    LOGGER.info("getServerMeta >>>");
 
     // Get metadata for all services
-    Object meta = clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, "");
-
+    Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
     LOGGER.info(meta.toString());
 
+    Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+    LOGGER.info(intpMeta.toString());
+
     assertNotNull(meta);
     assertEquals(true, (meta instanceof HashMap));
     HashMap hashMap = (HashMap) meta;
@@ -101,7 +115,7 @@ public class ClusterManagerTest {
 
     assertEquals(true, mapMetaValues.size()>0);
 
-    LOGGER.info("serverMeta <<<");
+    LOGGER.info("getServerMeta <<<");
   }
 
   @Test
@@ -110,8 +124,6 @@ public class ClusterManagerTest {
     HashMap<String, Object> meta = new HashMap<>();
     meta.put(ClusterMeta.SERVER_HOST, zServerHost);
     meta.put(ClusterMeta.SERVER_PORT, zServerPort);
-    meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST");
-    meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT");
     meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
     meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT");
     meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
@@ -120,11 +132,11 @@ public class ClusterManagerTest {
     meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
 
     // put IntpProcess Meta
-    clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey, meta);
+    clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
 
     // get IntpProcess Meta
     HashMap<String, HashMap<String, Object>> check
-        = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey);
+        = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
 
     LOGGER.info(check.toString());
 
diff --git a/zeppelin-plugins/launcher/cluster/pom.xml b/zeppelin-plugins/launcher/cluster/pom.xml
new file mode 100644
index 0000000..80ca0b3
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zengine-plugins-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../../../zeppelin-plugins</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>launcher-cluster</artifactId>
+  <packaging>jar</packaging>
+  <version>0.9.0-SNAPSHOT</version>
+  <name>Zeppelin: Plugin Cluster Launcher</name>
+  <description>Launcher implementation to run interpreters on cluster</description>
+
+  <properties>
+    <plugin.name>Launcher/ClusterInterpreterLauncher</plugin.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>launcher-standard</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+      </testResource>
+      <testResource>
+        <directory>${project.basedir}/src/main/resources</directory>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <includes>
+          <include>**/*.*</include>
+        </includes>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
new file mode 100644
index 0000000..7d8ff1e
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.cluster.event.ClusterEvent;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+/**
+ * Interpreter Launcher which use cluster to launch the interpreter process.
+ */
+public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
+    implements ClusterEventListener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncher.class);
+
+  public static final int CHECK_META_INTERVAL = 500; // ms
+  private InterpreterLaunchContext context;
+  private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
+
+  public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage)
+      throws IOException {
+    super(zConf, recoveryStorage);
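+    // Register this launcher as a cluster event listener so it can handle
+    // CREATE_INTP_PROCESS events sent by other zeppelin servers.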
+    clusterServer.addClusterEventListeners(this);
+  }
+
+  @Override
+  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+    LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+
+    this.context = context;
+    this.properties = context.getProperties();
+    int connectTimeout = getConnectTimeout();
+    String intpGroupId = context.getInterpreterGroupId();
+
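+    // Check the cluster metadata first: if an interpreter process for this group is
+    // already registered, connect to its thrift server instead of creating a new one.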
+    HashMap<String, Object> intpProcMeta = clusterServer
+        .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+    if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST)
+        && intpProcMeta.containsKey(INTP_TSERVER_PORT)) {
+      // connect exist Interpreter Process
+      String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST);
+      int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT);
+      return new RemoteInterpreterRunningProcess(
+          context.getInterpreterSettingName(),
+          connectTimeout,
+          intpTserverHost,
+          intpTserverPort);
+    } else {
+      // No process was found for the InterpreterGroup ID
+      HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
+      if (null == meta) {
+        LOGGER.error("Don't get idle node meta, launch interpreter on local.");
+        super.launch(context);
+      }
+
+      String srvHost = (String) meta.get(SERVER_HOST);
+      String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
+
+      if (localhost.equalsIgnoreCase(srvHost)) {
+        // launch interpreter on local
+        return super.launch(context);
+      } else {
+        int srvPort = (int) meta.get(SERVER_PORT);
+
+        Gson gson = new Gson();
+        String sContext = gson.toJson(context);
+
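+        // Serialize the launch context and send a CREATE_INTP_PROCESS event to the
+        // selected idle server, which creates the interpreter process on its own host.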
+        Map<String, Object> mapEvent = new HashMap<>();
+        mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
+        mapEvent.put(CLUSTER_EVENT_MSG, sContext);
+        String strEvent = gson.toJson(mapEvent);
+        clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent);
+
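+        // Poll the cluster metadata until the remote interpreter process registers its
+        // thrift host and port, or until the connect timeout elapses.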
+        HashMap<String, Object> intpMeta = clusterServer
+            .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+        int retryGetMeta = connectTimeout / CHECK_META_INTERVAL;
+        while ((retryGetMeta-- > 0)
+            && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+                || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
+          try {
+            Thread.sleep(CHECK_META_INTERVAL);
+            intpMeta = clusterServer
+                .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+            LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, intpGroupId);
+          } catch (InterruptedException e) {
+            LOGGER.error(e.getMessage(), e);
+          }
+        }
+
+        // Check if the remote creation process is successful
+        if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+            || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
+          LOGGER.error("Creating process {} failed on remote server {}:{}",
+              intpGroupId, srvHost, srvPort);
+
+          // launch interpreter on local
+          return super.launch(context);
+        } else {
+          // connect to the remote interpreter process
+          String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+          int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+          return new RemoteInterpreterRunningProcess(
+              context.getInterpreterSettingName(),
+              connectTimeout,
+              intpTSrvHost,
+              intpTSrvPort);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void onClusterEvent(String event) {
+    Gson gson = new Gson();
+    Map<String, Object> mapEvent = gson.fromJson(event,
+        new TypeToken<Map<String, Object>>(){}.getType());
+    String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
+    ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
+
+    switch (clusterEvent) {
+      case CREATE_INTP_PROCESS:
+        onCreateIntpProcess(mapEvent);
+        break;
+      default:
+        LOGGER.error("Unknown Cluster Event : {}", clusterEvent);
+        break;
+    }
+  }
+
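+  // Handles a CREATE_INTP_PROCESS cluster event: rebuilds the InterpreterLaunchContext
+  // from the event payload and starts the interpreter process on this server.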
+  private void onCreateIntpProcess(Map<String, Object> mapEvent) {
+    String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
+    try {
+      Gson gson = new Gson();
+      InterpreterLaunchContext context = gson.fromJson(
+          eventMsg, new TypeToken<InterpreterLaunchContext>() {}.getType());
+
+      this.properties = context.getProperties();
+      InterpreterOption option = context.getOption();
+      InterpreterRunner runner = context.getRunner();
+      String intpSetGroupName = context.getInterpreterSettingGroup();
+      String intpSetName = context.getInterpreterSettingName();
+      int connectTimeout = getConnectTimeout();
+      String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+          + context.getInterpreterSettingId();
+
+      ClusterInterpreterProcess clusterInterpreterProcess
+          = new ClusterInterpreterProcess(
+          runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
+          context.getZeppelinServerRPCPort(),
+          context.getZeppelinServerHost(),
+          zConf.getInterpreterPortRange(),
+          zConf.getInterpreterDir() + "/" + intpSetGroupName,
+          localRepoPath,
+          buildEnvFromProperties(context),
+          connectTimeout,
+          intpSetName,
+          context.getInterpreterGroupId(),
+          option.isUserImpersonate());
+
+      clusterInterpreterProcess.start(context.getUserName());
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+}
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
new file mode 100644
index 0000000..986c2ed
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -0,0 +1,141 @@
+package org.apache.zeppelin.interpreter.launcher;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+
+public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
+  private static final Logger LOGGER
+      = LoggerFactory.getLogger(ClusterInterpreterProcess.class);
+
+  private String intpHost = "";
+  private int intpPort = 0;
+
+  private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
+
+  public ClusterInterpreterProcess(
+      String intpRunner,
+      int zeppelinServerRPCPort,
+      String zeppelinServerRPCHost,
+      String interpreterPortRange,
+      String intpDir,
+      String localRepoDir,
+      Map<String, String> env,
+      int connectTimeout,
+      String interpreterSettingName,
+      String interpreterGroupId,
+      boolean isUserImpersonated) {
+
+    super(intpRunner,
+      zeppelinServerRPCPort,
+      zeppelinServerRPCHost,
+      interpreterPortRange,
+      intpDir,
+      localRepoDir,
+      env,
+      connectTimeout,
+      interpreterSettingName,
+      interpreterGroupId,
+      isUserImpersonated);
+  }
+
+  @Override
+  public void start(String userName) throws IOException {
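+    // Start a watcher thread that waits for this interpreter process to register its
+    // metadata in the cluster, then launch the process itself.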
+    CheckIntpRunStatusThread checkIntpRunStatusThread = new CheckIntpRunStatusThread(this);
+    checkIntpRunStatusThread.start();
+
+    super.start(userName);
+  }
+
+  @Override
+  public void processStarted(int port, String host) {
+    // In cluster mode, interpreter processes are discovered through metadata registration.
+    this.intpHost = host;
+    this.intpPort = port;
+    super.processStarted(port, host);
+  }
+
+  @Override
+  public String getHost() {
+    return intpHost;
+  }
+
+  @Override
+  public int getPort() {
+    return intpPort;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
+  }
+
+  @Override
+  public String getErrorMessage() {
+    return null;
+  }
+
+  // Waits for the interpreter process to register its metadata in the cluster,
+  // then marks the interpreter process as started.
+  private class CheckIntpRunStatusThread extends Thread {
+    private ClusterInterpreterProcess intpProcess;
+
+    CheckIntpRunStatusThread(ClusterInterpreterProcess intpProcess) {
+      this.intpProcess = intpProcess;
+    }
+
+    @Override
+    public void run() {
+      LOGGER.info("checkIntpRunStatusThread run() >>>");
+
+      String intpGroupId = getInterpreterGroupId();
+
+      HashMap<String, Object> intpMeta = clusterServer
+          .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+      int connectTimeout = intpProcess.getConnectTimeout();
+      int retryGetMeta = connectTimeout / ClusterInterpreterLauncher.CHECK_META_INTERVAL;
+      while ((retryGetMeta-- > 0)
+          && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+          || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
+        try {
+          Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
+          intpMeta = clusterServer
+              .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+          LOGGER.info("retry {} times to get {} meta!", retryGetMeta, intpGroupId);
+        } catch (InterruptedException e) {
+          LOGGER.error(e.getMessage(), e);
+        }
+
+        if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
+            && intpMeta.containsKey(INTP_TSERVER_PORT)) {
+          String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+          int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+          LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
+
+          intpProcess.processStarted(intpPort, intpHost);
+        }
+      }
+
+      if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+          || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
+        LOGGER.error("Can not found interpreter meta!");
+      }
+
+      LOGGER.info("checkIntpRunStatusThread run() <<<");
+    }
+  }
+}
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
new file mode 100644
index 0000000..da4bf5e
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterInterpreterLauncherTest extends ClusterMockTest {
+
+  @BeforeClass
+  public static void startTest() throws IOException, InterruptedException {
+    ClusterMockTest.startCluster();
+  }
+
+  @AfterClass
+  public static void stopTest() throws IOException, InterruptedException {
+    ClusterMockTest.stopCluster();
+  }
+
+  @Before
+  public void setUp() {
+    for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
+      System.clearProperty(confVar.getVarName());
+    }
+  }
+
+  @Test
+  public void testConnectExistIntpProcess() throws IOException {
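+    // Pre-register interpreter process metadata for "intpGroupId" so the launcher takes
+    // the "connect to an existing process" path instead of creating a new process.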
+    mockIntpProcessMeta("intpGroupId");
+
+    ClusterInterpreterLauncher launcher
+        = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
+    Properties properties = new Properties();
+    properties.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null,
+        "user1", "intpGroupId", "groupId",
+        "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+
+    assertTrue(client instanceof RemoteInterpreterRunningProcess);
+    RemoteInterpreterRunningProcess interpreterProcess = (RemoteInterpreterRunningProcess) client;
+    assertEquals("INTP_TSERVER_HOST", interpreterProcess.getHost());
+    assertEquals(0, interpreterProcess.getPort());
+    assertEquals("name", interpreterProcess.getInterpreterSettingName());
+    assertEquals(5000, interpreterProcess.getConnectTimeout());
+  }
+
+  @Test
+  public void testCreateIntpProcess() throws IOException {
+    ClusterInterpreterLauncher launcher
+        = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
+    Properties properties = new Properties();
+    properties.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null,
+        "user1", "intpGroupId", "groupId",
+        "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+
+    assertTrue(client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+    assertEquals("name", interpreterProcess.getInterpreterSettingName());
+    assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
+    assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
+    assertEquals(5000, interpreterProcess.getConnectTimeout());
+    assertEquals(zconf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 1);
+    assertEquals(true, interpreterProcess.isUserImpersonated());
+  }
+}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
similarity index 61%
rename from zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
rename to zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index ef4d7fd..72b51b4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -1,4 +1,3 @@
-package org.apache.zeppelin.cluster;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,13 +14,14 @@ package org.apache.zeppelin.cluster;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.zeppelin.interpreter.launcher;
 
+import org.apache.zeppelin.cluster.ClusterManagerClient;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
 import org.apache.zeppelin.cluster.meta.ClusterMetaType;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,21 +31,20 @@ import java.util.HashMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-public class ClusterManagerTest {
-  private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerTest.class);
+public class ClusterMockTest {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterMockTest.class);
 
-  private static ClusterManagerServer clusterManagerServer = null;
-  private static ClusterManagerClient clusterManagerClient = null;
+  private static ClusterManagerServer clusterServer = null;
+  private static ClusterManagerClient clusterClient = null;
 
-  private static ZeppelinConfiguration zconf = null;
+  protected static ZeppelinConfiguration zconf = null;
 
   static String zServerHost;
   static int zServerPort;
-  static final String metaKey = "ClusterManagerTestKey";
+  static final String metaKey = "ClusterMockKey";
 
-  @BeforeClass
-  public static void initClusterEnv() throws IOException, InterruptedException {
-    LOGGER.info("initClusterEnv >>>");
+  public static void startCluster() throws IOException, InterruptedException {
+    LOGGER.info("startCluster >>>");
 
     zconf = ZeppelinConfiguration.create();
 
@@ -55,20 +54,20 @@ public class ClusterManagerTest {
     zconf.setClusterAddress(zServerHost + ":" + zServerPort);
 
     // mock cluster manager server
-    clusterManagerServer = ClusterManagerServer.getInstance();
-    clusterManagerServer.start(null);
+    clusterServer = ClusterManagerServer.getInstance();
+    clusterServer.start();
 
     // mock cluster manager client
-    clusterManagerClient = ClusterManagerClient.getInstance();
-    clusterManagerClient.start(metaKey);
+    clusterClient = ClusterManagerClient.getInstance();
+    clusterClient.start(metaKey);
 
     // Waiting for cluster startup
     int wait = 0;
-    while(wait++ < 100) {
-      if (clusterManagerServer.isClusterLeader()
-          && clusterManagerServer.raftInitialized()
-          && clusterManagerClient.raftInitialized()) {
-        LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
+    while (wait++ < 100) {
+      if (clusterServer.isClusterLeader()
+          && clusterServer.raftInitialized()
+          && clusterClient.raftInitialized()) {
+        LOGGER.info("wait {}(ms) found cluster leader", wait * 3000);
         break;
       }
       try {
@@ -77,16 +76,27 @@ public class ClusterManagerTest {
         e.printStackTrace();
       }
     }
-    assertEquals(true, clusterManagerServer.isClusterLeader());
-    LOGGER.info("initClusterEnv <<<");
+    assertEquals(true, clusterServer.isClusterLeader());
+
+    LOGGER.info("startCluster <<<");
+  }
+
+  public static void stopCluster() {
+    LOGGER.info("stopCluster >>>");
+    if (null != clusterClient) {
+      clusterClient.shutdown();
+    }
+    if (null != clusterServer) {
+      clusterServer.shutdown();
+    }
+    LOGGER.info("stopCluster <<<");
   }
 
-  @Test
   public void getServerMeta() {
     LOGGER.info("serverMeta >>>");
 
     // Get metadata for all services
-    Object meta = clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, "");
+    Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
 
     LOGGER.info(meta.toString());
 
@@ -99,37 +109,34 @@ public class ClusterManagerTest {
     assertEquals(true, (values instanceof HashMap));
     HashMap mapMetaValues = (HashMap) values;
 
-    assertEquals(true, mapMetaValues.size()>0);
+    assertEquals(true, mapMetaValues.size() > 0);
 
     LOGGER.info("serverMeta <<<");
   }
 
-  @Test
-  public void putIntpProcessMeta() {
+  public void mockIntpProcessMeta(String metaKey) {
     // mock IntpProcess Meta
     HashMap<String, Object> meta = new HashMap<>();
-    meta.put(ClusterMeta.SERVER_HOST, zServerHost);
-    meta.put(ClusterMeta.SERVER_PORT, zServerPort);
-    meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST");
-    meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT");
+    meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
+    meta.put(ClusterMeta.SERVER_PORT, 0);
     meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
-    meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT");
+    meta.put(ClusterMeta.INTP_TSERVER_PORT, 0);
     meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
     meta.put(ClusterMeta.CPU_USED, "CPU_USED");
     meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
     meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
 
     // put IntpProcess Meta
-    clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey, meta);
+    clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
 
     // get IntpProcess Meta
     HashMap<String, HashMap<String, Object>> check
-        = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey);
+        = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
 
     LOGGER.info(check.toString());
 
     assertNotNull(check);
     assertNotNull(check.get(metaKey));
-    assertEquals(true, check.get(metaKey).size()>0);
+    assertEquals(true, check.get(metaKey).size() == 8);
   }
 }
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index 6c7aea9..727acd7 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -47,6 +47,7 @@
         <module>launcher/standard</module>
         <module>launcher/k8s-standard</module>
         <module>launcher/spark</module>
+        <module>launcher/cluster</module>
     </modules>
 
     <dependencies>
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index cf9074f..1e180d8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -31,7 +31,6 @@ import javax.servlet.ServletContextListener;
 import org.apache.commons.lang.StringUtils;
 import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.ShiroFilter;
-import org.apache.zeppelin.cluster.ClusterManager;
 import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@@ -43,8 +42,8 @@ import org.apache.zeppelin.helium.HeliumBundleFactory;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
 import org.apache.zeppelin.notebook.NoteEventListener;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.AuthorizationService;
@@ -53,6 +52,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
 import org.apache.zeppelin.notebook.scheduler.NoSchedulerService;
 import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
 import org.apache.zeppelin.notebook.scheduler.SchedulerService;
+import org.apache.zeppelin.plugin.PluginManager;
 import org.apache.zeppelin.rest.exception.WebApplicationExceptionMapper;
 import org.apache.zeppelin.search.LuceneSearch;
 import org.apache.zeppelin.search.SearchService;
@@ -96,10 +96,10 @@ public class ZeppelinServer extends ResourceConfig {
   public static Server jettyWebServer;
   public static ServiceLocator sharedServiceLocator;
 
+  private static ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+
   @Inject
   public ZeppelinServer() {
-    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-
     InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
 
     packages("org.apache.zeppelin.rest");
@@ -162,10 +162,6 @@ public class ZeppelinServer extends ResourceConfig {
                 .to(NoteEventListener.class)
                 .to(WebSocketServlet.class)
                 .in(Singleton.class);
-            bindAsContract(ClusterManagerServer.class)
-                .to(ClusterManager.class)
-                .to(ClusterManagerService.Iface.class)
-                .in(Singleton.class);
             if (conf.isZeppelinNotebookCronEnable()) {
               bind(QuartzSchedulerService.class).to(SchedulerService.class).in(Singleton.class);
             } else {
@@ -356,9 +352,9 @@ public class ZeppelinServer extends ResourceConfig {
   }
 
   private static void setupClusterManagerServer(ServiceLocator serviceLocator) {
-    InterpreterFactory interpreterFactory
-        = sharedServiceLocator.getService(InterpreterFactory.class);
-    sharedServiceLocator.getService(ClusterManagerServer.class).start(interpreterFactory);
+    if (conf.isClusterMode()) {
+      ClusterManagerServer.getInstance().start();
+    }
   }
 
   private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 8eebce4..2fbef23 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -670,6 +670,8 @@ public class InterpreterSetting {
   public String getLauncherPlugin() {
     if (isRunningOnKubernetes()) {
       return "K8sStandardInterpreterLauncher";
+    } else if (isRunningOnCluster()) {
+      return "ClusterInterpreterLauncher";
     } else {
       if (group.equals("spark")) {
         return "SparkInterpreterLauncher";
@@ -683,6 +685,10 @@ public class InterpreterSetting {
     return conf.getRunMode() == ZeppelinConfiguration.RUN_MODE.K8S;
   }
 
+  private boolean isRunningOnCluster() {
+    return conf.isClusterMode();
+  }
+
   public boolean isUserAuthorized(List<String> userAndRoles) {
     if (!option.permissionIsSet()) {
       return true;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 02e8fd0..6d91f1d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -173,6 +173,10 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
     return interpreterSettingName;
   }
 
+  public String getInterpreterGroupId() {
+    return interpreterGroupId;
+  }
+
   @VisibleForTesting
   public String getInterpreterRunner() {
     return interpreterRunner;