You are viewing a plain text version of this content. The canonical link for it is here.
Posted to submarine-dev@hadoop.apache.org by li...@apache.org on 2019/10/18 16:26:21 UTC

[hadoop-submarine] branch master updated: SUBMARINE-246. Submarine cluster module

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new d9118a4  SUBMARINE-246. Submarine cluster module
d9118a4 is described below

commit d9118a4f25a62eeaa8a2bc195a81530317d002f6
Author: Xun Liu <li...@apache.org>
AuthorDate: Wed Oct 16 23:28:09 2019 +0800

    SUBMARINE-246. Submarine cluster module
    
    ### What is this PR for?
    The Submarine system contains two server services, Submarine Server and Workbench Server, both of which run as long-running daemons.
    
    Among them, Submarine Server mainly provides job submission, job scheduling, job status monitoring, and model online service for Submarine.
    
    Workbench Server mainly serves the Submarine Workbench web UI, providing algorithm users with algorithm development, Python/Spark interpreter operation and other services through Notebook.
    
    The goal of the Submarine project is to provide high availability and high reliability services for big data processing, algorithm development, job scheduling, model online services, model batch and incremental updates. In addition to the high availability of big data and machine learning frameworks, the high availability of Submarine Server and Workbench Server itself is a key consideration.
    
    Design Doc: https://docs.google.com/document/d/1Ax6FQ5CAP-jowm2_Mp2r5kc9r7s1bkFLRfzvvbm5Wzc/edit#
    
    ### What type of PR is it?
    [Feature]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/SUBMARINE-246
    
    ### How should this be tested?
    * ClusterMultiNodeTest.java
    * [CI Pass](https://travis-ci.org/liuxunorg/hadoop-submarine/builds/599231112)
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update?  No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Xun Liu <li...@apache.org>
    
    Closes #53 from liuxunorg/SUBMARINE-246 and squashes the following commits:
    
    f4be77a [Xun Liu] SUBMARINE-246. Submarine cluster module
---
 pom.xml                                            |  14 +-
 .../commons-cluster}/pom.xml                       | 195 ++++----
 .../commons/cluster/BroadcastServiceAdapter.java   |  44 ++
 .../submarine/commons/cluster/ClusterManager.java  | 530 +++++++++++++++++++++
 .../commons/cluster/ClusterManagerClient.java      |  84 ++++
 .../commons/cluster/ClusterManagerServer.java      | 304 ++++++++++++
 .../submarine/commons/cluster/ClusterMonitor.java  | 263 ++++++++++
 .../commons/cluster/ClusterPrimitiveType.java      |  57 +++
 .../commons/cluster/ClusterStateMachine.java       | 179 +++++++
 .../commons/cluster/meta/ClusterMeta.java          | 143 ++++++
 .../commons/cluster/meta/ClusterMetaEntity.java    |  57 +++
 .../commons/cluster/meta/ClusterMetaOperation.java |  26 +
 .../commons/cluster/meta/ClusterMetaType.java      |  25 +
 .../cluster/protocol/LocalRaftClientProtocol.java  | 163 +++++++
 .../cluster/protocol/LocalRaftProtocol.java        |  58 +++
 .../cluster/protocol/LocalRaftProtocolFactory.java |  57 +++
 .../cluster/protocol/LocalRaftServerProtocol.java  | 527 ++++++++++++++++++++
 .../protocol/RaftClientMessagingProtocol.java      | 123 +++++
 .../cluster/protocol/RaftMessagingProtocol.java    |  83 ++++
 .../protocol/RaftServerMessagingProtocol.java      | 346 ++++++++++++++
 .../commons/cluster/ClusterMultiNodeTest.java      | 186 ++++++++
 .../src/test/resources/log4j.properties            |  24 +
 submarine-commons/commons-runtime/pom.xml          |   2 +-
 submarine-commons/commons-utils/pom.xml            |  22 +
 .../apache/submarine/commons/utils/NetUtils.java   | 156 ++++++
 .../commons/utils/SubmarineConfiguration.java      |  32 +-
 submarine-commons/pom.xml                          |   2 +-
 submarine-dist/src/assembly/distribution.xml       |   7 +
 .../runtimes/yarnservice/AbstractComponent.java    |   4 +-
 .../runtimes/yarnservice/AbstractServiceSpec.java  |   8 +-
 .../runtimes/yarnservice/FileSystemOperations.java |   2 +-
 .../yarnservice/HadoopEnvironmentSetup.java        |   4 +-
 .../yarnservice/YarnServiceJobSubmitter.java       |   2 +-
 .../yarnservice/pytorch/PyTorchServiceSpec.java    |   2 +-
 .../tensorflow/TensorFlowServiceSpec.java          |   2 +-
 .../tensorflow/component/TensorBoardComponent.java |   4 +-
 .../component/TensorFlowPsComponent.java           |   4 +-
 .../apache/submarine/utils/ClassPathUtilities.java |   2 +-
 .../apache/submarine/utils/DockerUtilities.java    |   2 +-
 .../submarine/utils/EnvironmentUtilities.java      |   2 +-
 .../submarine/utils/KerberosPrincipalFactory.java  |   2 +-
 .../java/org/apache/submarine/utils/Localizer.java |   4 +-
 .../submarine/utils/SubmarineResourceUtils.java    |   2 +-
 .../org/apache/submarine/utils/ZipUtilities.java   |   2 +-
 .../org/apache/submarine/utils/package-info.java   |   2 +-
 .../cli/yarnservice/TestYarnServiceRunJobCli.java  |   2 +-
 .../pytorch/TestPyTorchServiceSpec.java            |   2 +-
 .../submarine/utils/TestClassPathUtilities.java    |   2 +-
 .../submarine/utils/TestEnvironmentUtilities.java  |   2 +-
 .../utils/TestKerberosPrincipalFactory.java        |   2 +-
 .../utils/TestSubmarineResourceUtils.java          |   2 +-
 .../interpreter/python-interpreter/pom.xml         |   8 +
 52 files changed, 3630 insertions(+), 149 deletions(-)

diff --git a/pom.xml b/pom.xml
index b7ca3ac..7a1c36f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,21 +79,21 @@
     <yarn.version>v1.10.1</yarn.version>
 
     <hadoop.common.build.dir>${basedir}/../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
-    <slf4j.version>1.7.25</slf4j.version>
+    <slf4j.version>1.7.7</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <commons.logging.version>1.1.3</commons.logging.version>
     <commons.cli.version>1.2</commons.cli.version>
     <snakeyaml.version>1.16</snakeyaml.version>
-    <commons-lang.version>2.6</commons-lang.version>
-    <commons.lang3.version>3.4</commons.lang3.version>
     <httpcore.version>4.4.4</httpcore.version>
     <httpclient.version>4.5.2</httpclient.version>
-    <commons.io.version>2.4</commons.io.version>
+    <commons-lang.version>2.5</commons-lang.version>
+    <commons-lang3.version>3.4</commons-lang3.version>
+    <commons.io.version>2.5</commons.io.version>
     <junit.version>4.12</junit.version>
     <jsr305.version>3.0.0</jsr305.version>
     <mockito.version>2.23.4</mockito.version>
     <powermock.version>1.6.4</powermock.version>
-    <guava.version>11.0.2</guava.version>
+    <guava.version>22.0</guava.version>
     <testng.version>6.4</testng.version>
     <avro.version>1.8.2</avro.version>
     <httpclient.version>4.5.2</httpclient.version>
@@ -109,6 +109,8 @@
     <mybatis-generator.version>1.3.7</mybatis-generator.version>
     <derby.version>10.15.1.3</derby.version>
     <zeppelin.version>0.9.0-SNAPSHOT</zeppelin.version>
+    <atomix.version>3.0.0-rc4</atomix.version>
+    <libthrift.version>0.12.0</libthrift.version>
   </properties>
 
   <modules>
@@ -166,7 +168,7 @@
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
-        <version>3.7</version>
+        <version>${commons-lang3.version}</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-commons/commons-cluster/pom.xml
similarity index 57%
copy from submarine-workbench/interpreter/python-interpreter/pom.xml
copy to submarine-commons/commons-cluster/pom.xml
index 39218d0..cbfbc29 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-commons/commons-cluster/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
@@ -16,144 +16,86 @@
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.
--->
+  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-         https://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <groupId>org.apache.submarine</groupId>
-    <artifactId>interpreter</artifactId>
+    <artifactId>submarine-commons</artifactId>
     <version>0.3.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
   </parent>
-
-  <artifactId>python-interpreter</artifactId>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>commons-cluster</artifactId>
   <version>0.3.0-SNAPSHOT</version>
-  <name>Submarine: Interpreter Pythone</name>
-  <description>Submarine Pythone Interpreter</description>
+  <name>Submarine: Commons Cluster</name>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.submarine</groupId>
-      <artifactId>interpreter-engine</artifactId>
+      <artifactId>commons-utils</artifactId>
       <version>0.3.0-SNAPSHOT</version>
     </dependency>
+
     <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>zeppelin-python</artifactId>
-      <version>${zeppelin.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.code.findbugs</groupId>
-          <artifactId>jsr305</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-lang</groupId>
-          <artifactId>commons-lang</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.code.gson</groupId>
-          <artifactId>gson</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpcore</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix</artifactId>
+      <version>${atomix.version}</version>
     </dependency>
+
     <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>zeppelin-interpreter</artifactId>
-      <version>${zeppelin.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>io.atomix</groupId>
-          <artifactId>atomix</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.atomix</groupId>
-          <artifactId>atomix-raft</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.atomix</groupId>
-          <artifactId>atomix-primary-backup</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-lang</groupId>
-          <artifactId>commons-lang</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpcore</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.code.gson</groupId>
-          <artifactId>gson</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-codec</groupId>
-          <artifactId>commons-codec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix-raft</artifactId>
+      <version>${atomix.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-      <version>${jsr305.version}</version>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix-primary-backup</artifactId>
+      <version>${atomix.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
       <version>${commons-lang.version}</version>
     </dependency>
+
     <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpcore</artifactId>
-      <version>${httpcore.version}</version>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <version>${commons-configuration.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
     </dependency>
+
     <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpclient</artifactId>
-      <version>${httpclient.version}</version>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
     </dependency>
+
     <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
       <version>${gson.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
 
     <!-- Test libraries -->
     <dependency>
@@ -162,13 +104,13 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
-
   </dependencies>
 
   <build>
@@ -203,8 +145,45 @@
               </transformers>
               <relocations>
                 <relocation>
-                  <pattern>org.apache.zeppelin</pattern>
-                  <shadedPattern>${shaded.dependency.prefix}.org.apache.zeppelin</shadedPattern>
+                  <pattern>org</pattern>
+                  <shadedPattern>${shaded.dependency.prefix}.org</shadedPattern>
+                  <excludes>
+                    <exclude>org/apache/submarine/*</exclude>
+                    <exclude>org/apache/submarine/**/*</exclude>
+                    <exclude>org/apache/thrift/*</exclude>
+                    <exclude>org/apache/thrift/**/*</exclude>
+                    <exclude>org/slf4j/*</exclude>
+                    <exclude>org/slf4j/**/*</exclude>
+                    <exclude>org/apache/commons/logging/*</exclude>
+                    <exclude>org/apache/commons/logging/**/*</exclude>
+                    <exclude>org/apache/commons/exec/*</exclude>
+                    <exclude>org/apache/commons/exec/**/*</exclude>
+                    <exclude>org/apache/log4j/*</exclude>
+                    <exclude>org/apache/log4j/**/*</exclude>
+                    <exclude>org/sonatype/*</exclude>
+                    <exclude>org/sonatype/**/*</exclude>
+                    <exclude>**/pom.xml</exclude>
+
+                    <!-- Not the org/ packages that are a part of the jdk -->
+                    <exclude>org/ietf/jgss/*</exclude>
+                    <exclude>org/omg/**/*</exclude>
+                    <exclude>org/w3c/dom/*</exclude>
+                    <exclude>org/w3c/dom/**/*</exclude>
+                    <exclude>org/xml/sax/*</exclude>
+                    <exclude>org/xml/sax/**/*</exclude>
+                  </excludes>
+                </relocation>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>${shaded.dependency.prefix}.com.google</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io</pattern>
+                  <shadedPattern>${shaded.dependency.prefix}.io</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware</pattern>
+                  <shadedPattern>${shaded.dependency.prefix}.com.esotericsoftware</shadedPattern>
                 </relocation>
               </relocations>
             </configuration>
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/BroadcastServiceAdapter.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/BroadcastServiceAdapter.java
new file mode 100644
index 0000000..1d95cff
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/BroadcastServiceAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import io.atomix.cluster.messaging.BroadcastService;
+
+import java.util.function.Consumer;
+
+/**
+ * Broadcast Service Adapter.
+ *
+ * <p>{@link BroadcastService} is the service for broadcast messaging between nodes:
+ * an unreliable broadcast messaging service backed by multicast that provides
+ * no guarantees regarding reliability or order of messages.
+ *
+ * <p>Every method of this adapter has an empty body: broadcast messages are
+ * discarded and listeners are neither registered nor invoked.
+ */
+public class BroadcastServiceAdapter implements BroadcastService {
+  /** Does nothing; the message is discarded. */
+  @Override
+  public void broadcast(String subject, byte[] message) {
+
+  }
+
+  /** Does nothing; the listener is not stored and will never be called. */
+  @Override
+  public void addListener(String subject, Consumer<byte[]> listener) {
+
+  }
+
+  /** Does nothing; there are no registered listeners to remove. */
+  @Override
+  public void removeListener(String subject, Consumer<byte[]> listener) {
+
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
new file mode 100644
index 0000000..5745751
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.Node;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.primitive.operation.OperationType;
+import io.atomix.primitive.operation.PrimitiveOperation;
+import io.atomix.primitive.operation.impl.DefaultOperationId;
+import io.atomix.primitive.partition.PartitionId;
+import io.atomix.primitive.service.ServiceConfig;
+import io.atomix.primitive.session.SessionClient;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.RaftClient;
+import io.atomix.protocols.raft.RaftError;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.RaftMember;
+import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.RaftResponse;
+import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
+import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
+import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
+import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
+import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
+import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.session.CommunicationStrategy;
+import io.atomix.protocols.raft.storage.system.Configuration;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Namespace;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaEntity;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaOperation;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
+import org.apache.submarine.commons.cluster.protocol.LocalRaftProtocolFactory;
+import org.apache.submarine.commons.cluster.protocol.RaftClientMessagingProtocol;
+import org.apache.submarine.commons.utils.NetUtils;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.time.Instant;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static io.atomix.primitive.operation.PrimitiveOperation.operation;
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaOperation.GET_OPERATION;
+
+/**
+ * The base class for cluster management, including the following implementations
+ * 1. RaftClient as the raft client
+ * 2. Threading to provide retry after cluster metadata submission failure
+ * 3. Cluster monitoring
+ */
+public abstract class ClusterManager {
+  private static Logger LOG = LoggerFactory.getLogger(ClusterManager.class);
+
+  public final SubmarineConfiguration sconf = SubmarineConfiguration.create();
+
+  protected Collection<Node> clusterNodes = new ArrayList<>();
+
+  protected int raftServerPort = 0;
+
+  protected RaftClient raftClient = null;
+  protected SessionClient raftSessionClient = null;
+  protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>();
+  protected LocalRaftProtocolFactory protocolFactory
+      = new LocalRaftProtocolFactory(protocolSerializer);
+  protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>();
+
+  protected AtomicBoolean running = new AtomicBoolean(true);
+
+  // Write data through the queue to prevent failure due to network exceptions
+  private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue
+      = new ConcurrentLinkedQueue<>();
+
+  // submarine server host & port
+  protected String zeplServerHost = "";
+
+  protected ClusterMonitor clusterMonitor = null;
+
+  protected boolean isTest = false;
+
+  /**
+   * Resolves the local host address and parses the configured cluster address list.
+   *
+   * <p>The list is a comma-separated sequence of {@code host:port} entries; whitespace
+   * around an entry is ignored. Every entry becomes a cluster {@link Node}, an entry in
+   * {@code raftAddressMap} and a member id in {@code clusterMemberIds}. The port of the
+   * entry whose host equals the local host address (ignoring case) is recorded as
+   * {@code raftServerPort}.
+   *
+   * <p>An {@link UnknownHostException} or {@link SocketException} raised while doing so
+   * is logged rather than rethrown.
+   *
+   * @throws IllegalArgumentException if an entry has no {@code :port} part
+   * @throws NumberFormatException if the port of an entry is not an integer
+   */
+  protected ClusterManager() {
+    try {
+      zeplServerHost = NetUtils.findAvailableHostAddress();
+      String clusterAddr = sconf.getClusterAddress();
+      LOG.info(this.getClass().toString() + "::clusterAddr = {}", clusterAddr);
+      if (!StringUtils.isEmpty(clusterAddr)) {
+        for (String clusterEntry : clusterAddr.split(",")) {
+          // Tolerate whitespace around entries such as "host1:6000, host2:6000"
+          String[] parts = clusterEntry.trim().split(":");
+          if (parts.length < 2) {
+            throw new IllegalArgumentException(
+                "Invalid cluster address entry '" + clusterEntry + "', expected host:port");
+          }
+          String clusterHost = parts[0];
+          int clusterPort = Integer.parseInt(parts[1]);
+          if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
+            raftServerPort = clusterPort;
+          }
+
+          String memberId = clusterHost + ":" + clusterPort;
+          Address address = Address.from(clusterHost, clusterPort);
+          Node node = Node.builder().withId(memberId).withAddress(address).build();
+          clusterNodes.add(node);
+          MemberId raftMemberId = MemberId.from(memberId);
+          raftAddressMap.put(raftMemberId, address);
+          clusterMemberIds.add(raftMemberId);
+        }
+      }
+    } catch (UnknownHostException | SocketException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  // Check if the raft environment is initialized
+  public abstract boolean raftInitialized();
+  // Is it a cluster leader
+  public abstract boolean isClusterLeader();
+
+  /**
+   * Returns the {@code running} flag itself (not a copy), so changes made through the
+   * returned object are visible to this manager.
+   *
+   * @return the flag object held in the {@code running} field
+   */
+  public AtomicBoolean getRunning() {
+    return running;
+  }
+
+  /**
+   * Builds and connects a session on the given raft client for the cluster primitive
+   * ({@code ClusterPrimitiveType.PRIMITIVE_NAME} / {@code ClusterPrimitiveType.INSTANCE}).
+   *
+   * <p>The session is configured with {@code ReadConsistency.SEQUENTIAL} and
+   * {@code CommunicationStrategy.LEADER}; the method waits on the connect result
+   * via {@code join()} before returning.
+   *
+   * @param client the raft client used to build the session
+   * @return the connected session client
+   */
+  private SessionClient createProxy(RaftClient client) {
+    return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME,
+        ClusterPrimitiveType.INSTANCE, new ServiceConfig())
+        .withReadConsistency(ReadConsistency.SEQUENTIAL)
+        .withCommunicationStrategy(CommunicationStrategy.LEADER)
+        .build()
+        .connect()
+        .join();
+  }
+
+  public void start() {
+    if (!sconf.isClusterMode()) {
+      return;
+    }
+
+    LOG.info(this.getClass().toString() + "::ClusterManager::start()");
+
+    // RaftClient Thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOG.info(this.getClass().toString() + "::RaftClientThread run() >>>");
+
+        int raftClientPort = 0;
+        try {
+          raftClientPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
+        } catch (IOException e) {
+          LOG.error(e.getMessage());
+        }
+
+        MemberId memberId = MemberId.from(zeplServerHost + ":" + raftClientPort);
+        Address address = Address.from(zeplServerHost, raftClientPort);
+        raftAddressMap.put(memberId, address);
+
+        MessagingService messagingManager
+            = NettyMessagingService.builder().withAddress(address).build().start().join();
+        RaftClientProtocol protocol = new RaftClientMessagingProtocol(
+            messagingManager, protocolSerializer, raftAddressMap::get);
+
+        raftClient = RaftClient.builder()
+            .withMemberId(memberId)
+            .withPartitionId(PartitionId.from("partition", 1))
+            .withProtocol(protocol)
+            .build();
+
+        raftClient.connect(clusterMemberIds).join();
+
+        raftSessionClient = createProxy(raftClient);
+
+        LOG.info(this.getClass().toString() + "::RaftClientThread run() <<<");
+      }
+    }).start();
+
+    // Cluster Meta Consume Thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (getRunning().get()) {
+            ClusterMetaEntity metaEntity = clusterMetaQueue.peek();
+            if (null != metaEntity) {
+              // Determine whether the client is connected
+              int retry = 0;
+              while (!raftInitialized()) {
+                retry++;
+                if (0 == retry % 30) {
+                  LOG.warn(this.getClass().toString()
+                      + "::Raft incomplete initialization! retry[{}]", retry);
+                }
+                Thread.sleep(100);
+              }
+              boolean success = false;
+              switch (metaEntity.getOperation()) {
+                case DELETE_OPERATION:
+                  success = deleteClusterMeta(metaEntity);
+                  break;
+                case PUT_OPERATION:
+                  success = putClusterMeta(metaEntity);
+                  break;
+              }
+              if (true == success) {
+                // The operation was successfully deleted
+                clusterMetaQueue.remove(metaEntity);
+                LOG.info(this.getClass().toString()
+                    + "::Cluster Meta Consume success! {}", metaEntity);
+              } else {
+                LOG.error(this.getClass().toString()
+                    + "::Cluster Meta Consume faild!");
+              }
+            } else {
+              Thread.sleep(100);
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.error(e.getMessage());
+        }
+      }
+    }).start();
+  }
+
+  /**
+   * Shuts down the cluster client side of this manager.
+   *
+   * Does nothing when cluster mode is disabled. Otherwise clears the running
+   * flag and closes the raft session client and the raft client, waiting at
+   * most 3 seconds for each close to complete.
+   */
+  public void shutdown() {
+    if (!sconf.isClusterMode()) {
+      return;
+    }
+
+    running.set(false);
+
+    // Close each resource in its own try block so that a timeout or failure
+    // while closing the session does not skip closing the client.
+    try {
+      if (null != raftSessionClient) {
+        raftSessionClient.close().get(3, TimeUnit.SECONDS);
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the caller
+      Thread.currentThread().interrupt();
+      LOG.error(e.getMessage(), e);
+    } catch (ExecutionException | TimeoutException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    try {
+      if (null != raftClient) {
+        raftClient.close().get(3, TimeUnit.SECONDS);
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the caller
+      Thread.currentThread().interrupt();
+      LOG.error(e.getMessage(), e);
+    } catch (ExecutionException | TimeoutException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Returns the name used to identify this node in the cluster.
+   *
+   * @return "host:port" when running under test, otherwise the local host
+   *         name, or an empty string if the local host cannot be resolved
+   */
+  public String getClusterNodeName() {
+    if (!isTest) {
+      try {
+        return InetAddress.getLocalHost().getHostName();
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return "";
+      }
+    }
+
+    // Test cases start three cluster servers at the same time on one machine,
+    // so the port is appended to avoid duplicate names
+    return this.zeplServerHost + ":" + this.raftServerPort;
+  }
+
+  /**
+   * Submits a PUT command for the given entity to the raft session client.
+   *
+   * The server host and port of this node are added to the entity's values
+   * before the entity is encoded. The command is executed asynchronously; a
+   * failure of the command is logged when the future completes.
+   *
+   * @param entity metadata entity to store
+   * @return false if raft is not initialized yet, true once the command has
+   *         been submitted (not necessarily committed)
+   */
+  private boolean putClusterMeta(ClusterMetaEntity entity) {
+    if (!raftInitialized()) {
+      LOG.error(this.getClass().toString() + "::Raft incomplete initialization!");
+      return false;
+    }
+
+    ClusterMetaType metaType = entity.getMetaType();
+    String metaKey = entity.getKey();
+    HashMap<String, Object> newMetaValue = entity.getValues();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(this.getClass().toString() + "::putClusterMeta {} {}", metaType, metaKey);
+    }
+
+    // add cluster name
+    newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+    newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
+
+    raftSessionClient.execute(operation(ClusterStateMachine.PUT,
+        clientSerializer.encode(entity)))
+        .<Long>thenApply(clientSerializer::decode)
+        .whenComplete((result, error) -> {
+          // Surface asynchronous failures instead of dropping them silently
+          if (null != error) {
+            LOG.error(this.getClass().toString() + "::putClusterMeta failed! "
+                + metaType + " " + metaKey, error);
+          }
+        });
+    return true;
+  }
+
+  /**
+   * Puts metadata into the cluster metadata.
+   *
+   * When the put cannot be submitted, the operation is cached in the
+   * cluster meta queue.
+   *
+   * @param type   metadata type
+   * @param key    metadata key
+   * @param values metadata values
+   */
+  public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
+    ClusterMetaEntity pending = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
+    if (putClusterMeta(pending)) {
+      return;
+    }
+
+    LOG.warn(this.getClass().toString() + "::putClusterMeta failure, Cache metadata to queue.");
+    clusterMetaQueue.add(pending);
+  }
+
+  /**
+   * Submits a REMOVE command for the given entity to the raft session client.
+   *
+   * The command is executed asynchronously; its result, or its failure, is
+   * logged when the future completes.
+   *
+   * @param entity metadata entity to delete
+   * @return false if raft is not initialized yet, true once the command has
+   *         been submitted (not necessarily committed)
+   */
+  private boolean deleteClusterMeta(ClusterMetaEntity entity) {
+    ClusterMetaType metaType = entity.getMetaType();
+    String metaKey = entity.getKey();
+
+    // Need to pay attention to delete metadata operations
+    LOG.info(this.getClass().toString() + "::deleteClusterMeta {} {}", metaType, metaKey);
+
+    if (!raftInitialized()) {
+      LOG.error(this.getClass().toString() + "::Raft incomplete initialization!");
+      return false;
+    }
+
+    raftSessionClient.execute(operation(
+        ClusterStateMachine.REMOVE,
+        clientSerializer.encode(entity)))
+        .<Long>thenApply(clientSerializer::decode)
+        .whenComplete((result, error) -> {
+          if (null != error) {
+            // Surface asynchronous failures instead of dropping them silently
+            LOG.error(this.getClass().toString() + "::deleteClusterMeta failed! "
+                + metaType + " " + metaKey, error);
+          } else {
+            LOG.info(this.getClass().toString() + "::deleteClusterMeta {}", result);
+          }
+        });
+
+    return true;
+  }
+
+  /**
+   * Deletes metadata from the cluster metadata.
+   *
+   * When the delete cannot be submitted, the operation is cached in the
+   * cluster meta queue.
+   *
+   * @param type metadata type
+   * @param key  metadata key
+   */
+  public void deleteClusterMeta(ClusterMetaType type, String key) {
+    ClusterMetaEntity pending = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
+    if (deleteClusterMeta(pending)) {
+      return;
+    }
+
+    LOG.warn(this.getClass().toString() + "::deleteClusterMeta faild, Cache data to queue.");
+    clusterMetaQueue.add(pending);
+  }
+
+  // get metadata by cluster metadata
+  public HashMap<String, HashMap<String, Object>> getClusterMeta(
+      ClusterMetaType metaType, String metaKey) {
+    HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
+    if (!raftInitialized()) {
+      LOG.error(this.getClass().toString() + "::Raft incomplete initialization!");
+      return clusterMeta;
+    }
+
+    ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, metaKey, null);
+
+    byte[] mateData = null;
+    try {
+      mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
+          clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      LOG.error(e.getMessage());
+    }
+
+    if (null != mateData) {
+      clusterMeta = clientSerializer.decode(mateData);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(this.getClass().toString() + "::getClusterMeta >>> {}", clusterMeta.toString());
+    }
+
+    return clusterMeta;
+  }
+
+  // Serializer handed to the raft client and raft server messaging protocols.
+  // It registers the raft request/response classes, the raft log entry classes
+  // and the supporting JDK and cluster classes.
+  // NOTE(review): registration order presumably determines the type ids used
+  // on the wire - confirm against the Atomix Namespace documentation before
+  // reordering entries or removing the duplicated ones flagged below.
+  protected static final Serializer protocolSerializer = Serializer.using(Namespace.builder()
+      .register(OpenSessionRequest.class)
+      .register(OpenSessionResponse.class)
+      .register(CloseSessionRequest.class)
+      .register(CloseSessionResponse.class)
+      .register(KeepAliveRequest.class)
+      .register(KeepAliveResponse.class)
+      .register(QueryRequest.class)
+      .register(QueryResponse.class)
+      .register(CommandRequest.class)
+      .register(CommandResponse.class)
+      .register(MetadataRequest.class)
+      .register(MetadataResponse.class)
+      .register(JoinRequest.class)
+      .register(JoinResponse.class)
+      .register(LeaveRequest.class)
+      .register(LeaveResponse.class)
+      .register(ConfigureRequest.class)
+      .register(ConfigureResponse.class)
+      .register(ReconfigureRequest.class)
+      .register(ReconfigureResponse.class)
+      .register(InstallRequest.class)
+      .register(InstallResponse.class)
+      .register(PollRequest.class)
+      .register(PollResponse.class)
+      .register(VoteRequest.class)
+      .register(VoteResponse.class)
+      .register(AppendRequest.class)
+      .register(AppendResponse.class)
+      .register(PublishRequest.class)
+      .register(ResetRequest.class)
+      .register(RaftResponse.Status.class)
+      .register(RaftError.class)
+      .register(RaftError.Type.class)
+      .register(PrimitiveOperation.class)
+      .register(ReadConsistency.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      // NOTE(review): PrimitiveOperation is already registered above
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      // NOTE(review): ReadConsistency is already registered above
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(HashMap.class)
+      .register(ClusterMetaEntity.class)
+      .register(LocalDateTime.class)
+      // Registers the concrete class of the list returned by Collections.emptyList()
+      .register(Collections.emptyList().getClass())
+      .register(HashSet.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(SessionId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .build());
+
+  // Serializer for raft storage; it registers the raft log entry classes and
+  // the supporting JDK and cluster classes.
+  // NOTE(review): registration order presumably determines the type ids of
+  // persisted entries - confirm before reordering.
+  protected static final Serializer storageSerializer = Serializer.using(Namespace.builder()
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(ClusterMetaEntity.class)
+      .register(HashMap.class)
+      .register(HashSet.class)
+      .register(LocalDateTime.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .build());
+
+  // Serializer used by this class to encode the ClusterMetaEntity sent with
+  // PUT / REMOVE / GET operations and to decode the results that come back.
+  protected static final Serializer clientSerializer = Serializer.using(Namespace.builder()
+      .register(ReadConsistency.class)
+      .register(ClusterMetaEntity.class)
+      .register(ClusterMetaOperation.class)
+      .register(ClusterMetaType.class)
+      .register(HashMap.class)
+      .register(LocalDateTime.class)
+      // Registers the concrete entry class produced by Maps.immutableEntry
+      .register(Maps.immutableEntry(new String(), new Object()).getClass())
+      .build());
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
new file mode 100644
index 0000000..d5bc7a7
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import io.atomix.primitive.PrimitiveState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+/**
+ * Cluster management client class instantiated in submarine-interpreter.
+ *
+ * It only acts as a raft client: it is never the cluster leader, and it
+ * starts a ClusterMonitor of type INTP_PROCESS_META for the given meta key.
+ */
+public class ClusterManagerClient extends ClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterManagerClient.class);
+
+  private static ClusterManagerClient instance = null;
+
+  // Do not use the getInstance function in the test case,
+  // which will result in an inability to update the instance according to the configuration.
+  public static ClusterManagerClient getInstance() {
+    synchronized (ClusterManagerClient.class) {
+      if (instance == null) {
+        instance = new ClusterManagerClient();
+      }
+      return instance;
+    }
+  }
+
+  private ClusterManagerClient() {
+    super();
+  }
+
+  /**
+   * @return true when both the raft client and the raft session client exist
+   *         and the session is in the CONNECTED state
+   */
+  @Override
+  public boolean raftInitialized() {
+    return null != raftClient && null != raftSessionClient
+        && raftSessionClient.getState() == PrimitiveState.CONNECTED;
+  }
+
+  /**
+   * @return always false, a client never reports itself as cluster leader
+   */
+  @Override
+  public boolean isClusterLeader() {
+    return false;
+  }
+
+  /**
+   * Starts the cluster client and its monitor; does nothing when cluster
+   * mode is disabled.
+   *
+   * @param metaKey key of the monitored metadata; in the ClusterManagerClient
+   *                the metaKey equals the interpreterGroupId
+   */
+  public void start(String metaKey) {
+    LOG.info("ClusterManagerClient::start({})", metaKey);
+    if (!sconf.isClusterMode()) {
+      return;
+    }
+    super.start();
+
+    // Instantiated cluster monitoring class
+    clusterMonitor = new ClusterMonitor(this);
+    clusterMonitor.start(INTP_PROCESS_META, metaKey);
+  }
+
+  /**
+   * Stops the monitor (if one was started) and then the cluster client;
+   * does nothing when cluster mode is disabled.
+   */
+  @Override
+  public void shutdown() {
+    if (!sconf.isClusterMode()) {
+      return;
+    }
+    // The monitor only exists after start(metaKey); guard against a
+    // shutdown that is not preceded by a start.
+    if (null != clusterMonitor) {
+      clusterMonitor.shutdown();
+    }
+
+    super.shutdown();
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
new file mode 100644
index 0000000..fded6e1
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.atomix.cluster.BootstrapService;
+import io.atomix.cluster.ManagedClusterMembershipService;
+import io.atomix.cluster.Member;
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.MembershipConfig;
+import io.atomix.cluster.Node;
+import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
+import io.atomix.cluster.impl.DefaultClusterMembershipService;
+import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
+import io.atomix.cluster.messaging.BroadcastService;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.primitive.PrimitiveState;
+import io.atomix.protocols.raft.RaftServer;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.storage.StorageLevel;
+import io.atomix.utils.net.Address;
+import org.apache.commons.lang.StringUtils;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.cluster.protocol.RaftServerMessagingProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_META;
+
+/**
+ * Cluster management server class instantiated in submarine-server
+ * 1. Create a raft server
+ * 2. Remotely create interpreter's thrift service
+ */
+public class ClusterManagerServer extends ClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterManagerServer.class);
+
+  private static ClusterManagerServer instance = null;
+
+  // raft server
+  protected RaftServer raftServer = null;
+
+  // messaging service of the raft server, also used to send cluster events
+  protected MessagingService messagingService = null;
+
+  private ClusterManagerServer() {
+    super();
+  }
+
+  // Do not use the getInstance function in the test case,
+  // which will result in an inability to update the instance according to the configuration.
+  public static ClusterManagerServer getInstance() {
+    synchronized (ClusterManagerServer.class) {
+      if (instance == null) {
+        instance = new ClusterManagerServer();
+      }
+      return instance;
+    }
+  }
+
+  /**
+   * Starts the raft server thread, the SERVER_META cluster monitor and then
+   * the cluster client; does nothing when cluster mode is disabled.
+   */
+  @Override
+  public void start() {
+    if (!sconf.isClusterMode()) {
+      return;
+    }
+
+    initThread();
+
+    // Instantiated raftServer monitoring class
+    String clusterName = getClusterNodeName();
+    clusterMonitor = new ClusterMonitor(this);
+    clusterMonitor.start(SERVER_META, clusterName);
+
+    super.start();
+  }
+
+  /**
+   * Replaces the cluster topology with the given one for test cases.
+   *
+   * @param clusterAddrList comma separated list of "host:port" members
+   * @param host            host of this server
+   * @param port            raft port of this server
+   */
+  @VisibleForTesting
+  public void initTestCluster(String clusterAddrList, String host, int port) {
+    isTest = true;
+    this.zeplServerHost = host;
+    this.raftServerPort = port;
+
+    // clear
+    clusterNodes.clear();
+    raftAddressMap.clear();
+    clusterMemberIds.clear();
+
+    String[] cluster = clusterAddrList.split(",");
+    for (String clusterAddr : cluster) {
+      String[] parts = clusterAddr.split(":");
+      String clusterHost = parts[0];
+      int clusterPort = Integer.parseInt(parts[1]);
+
+      String memberId = clusterHost + ":" + clusterPort;
+      Address address = Address.from(clusterHost, clusterPort);
+      Node node = Node.builder().withId(memberId).withAddress(address).build();
+      clusterNodes.add(node);
+      raftAddressMap.put(MemberId.from(memberId), address);
+      clusterMemberIds.add(MemberId.from(memberId));
+    }
+  }
+
+  /**
+   * @return true when the raft server is running and the raft session client
+   *         is in the CONNECTED state
+   */
+  @Override
+  public boolean raftInitialized() {
+    return null != raftServer && raftServer.isRunning()
+        && null != raftClient && null != raftSessionClient
+        && raftSessionClient.getState() == PrimitiveState.CONNECTED;
+  }
+
+  /**
+   * @return true when the raft server exists, is running and is the leader
+   */
+  @Override
+  public boolean isClusterLeader() {
+    return null != raftServer
+        && raftServer.isRunning()
+        && raftServer.isLeader();
+  }
+
+  // Starts a thread that builds and bootstraps the raft server and then
+  // publishes the metadata of this server node
+  private void initThread() {
+    // RaftServer Thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("RaftServer run() >>>");
+
+        Address address = Address.from(zeplServerHost, raftServerPort);
+        Member member = Member.builder(MemberId.from(zeplServerHost + ":" + raftServerPort))
+            .withAddress(address)
+            .build();
+        messagingService = NettyMessagingService.builder()
+            .withAddress(address).build().start().join();
+        RaftServerProtocol protocol = new RaftServerMessagingProtocol(
+            messagingService, ClusterManager.protocolSerializer, raftAddressMap::get);
+
+        BootstrapService bootstrapService = new BootstrapService() {
+          @Override
+          public MessagingService getMessagingService() {
+            return messagingService;
+          }
+
+          @Override
+          public BroadcastService getBroadcastService() {
+            return new BroadcastServiceAdapter();
+          }
+        };
+
+        ManagedClusterMembershipService clusterService = new DefaultClusterMembershipService(
+            member,
+            new DefaultNodeDiscoveryService(bootstrapService, member,
+                new BootstrapDiscoveryProvider(clusterNodes)),
+            bootstrapService,
+            new MembershipConfig());
+
+        File atomixDataDir = com.google.common.io.Files.createTempDir();
+        atomixDataDir.deleteOnExit();
+
+        RaftServer.Builder builder = RaftServer.builder(member.id())
+            .withMembershipService(clusterService)
+            .withProtocol(protocol)
+            .withStorage(RaftStorage.builder()
+                .withStorageLevel(StorageLevel.MEMORY)
+                .withDirectory(atomixDataDir)
+                .withSerializer(storageSerializer)
+                .withMaxSegmentSize(1024 * 1024)
+                .build());
+
+        raftServer = builder.build();
+        raftServer.bootstrap(clusterMemberIds);
+
+        HashMap<String, Object> meta = new HashMap<String, Object>();
+        String nodeName = getClusterNodeName();
+        meta.put(ClusterMeta.NODE_NAME, nodeName);
+        meta.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+        meta.put(ClusterMeta.SERVER_PORT, raftServerPort);
+        meta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
+        putClusterMeta(SERVER_META, nodeName, meta);
+
+        LOG.info("RaftServer run() <<<");
+      }
+    }).start();
+  }
+
+  /**
+   * Deletes the metadata of this node, stops the monitor, shuts the raft
+   * server down (waiting at most 3 seconds) and finally shuts the cluster
+   * client down; does nothing when cluster mode is disabled.
+   */
+  @Override
+  public void shutdown() {
+    if (!sconf.isClusterMode()) {
+      return;
+    }
+
+    try {
+      // delete local machine meta
+      deleteClusterMeta(SERVER_META, getClusterNodeName());
+      Thread.sleep(300);
+      // The monitor only exists after start(); guard against a shutdown
+      // that is not preceded by a start.
+      if (null != clusterMonitor) {
+        clusterMonitor.shutdown();
+      }
+      // wait raft commit metadata
+      Thread.sleep(300);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the caller
+      Thread.currentThread().interrupt();
+      LOG.error(e.getMessage(), e);
+    }
+
+    if (null != raftServer && raftServer.isRunning()) {
+      try {
+        raftServer.shutdown().get(3, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for the caller
+        Thread.currentThread().interrupt();
+        LOG.error(e.getMessage(), e);
+      } catch (ExecutionException | TimeoutException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    super.shutdown();
+  }
+
+  /**
+   * Obtains the server node whose resources are idle in the cluster.
+   *
+   * Nodes without a status, with an offline status, or without numeric
+   * memory figures are skipped.
+   *
+   * @return the metadata of the node with the largest positive difference
+   *         between memory capacity and memory used, or null if there is none
+   */
+  public HashMap<String, Object> getIdleNodeMeta() {
+    HashMap<String, Object> idleNodeMeta = null;
+    HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
+
+    long memoryIdle = 0;
+    for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+      HashMap<String, Object> meta = entry.getValue();
+      if (null == meta) {
+        continue;
+      }
+      // Check if the service or process is offline
+      String status = (String) meta.get(ClusterMeta.STATUS);
+      if (StringUtils.isEmpty(status)
+          || status.equals(ClusterMeta.OFFLINE_STATUS)) {
+        continue;
+      }
+
+      // The memory figures may be absent; unboxing a missing value
+      // would throw a NullPointerException.
+      Object memoryCapacity = meta.get(ClusterMeta.MEMORY_CAPACITY);
+      Object memoryUsed = meta.get(ClusterMeta.MEMORY_USED);
+      if (!(memoryCapacity instanceof Number) || !(memoryUsed instanceof Number)) {
+        continue;
+      }
+
+      long idle = ((Number) memoryCapacity).longValue() - ((Number) memoryUsed).longValue();
+      if (idle > memoryIdle) {
+        memoryIdle = idle;
+        idleNodeMeta = meta;
+      }
+    }
+
+    return idleNodeMeta;
+  }
+
+  /**
+   * Sends an event message to a single cluster node; a failed delivery is logged.
+   *
+   * @param host  host of the target node
+   * @param port  port of the target node
+   * @param topic message topic
+   * @param msg   message body
+   */
+  public void unicastClusterEvent(String host, int port, String topic, String msg) {
+    LOG.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}",
+        host, port, topic, msg);
+
+    Address address = Address.from(host, port);
+    CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
+        topic, msg.getBytes(), Duration.ofSeconds(2));
+    response.whenComplete((r, e) -> {
+      // A non-null throwable means the request failed
+      if (null != e) {
+        LOG.error(e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Sends an event message to every cluster node except this one; the
+   * outcome of each delivery is logged.
+   *
+   * @param topic message topic
+   * @param msg   message body
+   */
+  public void broadcastClusterEvent(String topic, String msg) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("send broadcastClusterEvent message {}", msg);
+    }
+    for (Node node : clusterNodes) {
+      if (StringUtils.equals(node.address().host(), zeplServerHost)
+          && node.address().port() == raftServerPort) {
+        // skip myself
+        continue;
+      }
+
+      CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(),
+          topic, msg.getBytes(), Duration.ofSeconds(2));
+      response.whenComplete((r, e) -> {
+        // A non-null throwable means the request failed
+        if (null != e) {
+          LOG.error(e.getMessage(), e);
+        } else {
+          LOG.info("broadcastClusterNoteEvent success! {}", msg);
+        }
+      });
+    }
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
new file mode 100644
index 0000000..4521710
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.apache.commons.lang.StringUtils;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_META;
+
+/**
+ * cluster monitoring
+ * 1. cluster monitoring is also used for submarine-Server and submarine Interpreter,
+ *    distinguish by member variable ClusterMetaType
+ * 2. Report the average of the server resource CPU and MEMORY usage in the
+ *    last few minutes to smooth the server's instantaneous peak
+ * 3. checks the heartbeat timeout of the submarine-server and interpreter processes
+ */
+public class ClusterMonitor {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterMonitor.class);
+
+  // Maximum number of usage samples kept for averaging
+  private static final int USAGE_QUEUE_LIMIT = 100;
+
+  // Whether the monitoring thread should keep running.
+  // Kept per instance so that shutting down one monitor neither stops the
+  // other monitors nor prevents monitors created later from running.
+  private final AtomicBoolean running = new AtomicBoolean(true);
+
+  private ClusterManager clusterManager = null;
+
+  // Save the CPU resource and Memory usage of the server resources
+  // in the last few minutes through the queue, and then average them through the queue.
+  private Queue<UsageUtil> monitorUsageQueues = new LinkedList<>();
+  private int heartbeatInterval = 3000; // Heartbeat reporting interval(milliseconds)
+
+  // The submarine-server leader checks the heartbeat timeout of
+  // the submarine-server and submarine-interpreter processes in the cluster metadata.
+  // If this time is exceeded, the submarine-server and interpreter processes
+  // can have an exception and no heartbeat is reported.
+  private int heartbeatTimeout = 9000;
+
+  // Type of cluster monitoring object
+  private ClusterMetaType clusterMetaType;
+
+  // The key of the cluster monitoring object,
+  // the name of the cluster when monitoring the submarine-server,
+  // and the interpreterGroupID when monitoring the interpreter processes
+  private String metaKey;
+
+  /**
+   * Creates a monitor that reports through the given cluster manager.
+   * The heartbeat interval and timeout are read from the configuration; a
+   * timeout smaller than the interval is replaced by 3 times the interval.
+   *
+   * @param clusterManagerServer cluster manager used to read and write metadata
+   */
+  public ClusterMonitor(ClusterManager clusterManagerServer) {
+    this.clusterManager = clusterManagerServer;
+
+    SubmarineConfiguration sconf = SubmarineConfiguration.create();
+    heartbeatInterval = sconf.getClusterHeartbeatInterval();
+    heartbeatTimeout = sconf.getClusterHeartbeatTimeout();
+
+    if (heartbeatTimeout < heartbeatInterval) {
+      LOG.error("Heartbeat timeout must be greater than heartbeat period.");
+      heartbeatTimeout = heartbeatInterval * 3;
+      LOG.info("Heartbeat timeout is modified to 3 times the heartbeat period.");
+    }
+
+    if (heartbeatTimeout < heartbeatInterval * 3) {
+      LOG.warn("Heartbeat timeout recommended than 3 times the heartbeat period.");
+    }
+  }
+
+  /**
+   * Starts the monitoring thread. Every heartbeat interval it reports machine
+   * usage and checks cluster health (SERVER_META) or sends a heartbeat
+   * (INTP_PROCESS_META), until shutdown() is called or the thread is interrupted.
+   *
+   * @param clusterMetaType type of the monitored object
+   * @param metaKey         key of the monitored object
+   */
+  public void start(ClusterMetaType clusterMetaType, String metaKey) {
+    this.clusterMetaType = clusterMetaType;
+    this.metaKey = metaKey;
+
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (running.get()) {
+          switch (clusterMetaType) {
+            case SERVER_META:
+              sendMachineUsage();
+              checkHealthy();
+              break;
+            case INTP_PROCESS_META:
+              sendHeartbeat();
+              break;
+            default:
+              LOG.error("unknown cluster meta type:{}", clusterMetaType);
+              break;
+          }
+
+          try {
+            Thread.sleep(heartbeatInterval);
+          } catch (InterruptedException e) {
+            // Restore the interrupt status and stop monitoring
+            Thread.currentThread().interrupt();
+            LOG.warn("cluster monitor thread interrupted, stop monitoring", e);
+            return;
+          }
+        }
+      }
+    }).start();
+  }
+
+  /**
+   * Asks the monitoring thread to stop after its current iteration.
+   */
+  public void shutdown() {
+    running.set(false);
+  }
+
+  // Check the healthy of each service and interpreter instance
+  private void checkHealthy() {
+    // only leader check cluster healthy
+    if (!clusterManager.isClusterLeader()) {
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checkHealthy()");
+    }
+
+    LocalDateTime now = LocalDateTime.now();
+    // check machine meta
+    for (ClusterMetaType metaType : ClusterMetaType.values()) {
+      Map<String, HashMap<String, Object>> clusterMeta
+          = clusterManager.getClusterMeta(metaType, "");
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("clusterMeta : {}", clusterMeta);
+      }
+
+      if (null == clusterMeta) {
+        continue;
+      }
+
+      for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+        String key = entry.getKey();
+        Map<String, Object> meta = entry.getValue();
+        if (null == meta) {
+          continue;
+        }
+
+        // Metadata that has been offline is not processed
+        String status = (String) meta.get(ClusterMeta.STATUS);
+        if (StringUtils.equals(status, ClusterMeta.OFFLINE_STATUS)) {
+          continue;
+        }
+
+        Object heartbeat = meta.get(ClusterMeta.LATEST_HEARTBEAT);
+        if (heartbeat instanceof LocalDateTime) {
+          LocalDateTime dHeartbeat = (LocalDateTime) heartbeat;
+          // Compare in milliseconds; whole seconds would round the elapsed
+          // time down and delay the timeout detection.
+          long timeInterval = Duration.between(dHeartbeat, now).toMillis();
+          if (timeInterval > heartbeatTimeout) {
+            // Set the metadata for the heartbeat timeout to offline
+            // Cannot delete metadata
+            HashMap<String, Object> mapValues = new HashMap<>();
+            mapValues.put(ClusterMeta.STATUS, ClusterMeta.OFFLINE_STATUS);
+            clusterManager.putClusterMeta(metaType, key, mapValues);
+            LOG.warn("offline heartbeat timeout[{}] meta[{}]", dHeartbeat, key);
+          }
+        } else {
+          LOG.error("wrong data type");
+        }
+      }
+    }
+  }
+
+  // The interpreter process sends a heartbeat to the cluster,
+  // indicating that the process is still active.
+  private void sendHeartbeat() {
+    HashMap<String, Object> mapMonitorUtil = new HashMap<>();
+    mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
+    mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+    clusterManager.putClusterMeta(INTP_PROCESS_META, metaKey, mapMonitorUtil);
+  }
+
+  // send the usage of each service
+  private void sendMachineUsage() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("sendMachineUsage >>>");
+    }
+
+    // Limit queue size: make room first so that, after adding the new
+    // sample, the queue holds at most USAGE_QUEUE_LIMIT samples
+    while (monitorUsageQueues.size() >= USAGE_QUEUE_LIMIT) {
+      monitorUsageQueues.poll();
+    }
+    UsageUtil monitorUtil = getMachineUsage();
+    monitorUsageQueues.add(monitorUtil);
+
+    UsageUtil avgMonitorUtil = new UsageUtil();
+    for (UsageUtil monitor : monitorUsageQueues) {
+      avgMonitorUtil.memoryUsed += monitor.memoryUsed;
+      avgMonitorUtil.memoryCapacity += monitor.memoryCapacity;
+      avgMonitorUtil.cpuUsed += monitor.cpuUsed;
+      avgMonitorUtil.cpuCapacity += monitor.cpuCapacity;
+    }
+
+    // Resource consumption average; the queue holds at least the sample
+    // that was just added, so queueSize is never zero
+    int queueSize = monitorUsageQueues.size();
+    avgMonitorUtil.memoryUsed = avgMonitorUtil.memoryUsed / queueSize;
+    avgMonitorUtil.memoryCapacity = avgMonitorUtil.memoryCapacity / queueSize;
+    avgMonitorUtil.cpuUsed = avgMonitorUtil.cpuUsed / queueSize;
+    avgMonitorUtil.cpuCapacity = avgMonitorUtil.cpuCapacity / queueSize;
+
+    HashMap<String, Object> mapMonitorUtil = new HashMap<>();
+    mapMonitorUtil.put(ClusterMeta.MEMORY_USED, avgMonitorUtil.memoryUsed);
+    mapMonitorUtil.put(ClusterMeta.MEMORY_CAPACITY, avgMonitorUtil.memoryCapacity);
+    mapMonitorUtil.put(ClusterMeta.CPU_USED, avgMonitorUtil.cpuUsed);
+    mapMonitorUtil.put(ClusterMeta.CPU_CAPACITY, avgMonitorUtil.cpuCapacity);
+    mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
+    mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+    String clusterName = clusterManager.getClusterNodeName();
+    clusterManager.putClusterMeta(SERVER_META, clusterName, mapMonitorUtil);
+  }
+
+  // Takes one sample of the memory and CPU usage of this machine
+  private UsageUtil getMachineUsage() {
+    OperatingSystemMXBean operatingSystemMXBean
+        = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
+
+    // Returns the amount of free physical memory in bytes.
+    long freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
+
+    // Returns the total amount of physical memory in bytes.
+    long totalPhysicalMemorySize = operatingSystemMXBean.getTotalPhysicalMemorySize();
+
+    // Returns the "recent cpu usage" for the whole system.
+    double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
+
+    int process = Runtime.getRuntime().availableProcessors();
+
+    UsageUtil monitorUtil = new UsageUtil();
+    monitorUtil.memoryUsed = totalPhysicalMemorySize - freePhysicalMemorySize;
+    monitorUtil.memoryCapacity = totalPhysicalMemorySize;
+    // CPU figures are expressed as 100 units per available processor
+    monitorUtil.cpuUsed = (long) (process * systemCpuLoad * 100);
+    monitorUtil.cpuCapacity = process * 100;
+
+    return monitorUtil;
+  }
+
+  // One usage sample; static because it needs no reference to the monitor
+  private static class UsageUtil {
+    private long memoryUsed = 0;
+    private long memoryCapacity = 0;
+    private long cpuUsed = 0;
+    private long cpuCapacity = 0;
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterPrimitiveType.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterPrimitiveType.java
new file mode 100644
index 0000000..f5750af
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterPrimitiveType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import io.atomix.primitive.PrimitiveBuilder;
+import io.atomix.primitive.PrimitiveManagementService;
+import io.atomix.primitive.PrimitiveType;
+import io.atomix.primitive.config.PrimitiveConfig;
+import io.atomix.primitive.service.PrimitiveService;
+import io.atomix.primitive.service.ServiceConfig;
+
+/**
+ * Primitive type of the submarine cluster.
+ * <p>
+ * A custom distributed primitive is introduced by defining its type, which is done
+ * by implementing the {@link PrimitiveType} interface. Only the service side is
+ * provided here: {@link #newService(ServiceConfig)} hands out a
+ * {@link ClusterStateMachine}, whereas creating a config or a builder is unsupported.
+ */
+public class ClusterPrimitiveType implements PrimitiveType {
+  /** Name reported by {@link #name()}. */
+  public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
+
+  /** Shared instance of this primitive type. */
+  public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
+
+  /**
+   * @return the fixed name of this primitive type
+   */
+  @Override
+  public String name() {
+    return PRIMITIVE_NAME;
+  }
+
+  /**
+   * @return a new state machine backing this primitive
+   */
+  @Override
+  public PrimitiveService newService(ServiceConfig config) {
+    return new ClusterStateMachine();
+  }
+
+  /**
+   * Not supported by this primitive type.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public PrimitiveConfig newConfig() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by this primitive type.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public PrimitiveBuilder newBuilder(String primitiveName,
+                                     PrimitiveConfig config,
+                                     PrimitiveManagementService managementService) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterStateMachine.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterStateMachine.java
new file mode 100644
index 0000000..62f24de
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterStateMachine.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.primitive.operation.OperationId;
+import io.atomix.primitive.service.AbstractPrimitiveService;
+import io.atomix.primitive.service.BackupOutput;
+import io.atomix.primitive.service.BackupInput;
+import io.atomix.primitive.service.Commit;
+import io.atomix.primitive.service.ServiceExecutor;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaEntity;
+import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Cluster State Machine for submarine
+ * The cluster state is implemented as a snapshot state machine.
+ * The state machine stores the service and process metadata information of the cluster.
+ * Metadata information can be manipulated by put, get, remove, index, and snapshot.
+ */
+public class ClusterStateMachine extends AbstractPrimitiveService {
+  private static Logger LOG = LoggerFactory.getLogger(ClusterStateMachine.class);
+  private ClusterMeta clusterMeta = new ClusterMeta();
+
+  // Command to operation a variable in cluster state machine
+  public static final OperationId PUT = OperationId.command("put");
+  public static final OperationId GET = OperationId.query("get");
+  public static final OperationId REMOVE = OperationId.command("remove");
+  public static final OperationId INDEX = OperationId.command("index");
+
+  public ClusterStateMachine() {
+    super(ClusterPrimitiveType.INSTANCE);
+  }
+
+  @Override
+  public Serializer serializer() {
+    return ClusterManager.clientSerializer;
+  }
+
+  @Override
+  protected void configure(ServiceExecutor executor) {
+    executor.register(PUT, this::put);
+    executor.register(GET, this::get);
+    executor.register(REMOVE, this::remove);
+    executor.register(INDEX, this::index);
+  }
+
+  protected long put(Commit<ClusterMetaEntity> commit) {
+    clusterMeta.put(commit.value().getMetaType(),
+        commit.value().getKey(), commit.value().getValues());
+    return commit.index();
+  }
+
+  protected Map<String, Map<String, Object>> get(Commit<ClusterMetaEntity> commit) {
+    return clusterMeta.get(commit.value().getMetaType(), commit.value().getKey());
+  }
+
+  protected long remove(Commit<ClusterMetaEntity> commit) {
+    clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey());
+    return commit.index();
+  }
+
+  protected long index(Commit<Void> commit) {
+    return commit.index();
+  }
+
+  @Override
+  public void backup(BackupOutput writer) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ClusterStateMachine.backup()");
+    }
+
+    // backup SERVER_META
+    // cluster meta map struct
+    // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
+    Map<String, Map<String, Object>> mapServerMeta
+        = clusterMeta.get(ClusterMetaType.SERVER_META, "");
+    // write all SERVER_META size
+    writer.writeInt(mapServerMeta.size());
+    for (Map.Entry<String, Map<String, Object>> entry : mapServerMeta.entrySet()) {
+      // write cluster_name
+      writer.writeString(entry.getKey());
+
+      Map<String, Object> kvPairs = entry.getValue();
+      // write cluster mate kv pairs size
+      writer.writeInt(kvPairs.size());
+      for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
+        // write cluster mate kv pairs
+        writer.writeString(entryValue.getKey());
+        writer.writeObject(entryValue.getValue());
+      }
+    }
+
+    // backup INTP_PROCESS_META
+    // Interpreter meta map struct
+    // IntpGroupId -> {server_tserver_host,server_tserver_port,...}
+    Map<String, Map<String, Object>> mapIntpProcMeta
+        = clusterMeta.get(ClusterMetaType.INTP_PROCESS_META, "");
+    // write interpreter size
+    writer.writeInt(mapIntpProcMeta.size());
+    for (Map.Entry<String, Map<String, Object>> entry : mapIntpProcMeta.entrySet()) {
+      // write IntpGroupId
+      writer.writeString(entry.getKey());
+
+      Map<String, Object> kvPairs = entry.getValue();
+      // write interpreter mate kv pairs size
+      writer.writeInt(kvPairs.size());
+      for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
+        // write interpreter mate kv pairs
+        writer.writeString(entryValue.getKey());
+        writer.writeObject(entryValue.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void restore(BackupInput reader) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ClusterStateMachine.restore()");
+    }
+
+    clusterMeta = new ClusterMeta();
+    // read all SERVER_META size
+    int nServerMeta = reader.readInt();
+    for (int i = 0; i < nServerMeta; i++) {
+      // read cluster_name
+      String clusterName = reader.readString();
+
+      // read cluster mate kv pairs size
+      int nKVpairs = reader.readInt();
+      for (int j = 0; j < nKVpairs; i++) {
+        // read cluster mate kv pairs
+        String key = reader.readString();
+        Object value = reader.readObject();
+
+        clusterMeta.put(ClusterMetaType.SERVER_META,
+            clusterName, Maps.immutableEntry(key, value));
+      }
+    }
+
+    // read all INTP_PROCESS_META size
+    int nIntpMeta = reader.readInt();
+    for (int i = 0; i < nIntpMeta; i++) {
+      // read interpreter name
+      String intpName = reader.readString();
+
+      // read interpreter mate kv pairs size
+      int nKVpairs = reader.readInt();
+      for (int j = 0; j < nKVpairs; i++) {
+        // read interpreter mate kv pairs
+        String key = reader.readString();
+        Object value = reader.readObject();
+
+        clusterMeta.put(ClusterMetaType.INTP_PROCESS_META,
+            intpName, Maps.immutableEntry(key, value));
+      }
+    }
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java
new file mode 100644
index 0000000..9ff7123
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMeta.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.meta;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * In-memory store of cluster metadata.
+ * <p>
+ * Metadata is kept per {@link ClusterMetaType} as a map from a key (the cluster
+ * node name or the interpreter group id) to the KV pairs describing that entry.
+ */
+public class ClusterMeta implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterMeta.class);
+
+  // The name of each server node in the cluster
+  public static final String NODE_NAME            = "NODE_NAME";
+
+  // submarine-server meta
+  public static final String SERVER_HOST          = "SERVER_HOST";
+  public static final String SERVER_PORT          = "SERVER_PORT";
+  public static final String SERVER_START_TIME    = "SERVER_START_TIME";
+
+  // interpreter-process meta
+  public static final String INTP_PROCESS_NAME    = "INTP_PROCESS_NAME";
+  public static final String INTP_TSERVER_HOST    = "INTP_TSERVER_HOST";
+  public static final String INTP_TSERVER_PORT    = "INTP_TSERVER_PORT";
+  public static final String INTP_START_TIME      = "INTP_START_TIME";
+
+  // submarine-server resource usage
+  public static final String CPU_CAPACITY         = "CPU_CAPACITY";
+  public static final String CPU_USED             = "CPU_USED";
+  public static final String MEMORY_CAPACITY      = "MEMORY_CAPACITY";
+  public static final String MEMORY_USED          = "MEMORY_USED";
+
+  public static final String LATEST_HEARTBEAT     = "LATEST_HEARTBEAT";
+
+  // submarine-server or interpreter-process status
+  public static final String STATUS               = "STATUS";
+  public static final String ONLINE_STATUS        = "ONLINE";
+  public static final String OFFLINE_STATUS       = "OFFLINE";
+
+  // cluster_name = host:port
+  // Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
+  private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>();
+
+  // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
+  private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>();
+
+  /**
+   * Merges KV pairs into the entry stored under {@code key}.
+   * <p>
+   * Existing pairs of the entry are kept unless overwritten, because the given
+   * value may be a partial update. The pairs are copied, so later changes to
+   * the passed map do not affect the stored metadata.
+   *
+   * @param type  which metadata map to update
+   * @param key   cluster node name or interpreter group id
+   * @param value the KV pairs, must be a {@code Map<String, Object>};
+   *              {@code null} is treated as an empty map
+   * @throws ClassCastException if {@code value} is not a {@link Map}
+   */
+  public void put(ClusterMetaType type, String key, Object value) {
+    @SuppressWarnings("unchecked") // callers are required to pass a Map<String, Object>
+    Map<String, Object> mapValue = (Map<String, Object>) value;
+
+    Map<String, Map<String, Object>> metaMap = metaMapOf(type);
+    if (null == metaMap) {
+      return;
+    }
+
+    Map<String, Object> values = metaMap.get(key);
+    if (null == values) {
+      // never store the caller's map itself, it may be immutable or shared
+      values = new HashMap<>();
+      metaMap.put(key, values);
+    }
+    if (null != mapValue) {
+      values.putAll(mapValue);
+    }
+  }
+
+  /**
+   * Reads metadata of the given type.
+   *
+   * @param type which metadata map to read
+   * @param key  entry to look up; {@code null} or empty selects all entries
+   * @return for a null/empty key the internal map holding all entries of the
+   *         type (not a copy); otherwise a new single-entry map from
+   *         {@code key} to its KV pairs, where the value is {@code null}
+   *         when the key is unknown
+   */
+  public Map<String, Map<String, Object>> get(ClusterMetaType type, String key) {
+    Map<String, Object> values = null;
+
+    Map<String, Map<String, Object>> metaMap = metaMapOf(type);
+    if (null != metaMap) {
+      // StringUtils.isEmpty() already covers null
+      if (StringUtils.isEmpty(key)) {
+        return metaMap;
+      }
+      if (metaMap.containsKey(key)) {
+        values = metaMap.get(key);
+      } else {
+        LOG.warn("can not find key : {}", key);
+      }
+    }
+
+    Map<String, Map<String, Object>> result = new HashMap<>();
+    result.put(key, values);
+
+    return result;
+  }
+
+  /**
+   * Removes the entry stored under {@code key}.
+   *
+   * @param type which metadata map to update
+   * @param key  entry to remove
+   * @return the KV pairs of the removed entry, or {@code null} if the key is unknown
+   */
+  public Map<String, Object> remove(ClusterMetaType type, String key) {
+    Map<String, Map<String, Object>> metaMap = metaMapOf(type);
+    if (null == metaMap) {
+      return null;
+    }
+
+    if (!metaMap.containsKey(key)) {
+      LOG.warn("can not find key : {}", key);
+      return null;
+    }
+    return metaMap.remove(key);
+  }
+
+  // Selects the backing map of a metadata type, null for a type without one.
+  private Map<String, Map<String, Object>> metaMapOf(ClusterMetaType type) {
+    switch (type) {
+      case SERVER_META:
+        return mapServerMeta;
+      case INTP_PROCESS_META:
+        return mapInterpreterMeta;
+      default:
+        return null;
+    }
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaEntity.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaEntity.java
new file mode 100644
index 0000000..1a4efa8
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.meta;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * Encapsulates one cluster metadata request: the operation, the metadata type,
+ * the key of the entry and the KV pairs belonging to it.
+ */
+public class ClusterMetaEntity implements Serializable {
+  private ClusterMetaOperation operation;
+  private ClusterMetaType type;
+  private String key;
+  // never null, empty when no values were supplied
+  private HashMap<String, Object> values = new HashMap<>();
+
+  /**
+   * @param operation the metadata operation
+   * @param type      the metadata type
+   * @param key       the key of the metadata entry
+   * @param values    KV pairs of the entry, copied into this entity; may be null
+   */
+  public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
+                           String key, HashMap<String, Object> values) {
+    this.operation = operation;
+    this.type = type;
+    this.key = key;
+
+    if (values == null) {
+      return;
+    }
+    this.values.putAll(values);
+  }
+
+  /** @return the metadata operation */
+  public ClusterMetaOperation getOperation() {
+    return this.operation;
+  }
+
+  /** @return the metadata type */
+  public ClusterMetaType getMetaType() {
+    return this.type;
+  }
+
+  /** @return the key of the metadata entry */
+  public String getKey() {
+    return this.key;
+  }
+
+  /** @return the KV pairs held by this entity (the internal map, not a copy) */
+  public HashMap<String, Object> getValues() {
+    return this.values;
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaOperation.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaOperation.java
new file mode 100644
index 0000000..674c924
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.meta;
+
+/**
+ * Type of cluster metadata operation.
+ * <p>
+ * A value of this enum is carried by ClusterMetaEntity as its operation.
+ * NOTE(review): judging by the names these stand for reading, writing and
+ * deleting metadata; confirm against the code that creates the entities.
+ */
+public enum ClusterMetaOperation {
+  // presumably: read metadata
+  GET_OPERATION,
+  // presumably: add or update metadata
+  PUT_OPERATION,
+  // presumably: remove metadata
+  DELETE_OPERATION
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaType.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaType.java
new file mode 100644
index 0000000..b464b26
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/meta/ClusterMetaType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.meta;
+
+/**
+ * Type of cluster metadata.
+ * <p>
+ * ClusterMeta keeps one map per type and uses this enum to select it.
+ */
+public enum ClusterMetaType {
+  // server metadata, keyed by cluster name in ClusterMeta
+  SERVER_META,
+  // interpreter process metadata, keyed by interpreter group id in ClusterMeta
+  INTP_PROCESS_META
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftClientProtocol.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftClientProtocol.java
new file mode 100644
index 0000000..b26bdf7
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftClientProtocol.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Protocol for intercommunication between Raft clients for each server in the cluster.
+ * Communication protocol for handling sessions, queries, commands, and services
+ * within the cluster.
+ * <p>
+ * Requests are encoded with the shared serializer, handed to the
+ * {@link LocalRaftServerProtocol} registered for the target member and the
+ * encoded response is decoded again. A request to a member without a
+ * registered server fails with a {@link ConnectException}.
+ */
+public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
+  private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler;
+  // publish listeners by session id
+  private final Map<Long, Consumer<PublishRequest>> publishListeners = Maps.newConcurrentMap();
+
+  /**
+   * Creates the protocol and registers it in {@code clients} under {@code memberId}.
+   *
+   * @param memberId   identifier of the member this client protocol belongs to
+   * @param serializer serializer used to encode requests and decode responses
+   * @param servers    registry of the server protocols by member
+   * @param clients    registry of the client protocols by member
+   */
+  public LocalRaftClientProtocol(MemberId memberId,
+                                 Serializer serializer,
+                                 Map<MemberId, LocalRaftServerProtocol> servers,
+                                 Map<MemberId, LocalRaftClientProtocol> clients) {
+    super(serializer, servers, clients);
+    clients.put(memberId, this);
+  }
+
+  // Resolves the server protocol of a member, failing the future with a
+  // ConnectException when no server is registered for it.
+  private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
+    LocalRaftServerProtocol server = server(memberId);
+    if (server != null) {
+      return Futures.completedFuture(server);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.openSession(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
+                                                              CloseSessionRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.closeSession(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.keepAlive(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.query(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.command(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.metadata(encode(request))).thenApply(this::decode);
+  }
+
+  /**
+   * Passes an encoded heartbeat request to the registered heartbeat handler.
+   *
+   * @param request the encoded heartbeat request
+   * @return the encoded heartbeat response, or a future failed with a
+   *         {@link ConnectException} when no handler is registered
+   */
+  CompletableFuture<byte[]> heartbeat(byte[] request) {
+    // Read the field once: the handler may be unregistered between the null
+    // check and the call.
+    Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> handler
+        = heartbeatHandler;
+    if (handler != null) {
+      return handler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerHeartbeatHandler(Function<HeartbeatRequest,
+      CompletableFuture<HeartbeatResponse>> handler) {
+    this.heartbeatHandler = handler;
+  }
+
+  @Override
+  public void unregisterHeartbeatHandler() {
+    this.heartbeatHandler = null;
+  }
+
+  @Override
+  public void reset(Set<MemberId> members, ResetRequest request) {
+    members.forEach(nodeId -> {
+      LocalRaftServerProtocol server = server(nodeId);
+      // members without a registered server are skipped
+      if (server != null) {
+        server.reset(request.session(), encode(request));
+      }
+    });
+  }
+
+  /**
+   * Delivers an encoded publish request to the listener of a session;
+   * the request is dropped when the session has no listener.
+   *
+   * @param sessionId id of the session the request is addressed to
+   * @param request   the encoded publish request
+   */
+  void publish(long sessionId, byte[] request) {
+    Consumer<PublishRequest> listener = publishListeners.get(sessionId);
+    if (listener != null) {
+      listener.accept(decode(request));
+    }
+  }
+
+  @Override
+  public void registerPublishListener(SessionId sessionId,
+                                      Consumer<PublishRequest> listener, Executor executor) {
+    // wrap the listener so that it is always invoked on the given executor
+    publishListeners.put(sessionId.id(), request ->
+        executor.execute(() -> listener.accept(request)));
+  }
+
+  @Override
+  public void unregisterPublishListener(SessionId sessionId) {
+    publishListeners.remove(sessionId.id());
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftProtocol.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftProtocol.java
new file mode 100644
index 0000000..ccadcc4
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftProtocol.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Base class for Raft protocol.
+ */
+public abstract class LocalRaftProtocol {
+  private final Serializer serializer;
+  private final Map<MemberId, LocalRaftServerProtocol> servers;
+  private final Map<MemberId, LocalRaftClientProtocol> clients;
+
+  public LocalRaftProtocol(Serializer serializer,
+                           Map<MemberId, LocalRaftServerProtocol> servers,
+                           Map<MemberId, LocalRaftClientProtocol> clients) {
+    this.serializer = serializer;
+    this.servers = servers;
+    this.clients = clients;
+  }
+
+  <T> T copy(T value) {
+    return serializer.decode(serializer.encode(value));
+  }
+
+  byte[] encode(Object value) {
+    return serializer.encode(value);
+  }
+
+  <T> T decode(byte[] bytes) {
+    return serializer.decode(bytes);
+  }
+
+  LocalRaftServerProtocol server(MemberId memberId) {
+    return servers.get(memberId);
+  }
+
+  LocalRaftClientProtocol client(MemberId memberId) {
+    return clients.get(memberId);
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftProtocolFactory.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftProtocolFactory.java
new file mode 100644
index 0000000..077f2c1
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftProtocolFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Cluster Raft protocol factory.
+ */
+public class LocalRaftProtocolFactory {
+  private final Serializer serializer;
+  private final Map<MemberId, LocalRaftServerProtocol> servers = Maps.newConcurrentMap();
+  private final Map<MemberId, LocalRaftClientProtocol> clients = Maps.newConcurrentMap();
+
+  public LocalRaftProtocolFactory(Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  /**
+   * Returns a new test client protocol.
+   *
+   * @param memberId the client member identifier
+   * @return a new test client protocol
+   */
+  public RaftClientProtocol newClientProtocol(MemberId memberId) {
+    return new LocalRaftClientProtocol(memberId, serializer, servers, clients);
+  }
+
+  /**
+   * Returns a new test server protocol.
+   *
+   * @param memberId the server member identifier
+   * @return a new test server protocol
+   */
+  public RaftServerProtocol newServerProtocol(MemberId memberId) {
+    return new LocalRaftServerProtocol(memberId, serializer, servers, clients);
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftServerProtocol.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftServerProtocol.java
new file mode 100644
index 0000000..7ae9d33
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/LocalRaftServerProtocol.java
@@ -0,0 +1,527 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Cluster server protocol.
+ *
+ * <p>Each outbound call looks up the target peer, hands it the serialized request and
+ * deserializes the reply. Each inbound {@code byte[]} entry point decodes the request and
+ * forwards it to the handler registered for that request type; when no handler is registered
+ * the returned future fails with a {@link ConnectException}.
+ */
+public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol {
+  // One handler per inbound request type; a null field means nothing is registered for it.
+  private Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> openSessionHandler;
+  private Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>>
+      closeSessionHandler;
+  private Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> keepAliveHandler;
+  private Function<QueryRequest, CompletableFuture<QueryResponse>> queryHandler;
+  private Function<CommandRequest, CompletableFuture<CommandResponse>> commandHandler;
+  private Function<MetadataRequest, CompletableFuture<MetadataResponse>> metadataHandler;
+  private Function<JoinRequest, CompletableFuture<JoinResponse>> joinHandler;
+  private Function<LeaveRequest, CompletableFuture<LeaveResponse>> leaveHandler;
+  private Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> configureHandler;
+  private Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> reconfigureHandler;
+  private Function<InstallRequest, CompletableFuture<InstallResponse>> installHandler;
+  private Function<PollRequest, CompletableFuture<PollResponse>> pollHandler;
+  private Function<VoteRequest, CompletableFuture<VoteResponse>> voteHandler;
+  private Function<TransferRequest, CompletableFuture<TransferResponse>> transferHandler;
+  private Function<AppendRequest, CompletableFuture<AppendResponse>> appendHandler;
+  // Reset listeners keyed by the numeric session id.
+  private final Map<Long, Consumer<ResetRequest>> resetListeners = Maps.newConcurrentMap();
+
+  /**
+   * Creates the protocol and registers it in {@code servers} under {@code memberId}.
+   *
+   * @param memberId   the identifier this server is registered under
+   * @param serializer the serializer passed to the superclass
+   * @param servers    the server registry; this instance is added to it
+   * @param clients    the client registry passed to the superclass
+   */
+  public LocalRaftServerProtocol(MemberId memberId, Serializer serializer,
+                                 Map<MemberId, LocalRaftServerProtocol> servers,
+                                 Map<MemberId, LocalRaftClientProtocol> clients) {
+    super(serializer, servers, clients);
+    servers.put(memberId, this);
+  }
+
+  /**
+   * Looks up a peer server.
+   *
+   * @param memberId the peer's identifier
+   * @return a future completed with the peer, or failed with {@link ConnectException} when the
+   *     lookup returns {@code null}
+   */
+  private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
+    LocalRaftServerProtocol server = server(memberId);
+    if (server != null) {
+      return Futures.completedFuture(server);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /**
+   * Looks up a client.
+   *
+   * @param memberId the client's identifier
+   * @return a future completed with the client, or failed with {@link ConnectException} when the
+   *     lookup returns {@code null}
+   */
+  private CompletableFuture<LocalRaftClientProtocol> getClient(MemberId memberId) {
+    LocalRaftClientProtocol client = client(memberId);
+    if (client != null) {
+      return Futures.completedFuture(client);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  // Outbound requests: encode, pass to the peer's matching byte[] entry point, decode the reply.
+  // ---------------------------------------------------------------------------------------------
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.openSession(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
+                                                              CloseSessionRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.closeSession(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.keepAlive(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.query(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.command(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.metadata(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.join(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.leave(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
+                                                        ConfigureRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.configure(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
+                                                            ReconfigureRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.reconfigure(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.install(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
+    // Must reach the peer's transfer entry point. Routing through install would hand the
+    // serialized TransferRequest to the install handler, which expects an InstallRequest, and
+    // the registered transfer handler would never run.
+    return getServer(memberId).thenCompose(listener ->
+        listener.transfer(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.poll(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.vote(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.append(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public void publish(MemberId memberId, PublishRequest request) {
+    // The resulting future is discarded, so a failed client lookup is not reported here.
+    getClient(memberId).thenAccept(protocol ->
+        protocol.publish(request.session(), encode(request)));
+  }
+
+  @Override
+  public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
+                                                        HeartbeatRequest request) {
+    return getClient(memberId).thenCompose(protocol ->
+        protocol.heartbeat(encode(request))).thenApply(this::decode);
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  // Inbound requests: each byte[] entry point decodes the request, applies the registered
+  // handler and encodes the response. Without a registered handler the returned future fails
+  // with a ConnectException. Each entry point is followed by its register/unregister pair.
+  // ---------------------------------------------------------------------------------------------
+
+  CompletableFuture<byte[]> openSession(byte[] request) {
+    if (openSessionHandler != null) {
+      return openSessionHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerOpenSessionHandler(Function<OpenSessionRequest,
+      CompletableFuture<OpenSessionResponse>> handler) {
+    this.openSessionHandler = handler;
+  }
+
+  @Override
+  public void unregisterOpenSessionHandler() {
+    this.openSessionHandler = null;
+  }
+
+  CompletableFuture<byte[]> closeSession(byte[] request) {
+    if (closeSessionHandler != null) {
+      return closeSessionHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerCloseSessionHandler(Function<CloseSessionRequest,
+      CompletableFuture<CloseSessionResponse>> handler) {
+    this.closeSessionHandler = handler;
+  }
+
+  @Override
+  public void unregisterCloseSessionHandler() {
+    this.closeSessionHandler = null;
+  }
+
+  CompletableFuture<byte[]> keepAlive(byte[] request) {
+    if (keepAliveHandler != null) {
+      return keepAliveHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerKeepAliveHandler(Function<KeepAliveRequest,
+      CompletableFuture<KeepAliveResponse>> handler) {
+    this.keepAliveHandler = handler;
+  }
+
+  @Override
+  public void unregisterKeepAliveHandler() {
+    this.keepAliveHandler = null;
+  }
+
+  CompletableFuture<byte[]> query(byte[] request) {
+    if (queryHandler != null) {
+      return queryHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerQueryHandler(Function<QueryRequest,
+      CompletableFuture<QueryResponse>> handler) {
+    this.queryHandler = handler;
+  }
+
+  @Override
+  public void unregisterQueryHandler() {
+    this.queryHandler = null;
+  }
+
+  CompletableFuture<byte[]> command(byte[] request) {
+    if (commandHandler != null) {
+      return commandHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerCommandHandler(Function<CommandRequest,
+      CompletableFuture<CommandResponse>> handler) {
+    this.commandHandler = handler;
+  }
+
+  @Override
+  public void unregisterCommandHandler() {
+    this.commandHandler = null;
+  }
+
+  CompletableFuture<byte[]> metadata(byte[] request) {
+    if (metadataHandler != null) {
+      return metadataHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerMetadataHandler(Function<MetadataRequest,
+      CompletableFuture<MetadataResponse>> handler) {
+    this.metadataHandler = handler;
+  }
+
+  @Override
+  public void unregisterMetadataHandler() {
+    this.metadataHandler = null;
+  }
+
+  CompletableFuture<byte[]> join(byte[] request) {
+    if (joinHandler != null) {
+      return joinHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerJoinHandler(Function<JoinRequest,
+      CompletableFuture<JoinResponse>> handler) {
+    this.joinHandler = handler;
+  }
+
+  @Override
+  public void unregisterJoinHandler() {
+    this.joinHandler = null;
+  }
+
+  CompletableFuture<byte[]> leave(byte[] request) {
+    if (leaveHandler != null) {
+      return leaveHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerLeaveHandler(Function<LeaveRequest,
+      CompletableFuture<LeaveResponse>> handler) {
+    this.leaveHandler = handler;
+  }
+
+  @Override
+  public void unregisterLeaveHandler() {
+    this.leaveHandler = null;
+  }
+
+  CompletableFuture<byte[]> configure(byte[] request) {
+    if (configureHandler != null) {
+      return configureHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerConfigureHandler(Function<ConfigureRequest,
+      CompletableFuture<ConfigureResponse>> handler) {
+    this.configureHandler = handler;
+  }
+
+  @Override
+  public void unregisterConfigureHandler() {
+    this.configureHandler = null;
+  }
+
+  CompletableFuture<byte[]> reconfigure(byte[] request) {
+    if (reconfigureHandler != null) {
+      return reconfigureHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerReconfigureHandler(Function<ReconfigureRequest,
+      CompletableFuture<ReconfigureResponse>> handler) {
+    this.reconfigureHandler = handler;
+  }
+
+  @Override
+  public void unregisterReconfigureHandler() {
+    this.reconfigureHandler = null;
+  }
+
+  CompletableFuture<byte[]> install(byte[] request) {
+    if (installHandler != null) {
+      return installHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerInstallHandler(Function<InstallRequest,
+      CompletableFuture<InstallResponse>> handler) {
+    this.installHandler = handler;
+  }
+
+  @Override
+  public void unregisterInstallHandler() {
+    this.installHandler = null;
+  }
+
+  CompletableFuture<byte[]> poll(byte[] request) {
+    if (pollHandler != null) {
+      return pollHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerPollHandler(Function<PollRequest,
+      CompletableFuture<PollResponse>> handler) {
+    this.pollHandler = handler;
+  }
+
+  @Override
+  public void unregisterPollHandler() {
+    this.pollHandler = null;
+  }
+
+  CompletableFuture<byte[]> vote(byte[] request) {
+    if (voteHandler != null) {
+      return voteHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerVoteHandler(Function<VoteRequest,
+      CompletableFuture<VoteResponse>> handler) {
+    this.voteHandler = handler;
+  }
+
+  @Override
+  public void unregisterVoteHandler() {
+    this.voteHandler = null;
+  }
+
+  CompletableFuture<byte[]> transfer(byte[] request) {
+    if (transferHandler != null) {
+      return transferHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerTransferHandler(Function<TransferRequest,
+      CompletableFuture<TransferResponse>> handler) {
+    this.transferHandler = handler;
+  }
+
+  @Override
+  public void unregisterTransferHandler() {
+    this.transferHandler = null;
+  }
+
+  CompletableFuture<byte[]> append(byte[] request) {
+    if (appendHandler != null) {
+      return appendHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerAppendHandler(Function<AppendRequest,
+      CompletableFuture<AppendResponse>> handler) {
+    this.appendHandler = handler;
+  }
+
+  @Override
+  public void unregisterAppendHandler() {
+    this.appendHandler = null;
+  }
+
+  /**
+   * Delivers a serialized reset request to the listener registered for the session.
+   * Does nothing when no listener is registered for {@code sessionId}.
+   *
+   * @param sessionId the numeric id of the session being reset
+   * @param request   the serialized {@link ResetRequest}
+   */
+  void reset(long sessionId, byte[] request) {
+    Consumer<ResetRequest> listener = resetListeners.get(sessionId);
+    if (listener != null) {
+      listener.accept(decode(request));
+    }
+  }
+
+  @Override
+  public void registerResetListener(SessionId sessionId,
+                                    Consumer<ResetRequest> listener, Executor executor) {
+    // Store a wrapper so the caller's listener always runs on the supplied executor.
+    resetListeners.put(sessionId.id(), request -> executor.execute(()
+        -> listener.accept(request)));
+  }
+
+  @Override
+  public void unregisterResetListener(SessionId sessionId) {
+    resetListeners.remove(sessionId.id());
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftClientMessagingProtocol.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftClientMessagingProtocol.java
new file mode 100644
index 0000000..7ef0f32
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftClientMessagingProtocol.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Raft client messaging service protocol.
+ *
+ * <p>Client requests are sent to a member under a fixed subject per request type; publish and
+ * reset traffic uses a subject that embeds the session id.
+ */
+public class RaftClientMessagingProtocol extends RaftMessagingProtocol
+    implements RaftClientProtocol {
+  // Message subjects used by this protocol.
+  private static final String OPEN_SESSION = "open-session";
+  private static final String CLOSE_SESSION = "close-session";
+  private static final String KEEP_ALIVE = "keep-alive";
+  private static final String QUERY = "query";
+  private static final String COMMAND = "command";
+  private static final String METADATA = "metadata";
+  private static final String HEARTBEAT = "heartbeat";
+
+  public RaftClientMessagingProtocol(MessagingService messagingService,
+                                     Serializer serializer,
+                                     Function<MemberId, Address> addressProvider) {
+    super(messagingService, serializer, addressProvider);
+  }
+
+  /** Builds the subject on which publish requests for the given session are received. */
+  private static String publishSubject(SessionId sessionId) {
+    return String.format("publish-%d", sessionId.id());
+  }
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest request) {
+    return sendAndReceive(memberId, OPEN_SESSION, request);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
+                                                              CloseSessionRequest request) {
+    return sendAndReceive(memberId, CLOSE_SESSION, request);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest request) {
+    return sendAndReceive(memberId, KEEP_ALIVE, request);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+    return sendAndReceive(memberId, QUERY, request);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return sendAndReceive(memberId, COMMAND, request);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) {
+    return sendAndReceive(memberId, METADATA, request);
+  }
+
+  @Override
+  public void registerHeartbeatHandler(Function<HeartbeatRequest,
+      CompletableFuture<HeartbeatResponse>> handler) {
+    registerHandler(HEARTBEAT, handler);
+  }
+
+  @Override
+  public void unregisterHeartbeatHandler() {
+    unregisterHandler(HEARTBEAT);
+  }
+
+  @Override
+  public void reset(Set<MemberId> members, ResetRequest request) {
+    // The subject is rebuilt per member, exactly once for each send.
+    members.forEach(memberId ->
+        sendAsync(memberId, String.format("reset-%d", request.session()), request));
+  }
+
+  @Override
+  public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener,
+                                      Executor executor) {
+    final String subject = publishSubject(sessionId);
+    // Keep this a block lambda: it has to resolve to the void (consumer) handler overload.
+    messagingService.registerHandler(subject, (sender, payload) -> {
+      listener.accept(serializer.decode(payload));
+    }, executor);
+  }
+
+  @Override
+  public void unregisterPublishListener(SessionId sessionId) {
+    final String subject = publishSubject(sessionId);
+    messagingService.unregisterHandler(subject);
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftMessagingProtocol.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftMessagingProtocol.java
new file mode 100644
index 0000000..8cdecc5
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftMessagingProtocol.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Messaging service based Raft protocol.
+ *
+ * <p>Base class that serializes messages with the supplied {@link Serializer} and exchanges
+ * them through a {@link MessagingService}, resolving each {@link MemberId} to an
+ * {@link Address} with the supplied address provider.
+ */
+public abstract class RaftMessagingProtocol {
+  protected final MessagingService messagingService;
+  protected final Serializer serializer;
+  private final Function<MemberId, Address> addressProvider;
+
+  /**
+   * @param messagingService the service used to send messages and register handlers
+   * @param serializer       the serializer used to encode and decode messages
+   * @param addressProvider  maps a member id to its address; a {@code null} result is treated
+   *                         as "member unreachable" by the send methods
+   */
+  public RaftMessagingProtocol(MessagingService messagingService,
+                               Serializer serializer,
+                               Function<MemberId, Address> addressProvider) {
+    this.messagingService = messagingService;
+    this.serializer = serializer;
+    this.addressProvider = addressProvider;
+  }
+
+  /**
+   * Resolves a member id to an address.
+   *
+   * @param memberId the member to resolve
+   * @return whatever the address provider returns for the member, possibly {@code null}
+   */
+  protected Address address(MemberId memberId) {
+    return addressProvider.apply(memberId);
+  }
+
+  /**
+   * Sends a request to a member and decodes the reply.
+   *
+   * @param memberId the target member
+   * @param type     the message subject
+   * @param request  the request to encode and send
+   * @return a future for the decoded response, or a future failed with
+   *     {@link ConnectException} when the member has no address
+   */
+  protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId,
+                                                       String type, T request) {
+    Address address = address(memberId);
+    if (address == null) {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+    return messagingService.sendAndReceive(address, type, serializer.encode(request))
+        .thenApply(serializer::decode);
+  }
+
+  /**
+   * Sends a one-way message to a member.
+   *
+   * @param memberId the target member
+   * @param type     the message subject
+   * @param request  the message to encode and send
+   * @return the messaging service's send future, or an already-completed future when the
+   *     member has no address (the message is then dropped)
+   */
+  protected CompletableFuture<Void> sendAsync(MemberId memberId, String type, Object request) {
+    Address address = address(memberId);
+    if (address != null) {
+      // Send to the address that was just null-checked instead of resolving the member a
+      // second time, which could yield a different (or null) address than the one checked.
+      return messagingService.sendAsync(address, type, serializer.encode(request));
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  /**
+   * Registers a request/response handler for a message subject.
+   *
+   * @param type    the message subject
+   * @param handler receives the decoded request and returns a future for the response, which
+   *                is encoded before being handed back to the messaging service
+   */
+  protected <T, U> void registerHandler(String type, Function<T, CompletableFuture<U>> handler) {
+    messagingService.registerHandler(type, (e, p) -> {
+      CompletableFuture<byte[]> future = new CompletableFuture<>();
+      handler.apply(serializer.decode(p)).whenComplete((result, error) -> {
+        if (error == null) {
+          future.complete(serializer.encode(result));
+        } else {
+          // Pass the handler's failure through unchanged.
+          future.completeExceptionally(error);
+        }
+      });
+      return future;
+    });
+  }
+
+  /**
+   * Removes the handler registered for a message subject.
+   *
+   * @param type the message subject
+   */
+  protected void unregisterHandler(String type) {
+    messagingService.unregisterHandler(type);
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftServerMessagingProtocol.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftServerMessagingProtocol.java
new file mode 100644
index 0000000..e29614b
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/protocol/RaftServerMessagingProtocol.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Raft server messaging protocol between Raft Servers for each server in the cluster.
+ */
+public class RaftServerMessagingProtocol extends RaftMessagingProtocol
+    implements RaftServerProtocol {
+  public RaftServerMessagingProtocol(MessagingService messagingService,
+                                     Serializer serializer,
+                                     Function<MemberId, Address> addressProvider) {
+    super(messagingService, serializer, addressProvider);
+  }
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest request) {
+    return sendAndReceive(memberId, "open-session", request);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
+                                                              CloseSessionRequest request) {
+    return sendAndReceive(memberId, "close-session", request);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest request) {
+    return sendAndReceive(memberId, "keep-alive", request);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+    return sendAndReceive(memberId, "query", request);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return sendAndReceive(memberId, "command", request);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) {
+    return sendAndReceive(memberId, "metadata", request);
+  }
+
+  @Override
+  public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
+    return sendAndReceive(memberId, "join", request);
+  }
+
+  @Override
+  public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
+    return sendAndReceive(memberId, "leave", request);
+  }
+
+  @Override
+  public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
+                                                        ConfigureRequest request) {
+    return sendAndReceive(memberId, "configure", request);
+  }
+
+  @Override
+  public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
+                                                            ReconfigureRequest request) {
+    return sendAndReceive(memberId, "reconfigure", request);
+  }
+
+  @Override
+  public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
+    return sendAndReceive(memberId, "install", request);
+  }
+
+  @Override
+  public CompletableFuture<TransferResponse> transfer(MemberId memberId,
+                                                      TransferRequest request) {
+    return sendAndReceive(memberId, "transfer", request);
+  }
+
+  @Override
+  public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
+    return sendAndReceive(memberId, "poll", request);
+  }
+
+  @Override
+  public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
+    return sendAndReceive(memberId, "vote", request);
+  }
+
+  @Override
+  public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
+    return sendAndReceive(memberId, "append", request);
+  }
+
+  @Override
+  public void publish(MemberId memberId, PublishRequest request) {
+    sendAsync(memberId, String.format("publish-%d", request.session()), request);
+  }
+
+  @Override
+  public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
+                                                        HeartbeatRequest request) {
+    return sendAndReceive(memberId, "heartbeat", request);
+  }
+
+  @Override
+  public void registerOpenSessionHandler(Function<OpenSessionRequest,
+      CompletableFuture<OpenSessionResponse>> handler) {
+    registerHandler("open-session", handler);
+  }
+
+  @Override
+  public void unregisterOpenSessionHandler() {
+    unregisterHandler("open-session");
+  }
+
+  @Override
+  public void registerCloseSessionHandler(Function<CloseSessionRequest,
+      CompletableFuture<CloseSessionResponse>> handler) {
+    registerHandler("close-session", handler);
+  }
+
+  @Override
+  public void unregisterCloseSessionHandler() {
+    unregisterHandler("close-session");
+  }
+
+  @Override
+  public void registerKeepAliveHandler(Function<KeepAliveRequest,
+      CompletableFuture<KeepAliveResponse>> handler) {
+    registerHandler("keep-alive", handler);
+  }
+
+  @Override
+  public void unregisterKeepAliveHandler() {
+    unregisterHandler("keep-alive");
+  }
+
+  @Override
+  public void registerQueryHandler(Function<QueryRequest,
+      CompletableFuture<QueryResponse>> handler) {
+    registerHandler("query", handler);
+  }
+
+  @Override
+  public void unregisterQueryHandler() {
+    unregisterHandler("query");
+  }
+
+  @Override
+  public void registerCommandHandler(Function<CommandRequest,
+      CompletableFuture<CommandResponse>> handler) {
+    registerHandler("command", handler);
+  }
+
+  @Override
+  public void unregisterCommandHandler() {
+    unregisterHandler("command");
+  }
+
+  @Override
+  public void registerMetadataHandler(Function<MetadataRequest,
+      CompletableFuture<MetadataResponse>> handler) {
+    registerHandler("metadata", handler);
+  }
+
+  @Override
+  public void unregisterMetadataHandler() {
+    unregisterHandler("metadata");
+  }
+
+  @Override
+  public void registerJoinHandler(Function<JoinRequest,
+      CompletableFuture<JoinResponse>> handler) {
+    registerHandler("join", handler);
+  }
+
+  @Override
+  public void unregisterJoinHandler() {
+    unregisterHandler("join");
+  }
+
+  @Override
+  public void registerLeaveHandler(Function<LeaveRequest,
+      CompletableFuture<LeaveResponse>> handler) {
+    registerHandler("leave", handler);
+  }
+
+  @Override
+  public void unregisterLeaveHandler() {
+    unregisterHandler("leave");
+  }
+
+  @Override
+  public void registerConfigureHandler(Function<ConfigureRequest,
+      CompletableFuture<ConfigureResponse>> handler) {
+    registerHandler("configure", handler);
+  }
+
+  @Override
+  public void unregisterConfigureHandler() {
+    unregisterHandler("configure");
+  }
+
+  @Override
+  public void registerReconfigureHandler(Function<ReconfigureRequest,
+      CompletableFuture<ReconfigureResponse>> handler) {
+    registerHandler("reconfigure", handler);
+  }
+
+  @Override
+  public void unregisterReconfigureHandler() {
+    unregisterHandler("reconfigure");
+  }
+
+  @Override
+  public void registerInstallHandler(Function<InstallRequest,
+      CompletableFuture<InstallResponse>> handler) {
+    registerHandler("install", handler);
+  }
+
+  @Override
+  public void unregisterInstallHandler() {
+    unregisterHandler("install");
+  }
+
+  @Override
+  public void registerTransferHandler(Function<TransferRequest,
+      CompletableFuture<TransferResponse>> handler) {
+    registerHandler("transfer", handler);
+  }
+
+  @Override
+  public void unregisterTransferHandler() {
+    unregisterHandler("transfer");
+  }
+
+  @Override
+  public void registerPollHandler(Function<PollRequest,
+      CompletableFuture<PollResponse>> handler) {
+    registerHandler("poll", handler);
+  }
+
+  @Override
+  public void unregisterPollHandler() {
+    unregisterHandler("poll");
+  }
+
+  @Override
+  public void registerVoteHandler(Function<VoteRequest,
+      CompletableFuture<VoteResponse>> handler) {
+    registerHandler("vote", handler);
+  }
+
+  @Override
+  public void unregisterVoteHandler() {
+    unregisterHandler("vote");
+  }
+
+  @Override
+  public void registerAppendHandler(Function<AppendRequest,
+      CompletableFuture<AppendResponse>> handler) {
+    registerHandler("append", handler);
+  }
+
+  @Override
+  public void unregisterAppendHandler() {
+    unregisterHandler("append");
+  }
+
+  @Override
+  public void registerResetListener(SessionId sessionId,
+                                    Consumer<ResetRequest> listener, Executor executor) {
+    messagingService.registerHandler(String.format("reset-%d", sessionId.id()), (e, p) -> {
+      listener.accept(serializer.decode(p));
+    }, executor);
+  }
+
+  @Override
+  public void unregisterResetListener(SessionId sessionId) {
+    messagingService.unregisterHandler(String.format("reset-%d", sessionId.id()));
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
new file mode 100644
index 0000000..cf1c107
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.commons.utils.NetUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ClusterMultiNodeTest {
+  private static Logger LOG = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
+
+  private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
+  private static ClusterManagerClient clusterClient = null;
+
+  static final String metaKey = "ClusterMultiNodeTestKey";
+
+  @BeforeClass
+  public static void startCluster() throws IOException, InterruptedException {
+    LOG.info("ClusterMultiNodeTest::startCluster >>>");
+
+    String clusterAddrList = "";
+    String zServerHost = NetUtils.findAvailableHostAddress();
+    for (int i = 0; i < 3; i++) {
+      // Set the cluster IP and port
+      int zServerPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      clusterAddrList += zServerHost + ":" + zServerPort;
+      if (i != 2) {
+        clusterAddrList += ",";
+      }
+    }
+    LOG.info("clusterAddrList = {}", clusterAddrList);
+    SubmarineConfiguration sconf = SubmarineConfiguration.create();
+    sconf.setClusterAddress(clusterAddrList);
+
+    // mock cluster manager server
+    String cluster[] = clusterAddrList.split(",");
+    try {
+      for (int i = 0; i < 3; i++) {
+        String[] parts = cluster[i].split(":");
+        String clusterHost = parts[0];
+        int clusterPort = Integer.valueOf(parts[1]);
+
+        Class clazz = ClusterManagerServer.class;
+        Constructor constructor = clazz.getDeclaredConstructor();
+        constructor.setAccessible(true);
+        ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+        clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
+
+        clusterServers.add(clusterServer);
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    for (ClusterManagerServer clusterServer : clusterServers) {
+      clusterServer.start();
+    }
+
+    // mock cluster manager client
+    try {
+      Class clazz = ClusterManagerClient.class;
+      Constructor constructor = null;
+      constructor = clazz.getDeclaredConstructor();
+      constructor.setAccessible(true);
+      clusterClient = (ClusterManagerClient) constructor.newInstance();
+      clusterClient.start(metaKey);
+    } catch (NoSuchMethodException | InstantiationException
+        | IllegalAccessException | InvocationTargetException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    // Waiting for cluster startup
+    boolean clusterIsStartup = false;
+    int wait = 0;
+    while (wait++ < 100) {
+      if (clusterIsStartup() && clusterClient.raftInitialized()) {
+        LOG.info("ClusterMultiNodeTest::wait {}(ms) found cluster leader", wait * 3000);
+        clusterIsStartup = true;
+        break;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    assertEquals(clusterIsStartup, true);
+
+    Thread.sleep(5000);
+    assertEquals(true, clusterIsStartup());
+    LOG.info("ClusterMultiNodeTest::startCluster <<<");
+  }
+
+  /**
+   * Shuts down the client (if one was created) and then every server of the test cluster.
+   */
+  @AfterClass
+  public static void stopCluster() {
+    LOG.info("ClusterMultiNodeTest::stopCluster >>>");
+    if (clusterClient != null) {
+      clusterClient.shutdown();
+    }
+    clusterServers.forEach(ClusterManagerServer::shutdown);
+    LOG.info("ClusterMultiNodeTest::stopCluster <<<");
+  }
+
+  /**
+   * Reports whether the test cluster is up.
+   *
+   * @return {@code true} when every server has an initialized raft and at least one of
+   *         them is the cluster leader, {@code false} otherwise
+   */
+  static boolean clusterIsStartup() {
+    boolean leaderSeen = false;
+    for (ClusterManagerServer server : clusterServers) {
+      if (server.raftInitialized()) {
+        leaderSeen |= server.isClusterLeader();
+        continue;
+      }
+      LOG.warn("clusterServer not Initialized!");
+      return false;
+    }
+
+    if (leaderSeen) {
+      LOG.info("cluster startup!");
+      return true;
+    }
+
+    LOG.warn("Can not found leader!");
+    return false;
+  }
+
+  /**
+   * Checks that the server metadata fetched through the client is a map with one entry
+   * for each of the three servers.
+   */
+  @Test
+  public void testClusterServerMeta() {
+    LOG.info("ClusterMultiNodeTest::testClusterServerMeta >>>");
+    // Get metadata for all services
+    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+
+    // Assert before dereferencing, so a missing result fails as an assertion, not an NPE.
+    assertNotNull(srvMeta);
+    LOG.info(srvMeta.toString());
+    assertEquals(true, (srvMeta instanceof HashMap));
+    HashMap<?, ?> hashMap = (HashMap<?, ?>) srvMeta;
+
+    // expected value first, so a failure message reads correctly
+    assertEquals(3, hashMap.size());
+    LOG.info("ClusterMultiNodeTest::testClusterServerMeta <<<");
+  }
+
+  /**
+   * Checks that the interpreter-process metadata fetched through the client is a map with
+   * exactly one entry.
+   */
+  @Test
+  public void testClusterClientMeta() {
+    LOG.info("ClusterMultiNodeTest::testClusterClientMeta >>>");
+    // Get metadata for all services
+    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+
+    // Assert before dereferencing, so a missing result fails as an assertion, not an NPE.
+    assertNotNull(srvMeta);
+    LOG.info(srvMeta.toString());
+    assertEquals(true, (srvMeta instanceof HashMap));
+    HashMap<?, ?> hashMap = (HashMap<?, ?>) srvMeta;
+
+    // expected value first, so a failure message reads correctly
+    assertEquals(1, hashMap.size());
+    LOG.info("ClusterMultiNodeTest::testClusterClientMeta <<<");
+  }
+}
diff --git a/submarine-commons/commons-cluster/src/test/resources/log4j.properties b/submarine-commons/commons-cluster/src/test/resources/log4j.properties
new file mode 100644
index 0000000..4725615
--- /dev/null
+++ b/submarine-commons/commons-cluster/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
diff --git a/submarine-commons/commons-runtime/pom.xml b/submarine-commons/commons-runtime/pom.xml
index 3b5f361..2aa13bd 100644
--- a/submarine-commons/commons-runtime/pom.xml
+++ b/submarine-commons/commons-runtime/pom.xml
@@ -57,7 +57,7 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <version>${commons.lang3.version}</version>
+      <version>${commons-lang3.version}</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/submarine-commons/commons-utils/pom.xml b/submarine-commons/commons-utils/pom.xml
index a458c7a..2afdf59 100644
--- a/submarine-commons/commons-utils/pom.xml
+++ b/submarine-commons/commons-utils/pom.xml
@@ -39,11 +39,33 @@
       <artifactId>commons-configuration</artifactId>
       <version>${commons-configuration.version}</version>
     </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>${libthrift.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/NetUtils.java b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/NetUtils.java
new file mode 100644
index 0000000..a26ceb6
--- /dev/null
+++ b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/NetUtils.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.commons.utils;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+public class NetUtils {
+  static Logger LOG = LoggerFactory.getLogger(NetUtils.class);
+
+  /**
+   * Finds a TCP port that is free at the time of the call.
+   *
+   * <p>The probe socket is closed again before the port number is returned, so the port
+   * is not reserved for the caller.
+   *
+   * @return the local port the probe socket was bound to
+   * @throws IOException if the probe socket cannot be opened
+   */
+  public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+    // Port 0 asks for any free port; try-with-resources closes the probe socket.
+    try (ServerSocket probe = new ServerSocket(0)) {
+      return probe.getLocalPort();
+    }
+  }
+
+  /**
+   * start:end
+   *
+   * @param portRange
+   * @return
+   * @throws IOException
+   */
+  public static TServerSocket createTServerSocket(String portRange)
+      throws IOException {
+
+    TServerSocket tSocket = null;
+    // ':' is the default value which means no constraints on the portRange
+    if (StringUtils.isBlank(portRange) || portRange.equals(":")) {
+      try {
+        tSocket = new TServerSocket(0);
+        return tSocket;
+      } catch (TTransportException e) {
+        throw new IOException("Fail to create TServerSocket", e);
+      }
+    }
+    // valid user registered port https://en.wikipedia.org/wiki/Registered_port
+    int start = 1024;
+    int end = 65535;
+    String[] ports = portRange.split(":", -1);
+    if (!ports[0].isEmpty()) {
+      start = Integer.parseInt(ports[0]);
+    }
+    if (!ports[1].isEmpty()) {
+      end = Integer.parseInt(ports[1]);
+    }
+    for (int i = start; i <= end; ++i) {
+      try {
+        tSocket = new TServerSocket(i);
+        return tSocket;
+      } catch (Exception e) {
+        // ignore this
+      }
+    }
+    throw new IOException("No available port in the portRange: " + portRange);
+  }
+
+  /**
+   * Determines the host address this process should use.
+   *
+   * <p>The {@code SUBMARINE_LOCAL_IP} environment variable wins when it is set. Otherwise
+   * the local host address is used, unless it is a loopback address; in that case the first
+   * IPv4 address found on a non-loopback network interface is returned, falling back to the
+   * local host address when there is none.
+   *
+   * @return the chosen host address
+   * @throws UnknownHostException if the local host cannot be resolved
+   * @throws SocketException      if the network interfaces cannot be inspected
+   */
+  public static String findAvailableHostAddress() throws UnknownHostException, SocketException {
+    final String configuredIp = System.getenv("SUBMARINE_LOCAL_IP");
+    if (configuredIp != null) {
+      return configuredIp;
+    }
+
+    final InetAddress localHost = InetAddress.getLocalHost();
+    if (!localHost.isLoopbackAddress()) {
+      return localHost.getHostAddress();
+    }
+
+    for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
+      if (nic.isLoopback()) {
+        continue;
+      }
+      for (InterfaceAddress candidate : nic.getInterfaceAddresses()) {
+        final InetAddress inet = candidate.getAddress();
+        if (inet instanceof Inet4Address) {
+          return inet.getHostAddress();
+        }
+      }
+    }
+    return localHost.getHostAddress();
+  }
+
+  /**
+   * Checks whether a TCP connection to {@code host:port} can be established.
+   *
+   * <p>The connection attempt uses a one second timeout and the socket is always closed
+   * again, whether or not the attempt succeeds.
+   *
+   * @param host the remote host name or address
+   * @param port the remote port
+   * @return {@code true} if the connection was established, {@code false} if an
+   *         {@link IOException} occurred
+   */
+  public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
+    // try-with-resources closes the socket even when connect() fails.
+    try (Socket discover = new Socket()) {
+      discover.setSoTimeout(1000);
+      discover.connect(new InetSocketAddress(host, port), 1000);
+      return true;
+    } catch (IOException ioe) {
+      // end point is not accessible (ConnectException is an IOException as well)
+      LOG.debug("Remote endpoint '{}:{}' is not accessible (might be initializing): {}",
+          host, port, ioe.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * Extracts the interpreter setting id from an interpreter group id.
+   *
+   * <p>The setting id is the part of {@code intpGrpId} before the first {@code '-'}. An id
+   * that contains no {@code '-'} is returned unchanged instead of failing with a
+   * {@link StringIndexOutOfBoundsException}.
+   *
+   * @param intpGrpId the interpreter group id, may be {@code null}
+   * @return the setting id, or {@code null} if {@code intpGrpId} is {@code null}
+   */
+  public static String getInterpreterSettingId(String intpGrpId) {
+    if (intpGrpId == null) {
+      return null;
+    }
+    int indexOfDash = intpGrpId.indexOf("-");
+    // indexOf yields -1 when there is no separator; substring(0, -1) would throw.
+    return indexOfDash < 0 ? intpGrpId : intpGrpId.substring(0, indexOfDash);
+  }
+
+  /**
+   * Tells whether a key looks like an environment variable name.
+   *
+   * @param key the key to check, may be {@code null}
+   * @return {@code true} if {@code key} is non-empty and consists only of the characters
+   *         {@code A-Z}, {@code 0-9} and {@code _}
+   */
+  public static boolean isEnvString(String key) {
+    if (key == null || key.isEmpty()) {
+      return false;
+    }
+
+    for (int i = 0; i < key.length(); i++) {
+      final char c = key.charAt(i);
+      final boolean allowed = (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_';
+      if (!allowed) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+}
diff --git a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index 8ae958e..cea3492 100644
--- a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++ b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -62,7 +62,7 @@ public class SubmarineConfiguration extends XMLConfiguration {
     }
   }
 
-  public SubmarineConfiguration() {
+  private SubmarineConfiguration() {
     ConfVars[] vars = ConfVars.values();
     for (ConfVars v : vars) {
       if (v.getType() == ConfVars.VarType.BOOLEAN) {
@@ -242,6 +242,31 @@ public class SubmarineConfiguration extends XMLConfiguration {
     properties.put(ConfVars.JDBC_PASSWORD.getVarName(), password);
   }
 
+  /**
+   * Returns the configured cluster address.
+   *
+   * @return the string value of {@code ConfVars.WORKBENCH_CLUSTER_ADDR}
+   */
+  public String getClusterAddress() {
+    return getString(ConfVars.WORKBENCH_CLUSTER_ADDR);
+  }
+
+  /**
+   * Stores the cluster address in the properties under the variable name of
+   * {@code ConfVars.WORKBENCH_CLUSTER_ADDR}.
+   *
+   * @param clusterAddr the cluster address to store
+   */
+  public void setClusterAddress(String clusterAddr) {
+    properties.put(ConfVars.WORKBENCH_CLUSTER_ADDR.getVarName(), clusterAddr);
+  }
+
+  /**
+   * Tells whether cluster mode is configured.
+   *
+   * @return {@code true} if the value of {@code ConfVars.WORKBENCH_CLUSTER_ADDR} is not
+   *         empty, {@code false} otherwise
+   */
+  public boolean isClusterMode() {
+    return !StringUtils.isEmpty(getString(ConfVars.WORKBENCH_CLUSTER_ADDR));
+  }
+
+  /**
+   * Returns the configured cluster heartbeat interval.
+   *
+   * @return the int value of {@code ConfVars.CLUSTER_HEARTBEAT_INTERVAL}
+   *         (presumably milliseconds; TODO confirm the unit with the code that uses it)
+   */
+  public int getClusterHeartbeatInterval() {
+    return getInt(ConfVars.CLUSTER_HEARTBEAT_INTERVAL);
+  }
+
+  /**
+   * Returns the configured cluster heartbeat timeout.
+   *
+   * @return the int value of {@code ConfVars.CLUSTER_HEARTBEAT_TIMEOUT}
+   *         (presumably milliseconds; TODO confirm the unit with the code that uses it)
+   */
+  public int getClusterHeartbeatTimeout() {
+    return getInt(ConfVars.CLUSTER_HEARTBEAT_TIMEOUT);
+  }
+
   private String getStringValue(String name, String d) {
     String value = this.properties.get(name);
     if (value != null) {
@@ -367,6 +392,9 @@ public class SubmarineConfiguration extends XMLConfiguration {
     SERVER_JETTY_REQUEST_HEADER_SIZE("workbench.server.jetty.request.header.size", 8192),
     SSL_CLIENT_AUTH("workbench.ssl.client.auth", false),
     SSL_KEYSTORE_PATH("workbench.ssl.keystore.path", "keystore"),
+    WORKBENCH_CLUSTER_ADDR("workbench.cluster.addr", ""),
+    CLUSTER_HEARTBEAT_INTERVAL("cluster.heartbeat.interval", 3000),
+    CLUSTER_HEARTBEAT_TIMEOUT("cluster.heartbeat.timeout", 9000),
     SERVER_SSL_KEYSTORE_TYPE("workbench.ssl.keystore.type", "JKS"),
     SERVER_SSL_KEYSTORE_PASSWORD("workbench.ssl.keystore.password", ""),
     SERVER_SSL_KEY_MANAGER_PASSWORD("workbench.ssl.key.manager.password", null),
@@ -515,7 +543,7 @@ public class SubmarineConfiguration extends XMLConfiguration {
         try {
           checkType(value);
         } catch (Exception e) {
-          LOG.error("Exception in ZeppelinConfiguration while isType", e);
+          LOG.error("Exception in SubmarineConfiguration while isType", e);
           return false;
         }
         return true;
diff --git a/submarine-commons/pom.xml b/submarine-commons/pom.xml
index 68fc35c..fffcc1e 100644
--- a/submarine-commons/pom.xml
+++ b/submarine-commons/pom.xml
@@ -17,7 +17,6 @@
   specific language governing permissions and limitations
   under the License.
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -36,6 +35,7 @@
   <modules>
     <module>commons-utils</module>
     <module>commons-runtime</module>
+    <module>commons-cluster</module>
   </modules>
 
 </project>
diff --git a/submarine-dist/src/assembly/distribution.xml b/submarine-dist/src/assembly/distribution.xml
index 33572f3..33e3783 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -135,6 +135,13 @@
         <include>python-interpreter-${project.version}-shade.jar</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>../submarine-commons/commons-cluster/target</directory>
+      <outputDirectory>/lib/commons</outputDirectory>
+      <includes>
+        <include>commons-cluster-${project.version}-shade.jar</include>
+      </includes>
+    </fileSet>
   </fileSets>
 
 </assembly>
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractComponent.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractComponent.java
index 9777c04..1b42106 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractComponent.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractComponent.java
@@ -26,8 +26,8 @@ import org.apache.submarine.common.api.TensorFlowRole;
 import org.apache.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.submarine.runtimes.yarnservice.command.AbstractLaunchCommand;
 import org.apache.submarine.runtimes.yarnservice.command.LaunchCommandFactory;
-import org.apache.submarine.utils.DockerUtilities;
-import org.apache.submarine.utils.SubmarineResourceUtils;
+import org.apache.submarine.commons.utils.DockerUtilities;
+import org.apache.submarine.commons.utils.SubmarineResourceUtils;
 
 import java.io.IOException;
 import java.util.Objects;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractServiceSpec.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractServiceSpec.java
index c7f5bc3..ea6b667 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractServiceSpec.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/AbstractServiceSpec.java
@@ -30,10 +30,10 @@ import org.apache.submarine.common.api.TensorFlowRole;
 import org.apache.submarine.common.conf.SubmarineLogs;
 import org.apache.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.submarine.runtimes.yarnservice.command.LaunchCommandFactory;
-import org.apache.submarine.utils.KerberosPrincipalFactory;
-import org.apache.submarine.utils.Localizer;
-import org.apache.submarine.utils.DockerUtilities;
-import org.apache.submarine.utils.EnvironmentUtilities;
+import org.apache.submarine.commons.utils.KerberosPrincipalFactory;
+import org.apache.submarine.commons.utils.Localizer;
+import org.apache.submarine.commons.utils.DockerUtilities;
+import org.apache.submarine.commons.utils.EnvironmentUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/FileSystemOperations.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/FileSystemOperations.java
index 79fc7c7..b0af145 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/FileSystemOperations.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/FileSystemOperations.java
@@ -29,7 +29,7 @@ import org.apache.submarine.common.ClientContext;
 import org.apache.submarine.common.conf.SubmarineConfiguration;
 import org.apache.submarine.common.conf.SubmarineLogs;
 import org.apache.submarine.common.fs.RemoteDirectoryManager;
-import org.apache.submarine.utils.ZipUtilities;
+import org.apache.submarine.commons.utils.ZipUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/HadoopEnvironmentSetup.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/HadoopEnvironmentSetup.java
index 2a72301..6c7ab9a 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/HadoopEnvironmentSetup.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/HadoopEnvironmentSetup.java
@@ -23,8 +23,8 @@ import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
 import org.apache.submarine.common.ClientContext;
 import org.apache.submarine.common.conf.SubmarineLogs;
 import org.apache.submarine.common.fs.RemoteDirectoryManager;
-import org.apache.submarine.utils.ClassPathUtilities;
-import org.apache.submarine.utils.EnvironmentUtilities;
+import org.apache.submarine.commons.utils.ClassPathUtilities;
+import org.apache.submarine.commons.utils.EnvironmentUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
index 58ac9c7..1898f8c 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
@@ -31,7 +31,7 @@ import org.apache.submarine.runtimes.yarnservice.command.PyTorchLaunchCommandFac
 import org.apache.submarine.runtimes.yarnservice.command.TensorFlowLaunchCommandFactory;
 import org.apache.submarine.runtimes.yarnservice.pytorch.PyTorchServiceSpec;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.TensorFlowServiceSpec;
-import org.apache.submarine.utils.Localizer;
+import org.apache.submarine.commons.utils.Localizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/pytorch/PyTorchServiceSpec.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/pytorch/PyTorchServiceSpec.java
index 520afbd..e147928 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/pytorch/PyTorchServiceSpec.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/pytorch/PyTorchServiceSpec.java
@@ -25,7 +25,7 @@ import org.apache.submarine.runtimes.yarnservice.AbstractServiceSpec;
 import org.apache.submarine.runtimes.yarnservice.FileSystemOperations;
 import org.apache.submarine.runtimes.yarnservice.ServiceWrapper;
 import org.apache.submarine.runtimes.yarnservice.command.PyTorchLaunchCommandFactory;
-import org.apache.submarine.utils.Localizer;
+import org.apache.submarine.commons.utils.Localizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/TensorFlowServiceSpec.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/TensorFlowServiceSpec.java
index 5e8a2a5..e4c4930 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/TensorFlowServiceSpec.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/TensorFlowServiceSpec.java
@@ -25,7 +25,7 @@ import org.apache.submarine.runtimes.yarnservice.ServiceWrapper;
 import org.apache.submarine.runtimes.yarnservice.command.TensorFlowLaunchCommandFactory;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.component.TensorBoardComponent;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.component.TensorFlowPsComponent;
-import org.apache.submarine.utils.Localizer;
+import org.apache.submarine.commons.utils.Localizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorBoardComponent.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorBoardComponent.java
index b680c5e..5d3a41d 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorBoardComponent.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorBoardComponent.java
@@ -28,8 +28,8 @@ import org.apache.submarine.runtimes.yarnservice.FileSystemOperations;
 import org.apache.submarine.runtimes.yarnservice.YarnServiceUtils;
 import org.apache.submarine.runtimes.yarnservice.command.TensorFlowLaunchCommandFactory;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.TensorFlowCommons;
-import org.apache.submarine.utils.DockerUtilities;
-import org.apache.submarine.utils.SubmarineResourceUtils;
+import org.apache.submarine.commons.utils.DockerUtilities;
+import org.apache.submarine.commons.utils.SubmarineResourceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorFlowPsComponent.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorFlowPsComponent.java
index cd61d01..4c64804 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorFlowPsComponent.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/runtimes/yarnservice/tensorflow/component/TensorFlowPsComponent.java
@@ -26,8 +26,8 @@ import org.apache.submarine.runtimes.yarnservice.AbstractComponent;
 import org.apache.submarine.runtimes.yarnservice.FileSystemOperations;
 import org.apache.submarine.runtimes.yarnservice.command.TensorFlowLaunchCommandFactory;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.TensorFlowCommons;
-import org.apache.submarine.utils.DockerUtilities;
-import org.apache.submarine.utils.SubmarineResourceUtils;
+import org.apache.submarine.commons.utils.DockerUtilities;
+import org.apache.submarine.commons.utils.SubmarineResourceUtils;
 
 import java.io.IOException;
 import java.util.Objects;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ClassPathUtilities.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ClassPathUtilities.java
index 0caa588..24fa34c 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ClassPathUtilities.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ClassPathUtilities.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import java.io.File;
 import java.util.StringTokenizer;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/DockerUtilities.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/DockerUtilities.java
index decdfd1..497d03d 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/DockerUtilities.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/DockerUtilities.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/EnvironmentUtilities.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/EnvironmentUtilities.java
index 6da53d7..6633ec6 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/EnvironmentUtilities.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/EnvironmentUtilities.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.api.records.Service;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/KerberosPrincipalFactory.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/KerberosPrincipalFactory.java
index f135b04..4c8e501 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/KerberosPrincipalFactory.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/KerberosPrincipalFactory.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/Localizer.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/Localizer.java
index 136fa00..a0f8a1b 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/Localizer.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/Localizer.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
@@ -32,7 +32,7 @@ import java.io.IOException;
 import java.util.List;
 
 import static org.apache.submarine.runtimes.yarnservice.FileSystemOperations.needHdfs;
-import static org.apache.submarine.utils.EnvironmentUtilities.appendToEnv;
+import static org.apache.submarine.commons.utils.EnvironmentUtilities.appendToEnv;
 
 /**
  * This class holds all dependencies in order to localize dependencies
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/SubmarineResourceUtils.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/SubmarineResourceUtils.java
index 5afaedd..c3aca0d 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/SubmarineResourceUtils.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/SubmarineResourceUtils.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ZipUtilities.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ZipUtilities.java
index 7b0cccc..7cdf326 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ZipUtilities.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/ZipUtilities.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/package-info.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/package-info.java
index e8459fd..c3205b6 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/package-info.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/utils/package-info.java
@@ -16,4 +16,4 @@
 /**
  * This package contains utility classes.
  */
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
index 2a77840..e1ebfb9 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
@@ -37,7 +37,7 @@ import org.apache.submarine.runtimes.yarnservice.ServiceSpecFileGenerator;
 import org.apache.submarine.runtimes.yarnservice.ServiceWrapper;
 import org.apache.submarine.runtimes.yarnservice.YarnServiceJobSubmitter;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.component.TensorBoardComponent;
-import org.apache.submarine.utils.ZipUtilities;
+import org.apache.submarine.commons.utils.ZipUtilities;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/runtimes/yarnservice/pytorch/TestPyTorchServiceSpec.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/runtimes/yarnservice/pytorch/TestPyTorchServiceSpec.java
index 7879567..b56359e 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/runtimes/yarnservice/pytorch/TestPyTorchServiceSpec.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/runtimes/yarnservice/pytorch/TestPyTorchServiceSpec.java
@@ -28,7 +28,7 @@ import org.apache.submarine.runtimes.yarnservice.HadoopEnvironmentSetup;
 import org.apache.submarine.runtimes.yarnservice.ServiceWrapper;
 import org.apache.submarine.runtimes.yarnservice.command.PyTorchLaunchCommandFactory;
 import org.apache.submarine.runtimes.yarnservice.tensorflow.component.ComponentTestCommons;
-import org.apache.submarine.utils.Localizer;
+import org.apache.submarine.commons.utils.Localizer;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestClassPathUtilities.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestClassPathUtilities.java
index 92c03d2..56a6776 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestClassPathUtilities.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestClassPathUtilities.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.submarine.FileUtilitiesForTests;
 import org.junit.After;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestEnvironmentUtilities.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestEnvironmentUtilities.java
index 8f6706c..4710e00 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestEnvironmentUtilities.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestEnvironmentUtilities.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestKerberosPrincipalFactory.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestKerberosPrincipalFactory.java
index 2d9ce62..68d2304 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestKerberosPrincipalFactory.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestKerberosPrincipalFactory.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestSubmarineResourceUtils.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestSubmarineResourceUtils.java
index ecafa5e..d911087 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestSubmarineResourceUtils.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/utils/TestSubmarineResourceUtils.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.utils;
+package org.apache.submarine.commons.utils;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 39218d0..33c19a0 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -74,6 +74,14 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.google.errorprone</groupId>
+          <artifactId>error_prone_annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>animal-sniffer-annotations</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>