You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/10 12:19:35 UTC

[ignite-3] 04/04: IGNITE-13885 Raft client wip 2.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit e78980e90edcc6d71e376f55483740371afdf044
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Feb 10 15:19:14 2021 +0300

    IGNITE-13885 Raft client wip 2.
---
 modules/raft-client/pom.xml                        |   6 +
 .../main/java/org/apache/ignite/raft/Closure.java  |  29 ++
 .../java/org/apache/ignite/raft/Configuration.java | 276 +++++++++++++
 .../org/apache/ignite/raft/ElectionPriority.java   |  37 ++
 .../main/java/org/apache/ignite/raft/Endpoint.java |  89 +++++
 .../java/org/apache/ignite/raft/Lifecycle.java     |  39 ++
 .../main/java/org/apache/ignite/raft/PeerId.java   | 247 ++++++++++++
 .../java/org/apache/ignite/raft/RaftError.java     | 284 +++++++++++++
 .../main/java/org/apache/ignite/raft/Status.java   | 225 +++++++++++
 .../java/org/apache/ignite/raft/StringUtils.java   | 407 +++++++++++++++++++
 .../ignite/raft/closure/RpcResponseClosure.java    |  34 ++
 .../raft/closure/RpcResponseClosureAdapter.java    |  37 ++
 .../org/apache/ignite/raft/rpc/CliRequests.java    | 439 +++++++++++++++++++++
 .../org/apache/ignite/raft/rpc/Connection.java     |  53 +++
 .../raft/rpc/ConnectionClosedEventListener.java    |  24 ++
 .../org/apache/ignite/raft/rpc/InvokeCallback.java |  30 ++
 .../org/apache/ignite/raft/rpc/InvokeContext.java  |  51 +++
 .../ignite/raft/rpc/InvokeTimeoutException.java    |  43 ++
 .../java/org/apache/ignite/raft/rpc/Message.java   |  11 +
 .../ignite/raft/rpc/MessageBuilderFactory.java     |  50 +++
 .../apache/ignite/raft/rpc/NamedThreadFactory.java |  65 +++
 .../apache/ignite/raft/rpc/RemotingException.java  |  43 ++
 .../java/org/apache/ignite/raft/rpc/RpcClient.java | 105 +++++
 .../org/apache/ignite/raft/rpc/RpcContext.java     |  44 +++
 .../org/apache/ignite/raft/rpc/RpcOptions.java     |  25 ++
 .../org/apache/ignite/raft/rpc/RpcProcessor.java   |  52 +++
 .../org/apache/ignite/raft/rpc/RpcRequests.java    |  61 +++
 .../java/org/apache/ignite/raft/rpc/RpcServer.java |  46 +++
 .../java/org/apache/ignite/raft/rpc/RpcUtils.java  | 111 ++++++
 .../ignite/raft}/rpc/impl/LocalConnection.java     |  17 +-
 .../ignite/raft/rpc/impl/LocalRpcClient.java       | 125 ++++++
 .../ignite/raft}/rpc/impl/LocalRpcServer.java      |  59 +--
 .../raft/rpc/message/AddLearnersRequestImpl.java   |  53 +++
 .../rpc/message/CreateGetLeaderRequestImpl.java    |  37 ++
 .../rpc/message/CreateGetLeaderResponseImpl.java   |  22 ++
 .../rpc/message/DefaultMessageBuilderFactory.java  |  86 ++++
 .../ignite/raft/rpc/message/ErrorResponseImpl.java |  25 ++
 .../raft/rpc/message/LearnersOpResponseImpl.java   |  50 +++
 .../ignite/raft/rpc/message/PingRequestImpl.java   |  21 +
 .../rpc/message/RemoveLearnersRequestImpl.java     |  53 +++
 .../raft/rpc/message/ResetLearnersRequestImpl.java |  53 +++
 .../raft/rpc/message/SnapshotRequestImpl.java      |  32 ++
 .../rpc/message/TransferLeaderRequestImpl.java     |  47 +++
 .../ignite/raft/service/AbstractClientService.java | 237 +++++++++++
 .../ignite/raft/service/CliClientService.java      | 153 +++++++
 .../ignite/raft/service/CliClientServiceImpl.java  | 111 ++++++
 .../apache/ignite/raft/service/ClientService.java  |  75 ++++
 .../org/apache/ignite/raft/service/RouteTable.java | 316 +++++++++++++++
 .../org/apache/ignite/raft/RaftClientTest.java     |  39 ++
 .../org/apache/ignite/raft/rpc/LocalRpcTest.java   | 321 +++++++++++++++
 .../sofa/jraft/rpc/impl/LocalConnection.java       |   2 +-
 .../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java |   4 +-
 52 files changed, 4847 insertions(+), 54 deletions(-)

diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index 9286853..01e0c6d 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -34,6 +34,12 @@
     <artifactId>ignite-raft-client</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Closure.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Closure.java
new file mode 100644
index 0000000..e2126fc
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Closure.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+/**
+ * Callback closure that is handed the {@link Status} of a finished task.
+ */
+public interface Closure {
+    /**
+     * Called when task is done.
+     *
+     * @param status the execution status of the finished task.
+     */
+    void run(final Status status);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Configuration.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Configuration.java
new file mode 100644
index 0000000..5a54ce5
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Configuration.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A configuration with a set of peers and/or learners.
+ */
+public class Configuration implements Iterable<PeerId> {
+    private static final String LEARNER_POSTFIX = "/learner";
+
+    private List<PeerId> peers = new ArrayList<>();
+
+    // use LinkedHashSet to keep insertion order.
+    private LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
+
+    public Configuration() {
+        super();
+    }
+
+    /**
+     * Construct a configuration instance with peers.
+     *
+     * @param conf configuration
+     */
+    public Configuration(final Iterable<PeerId> conf) {
+        this(conf, null);
+    }
+
+    /**
+     * Construct a configuration from another conf.
+     *
+     * @param conf configuration
+     */
+    public Configuration(final Configuration conf) {
+        this(conf.getPeers(), conf.getLearners());
+    }
+
+    /**
+     * Creates a configuration holding copies of the given peers and learners.
+     *
+     * @param conf     peers to copy; it is iterated directly, so it must not be {@code null}
+     * @param learners learners to copy; {@code null} adds no learners
+     * @since 1.3.0
+     */
+    public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learners) {
+        final Iterator<PeerId> source = conf.iterator();
+        while (source.hasNext()) {
+            this.peers.add(new PeerId(source.next()));
+        }
+        addLearners(learners);
+    }
+
+    /**
+     * Replaces the learners set with the given instance.
+     * The set is stored by reference, it is not copied.
+     *
+     * @param learners the new learners set
+     */
+    public void setLearners(final LinkedHashSet<PeerId> learners) {
+        this.learners = learners;
+    }
+
+    /**
+     * Add a learner peer.
+     *
+     * @param learner learner to add
+     * @return true when add successfully.
+     */
+    public boolean addLearner(final PeerId learner) {
+        return this.learners.add(learner);
+    }
+
+    /**
+     * Adds copies of the given learners in batch.
+     *
+     * @param learners learners to add, {@code null} is treated as nothing to add
+     * @return the number of learners that were not already present
+     */
+    public int addLearners(final Iterable<PeerId> learners) {
+        if (learners == null) {
+            return 0;
+        }
+        int added = 0;
+        for (final PeerId learner : learners) {
+            final boolean inserted = this.learners.add(new PeerId(learner));
+            added += inserted ? 1 : 0;
+        }
+        return added;
+    }
+
+    /**
+     * Remove a learner peer.
+     *
+     * @param learner learner to remove
+     * @return true when remove successfully.
+     */
+    public boolean removeLearner(final PeerId learner) {
+        return this.learners.remove(learner);
+    }
+
+    /**
+     * Retrieve the learners set.
+     *
+     * @return learners
+     */
+    public LinkedHashSet<PeerId> getLearners() {
+        return this.learners;
+    }
+
+    /**
+     * Retrieve the learners set copy.
+     *
+     * @return learners
+     */
+    public List<PeerId> listLearners() {
+        return new ArrayList<>(this.learners);
+    }
+
+    /**
+     * Returns true when the configuration is valid: it has at least one peer
+     * and no peer is registered as a learner at the same time.
+     *
+     * @return true if the configuration is valid.
+     */
+    public boolean isValid() {
+        final Set<PeerId> intersection = new HashSet<>(this.peers);
+        intersection.retainAll(this.learners);
+        return !this.peers.isEmpty() && intersection.isEmpty();
+    }
+
+    /**
+     * Removes all peers and all learners from this configuration.
+     */
+    public void reset() {
+        this.peers.clear();
+        this.learners.clear();
+    }
+
+    /**
+     * Returns true when there are no peers; learners are not taken into account.
+     *
+     * @return true if the peers list is empty
+     */
+    public boolean isEmpty() {
+        return this.peers.isEmpty();
+    }
+
+    /**
+     * Returns the peers total number.
+     *
+     * @return total num of peers
+     */
+    public int size() {
+        return this.peers.size();
+    }
+
+    /**
+     * Iterates over the peers only; learners are not included.
+     *
+     * @return iterator of the internal peers list
+     */
+    @Override
+    public Iterator<PeerId> iterator() {
+        return this.peers.iterator();
+    }
+
+    /**
+     * @return a new set containing the current peers
+     */
+    public Set<PeerId> getPeerSet() {
+        return new HashSet<>(this.peers);
+    }
+
+    /**
+     * @return a new list containing the current peers
+     */
+    public List<PeerId> listPeers() {
+        return new ArrayList<>(this.peers);
+    }
+
+    /**
+     * @return the internal peers list itself (not a copy), changes to it affect this configuration
+     */
+    public List<PeerId> getPeers() {
+        return this.peers;
+    }
+
+    /**
+     * Replaces the current peers with copies of the given ones.
+     *
+     * @param peers the new peers, must not be {@code null}
+     */
+    public void setPeers(final List<PeerId> peers) {
+        // Copy before clearing: the argument may be the very list returned by getPeers(),
+        // in which case clearing first would drop every peer before it is read.
+        final List<PeerId> copies = new ArrayList<>();
+        for (final PeerId peer : peers) {
+            copies.add(new PeerId(peer));
+        }
+        this.peers.clear();
+        this.peers.addAll(copies);
+    }
+
+    /**
+     * Appends the given peers to the peers list; the instances are added as is, without copying.
+     *
+     * @param set peers to append
+     */
+    public void appendPeers(final Collection<PeerId> set) {
+        this.peers.addAll(set);
+    }
+
+    /**
+     * Appends a single peer; the instance is added as is and duplicates are not rejected by the list.
+     *
+     * @param peer peer to add
+     * @return the result of {@link List#add(Object)}
+     */
+    public boolean addPeer(final PeerId peer) {
+        return this.peers.add(peer);
+    }
+
+    /**
+     * Removes the first occurrence of the given peer.
+     *
+     * @param peer peer to remove
+     * @return true if the peer was present
+     */
+    public boolean removePeer(final PeerId peer) {
+        return this.peers.remove(peer);
+    }
+
+    /**
+     * Checks the peers list only; learners are not consulted.
+     *
+     * @param peer peer to look for
+     * @return true if the peer is in the peers list
+     */
+    public boolean contains(final PeerId peer) {
+        return this.peers.contains(peer);
+    }
+
+    /**
+     * Hash code derived from the learners set and the peers list, in that order.
+     */
+    @Override
+    public int hashCode() {
+        final int learnersHash = this.learners == null ? 0 : this.learners.hashCode();
+        final int peersHash = this.peers == null ? 0 : this.peers.hashCode();
+        return 31 * (31 + learnersHash) + peersHash;
+    }
+
+    /**
+     * Two configurations are equal when they are of the same class and both
+     * their learners and their peers are equal.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Configuration that = (Configuration) obj;
+        final boolean sameLearners = this.learners == null
+            ? that.learners == null
+            : this.learners.equals(that.learners);
+        if (!sameLearners) {
+            return false;
+        }
+        return this.peers == null ? that.peers == null : this.peers.equals(that.peers);
+    }
+
+    /**
+     * Renders the peers followed by the learners as one comma separated string;
+     * every learner carries the {@code /learner} postfix.
+     */
+    @Override
+    public String toString() {
+        final List<String> tokens = new ArrayList<>();
+        for (final PeerId peer : listPeers()) {
+            tokens.add(String.valueOf(peer));
+        }
+        for (final PeerId learner : this.learners) {
+            tokens.add(learner + LEARNER_POSTFIX);
+        }
+        return String.join(",", tokens);
+    }
+
+    /**
+     * Get the difference between the peers of |*this| and |rhs|
+     * |included| would be assigned to |*this| - |rhs|
+     * |excluded| would be assigned to |rhs| - |*this|
+     * Only the peers lists are compared and replaced; the learners of
+     * |included| and |excluded| are left as they are.
+     *
+     * @param rhs      configuration to compare with
+     * @param included receives the peers present here but not in rhs
+     * @param excluded receives the peers present in rhs but not here
+     */
+    public void diff(final Configuration rhs, final Configuration included, final Configuration excluded) {
+        included.peers = new ArrayList<>(this.peers);
+        included.peers.removeAll(rhs.peers);
+        excluded.peers = new ArrayList<>(rhs.peers);
+        excluded.peers.removeAll(this.peers);
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
new file mode 100644
index 0000000..045fb30
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+/**
+ * Election priority constants. TODO asch enum ?
+ */
+public final class ElectionPriority {
+    /**
+     * Priority -1 represents this node disabled the priority election function.
+     */
+    public static final int DISABLED = -1;
+
+    /**
+     * Priority 0 is a special value so that a node will never participate in election.
+     */
+    public static final int NOT_ELECTABLE = 0;
+
+    /**
+     * Priority 1 is a minimum value for priority election.
+     */
+    public static final int MIN_VALUE = 1;
+
+    /**
+     * Constants holder, not meant to be instantiated.
+     */
+    private ElectionPriority() {
+        // No-op.
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Endpoint.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Endpoint.java
new file mode 100644
index 0000000..ae7c4b6
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Endpoint.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.io.Serializable;
+
+/**
+ * An IP address with port. A default instance points to {@link #IP_ANY} and port 0.
+ */
+public class Endpoint implements Serializable {
+    public static final String IP_ANY = "0.0.0.0";
+
+    private static final long serialVersionUID = -7329681263115546100L;
+
+    private String ip = IP_ANY;
+    private int port;
+    /** Lazily built {@code ip:port} text, see {@link #toString()}. */
+    private String str;
+
+    /**
+     * Creates an endpoint with the default address and port.
+     */
+    public Endpoint() {
+        // Keeps the field defaults.
+    }
+
+    /**
+     * @param address the ip part of the endpoint
+     * @param port    the port part of the endpoint
+     */
+    public Endpoint(String address, int port) {
+        this.ip = address;
+        this.port = port;
+    }
+
+    /**
+     * @return the ip part
+     */
+    public String getIp() {
+        return this.ip;
+    }
+
+    /**
+     * @return the port part
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * @return the endpoint rendered as {@code ip:port}; the text is built once and then reused
+     */
+    @Override
+    public String toString() {
+        String text = this.str;
+        if (text == null) {
+            text = this.ip + ":" + this.port;
+            this.str = text;
+        }
+        return text;
+    }
+
+    @Override
+    public int hashCode() {
+        final int ipHash = this.ip == null ? 0 : this.ip.hashCode();
+        return 31 * (31 + ipHash) + this.port;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Endpoint that = (Endpoint) obj;
+        final boolean sameIp = this.ip == null ? that.ip == null : this.ip.equals(that.ip);
+        return sameIp && this.port == that.port;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Lifecycle.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Lifecycle.java
new file mode 100644
index 0000000..b6a385d
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Lifecycle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+/**
+ * Service life cycle mark interface.
+ *
+ * @param <T> type of the options passed to {@link #init(Object)}
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ *
+ * 2018-Mar-12 3:47:04 PM
+ */
+public interface Lifecycle<T> {
+
+    /**
+     * Initialize the service.
+     *
+     * @param opts options to initialize the service with
+     * @return true when the initialization succeeds.
+     */
+    boolean init(final T opts);
+
+    /**
+     * Dispose the resources for service.
+     */
+    void shutdown();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
new file mode 100644
index 0000000..c31f3d1
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.io.Serializable;
+
+/**
+ * Represent a participant in a replicating group.
+ */
+public class PeerId implements Serializable {
+    private static final long serialVersionUID = 8083529734784884641L;
+
+    /**
+     * Peer address.
+     */
+    private Endpoint endpoint = new Endpoint(Endpoint.IP_ANY, 0);
+
+    /**
+     * Index in same addr, default is 0.
+     */
+    private int idx; // TODO FIXME idx can be removed.
+    /**
+     * Cached toString result.
+     */
+    private String str;
+
+    /**
+     * Node's local priority value, if node don't support priority election, this value is -1.
+     */
+    private int priority = ElectionPriority.DISABLED;
+
+    public static final PeerId ANY_PEER = new PeerId();
+
+    private long checksum;
+
+    /**
+     * Copy constructor: takes over the endpoint reference, idx and priority of the given peer.
+     * The cached string and the checksum are not copied.
+     *
+     * @param peer peer to copy from
+     */
+    public PeerId(PeerId peer) {
+        this.endpoint = peer.getEndpoint();
+        this.idx = peer.getIdx();
+        this.priority = peer.getPriority();
+    }
+
+    public PeerId() {
+        // No-op.
+    }
+
+    /**
+     * Create an empty peer.
+     *
+     * @return empty peer
+     */
+    public static PeerId emptyPeer() {
+        return new PeerId();
+    }
+
+    /**
+     * Creates a peer with the given endpoint and index and with priority election disabled.
+     */
+    public PeerId(final Endpoint endpoint, final int idx) {
+        this(endpoint, idx, ElectionPriority.DISABLED);
+    }
+
+    /**
+     * Creates a peer with index 0 and with priority election disabled.
+     */
+    public PeerId(final String ip, final int port) {
+        this(new Endpoint(ip, port), 0, ElectionPriority.DISABLED);
+    }
+
+    /**
+     * Creates a peer with the given index and with priority election disabled.
+     */
+    public PeerId(final String ip, final int port, final int idx) {
+        this(new Endpoint(ip, port), idx, ElectionPriority.DISABLED);
+    }
+
+    /**
+     * Creates a peer from all of its parts; the other constructors delegate here.
+     */
+    public PeerId(final Endpoint endpoint, final int idx, final int priority) {
+        this.endpoint = endpoint;
+        this.idx = idx;
+        this.priority = priority;
+    }
+
+    /**
+     * Creates a peer from an address, a port, an index and a priority.
+     */
+    public PeerId(final String ip, final int port, final int idx, final int priority) {
+        this(new Endpoint(ip, port), idx, priority);
+    }
+
+    public Endpoint getEndpoint() {
+        return this.endpoint;
+    }
+
+    public String getIp() {
+        return this.endpoint.getIp();
+    }
+
+    public int getPort() {
+        return this.endpoint.getPort();
+    }
+
+    public int getIdx() {
+        return this.idx;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    /**
+     * Changes the priority and drops the cached {@link #toString()} result,
+     * because the priority is part of the rendered text.
+     *
+     * @param priority the new priority
+     */
+    public void setPriority(int priority) {
+        this.priority = priority;
+        this.str = null;
+    }
+
+    /**
+     * Returns true when ip is {@link Endpoint#IP_ANY}, port is zero and idx is zero too.
+     *
+     * @return true if ip, port and idx all carry their default values
+     */
+    public boolean isEmpty() {
+        // Constant-first comparison: an Endpoint may be created with a null ip.
+        return Endpoint.IP_ANY.equals(getIp()) && getPort() == 0 && this.idx == 0;
+    }
+
+    /**
+     * Renders this peer as {@code ip:port[:idx][:priority]}. The idx token is left empty
+     * when idx is zero but a priority is present. The result is cached until the peer changes.
+     */
+    @Override
+    public String toString() {
+        final String cached = this.str;
+        if (cached != null) {
+            return cached;
+        }
+
+        final boolean hasIdx = this.idx != 0;
+        final boolean hasPriority = this.priority != ElectionPriority.DISABLED;
+
+        String text = this.endpoint.toString();
+        if (hasIdx) {
+            text += ":" + this.idx;
+        } else if (hasPriority) {
+            // Keep the idx slot empty so that the priority stays the fourth token.
+            text += ":";
+        }
+        if (hasPriority) {
+            text += ":" + this.priority;
+        }
+
+        this.str = text;
+        return text;
+    }
+
+    /**
+     * To judge whether this node is excluded from election.
+     *
+     * @return true when the priority equals {@link ElectionPriority#NOT_ELECTABLE}.
+     */
+    public boolean isPriorityNotElected() {
+        return this.priority == ElectionPriority.NOT_ELECTABLE;
+    }
+
+    /**
+     * To judge whether the priority election function is disabled or not in this node.
+     *
+     * @return true when the priority is {@link ElectionPriority#DISABLED} or lower.
+     */
+    public boolean isPriorityDisabled() {
+        return this.priority <= ElectionPriority.DISABLED;
+    }
+
+    /**
+     * Hash code derived from the endpoint and idx; the priority does not take part.
+     */
+    @Override
+    public int hashCode() {
+        final int endpointHash = this.endpoint == null ? 0 : this.endpoint.hashCode();
+        return 31 * (31 + endpointHash) + this.idx;
+    }
+
+    /**
+     * Two peers are equal when they are of the same class and have equal
+     * endpoints and the same idx; the priority does not take part.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final PeerId that = (PeerId) obj;
+        final boolean sameEndpoint = this.endpoint == null
+            ? that.endpoint == null
+            : this.endpoint.equals(that.endpoint);
+        return sameEndpoint && this.idx == that.idx;
+    }
+
+    /**
+     * Parse peerId from string that generated by {@link #toString()}
+     * This method can support parameter string values are below:
+     *
+     * <pre>
+     * PeerId.parse("a:b")          = new PeerId("a", "b", 0 , -1)
+     * PeerId.parse("a:b:c")        = new PeerId("a", "b", "c", -1)
+     * PeerId.parse("a:b::d")       = new PeerId("a", "b", 0, "d")
+     * PeerId.parse("a:b:c:d")      = new PeerId("a", "b", "c", "d")
+     * </pre>
+     *
+     * The peer is only modified when the whole string parses; on failure it is left untouched.
+     * Parts missing from the string are reset to their defaults, as listed above.
+     *
+     * @param s the string to parse
+     * @return true when the string was parsed and applied to this peer, false otherwise
+     */
+    public boolean parse(final String s) {
+        if (StringUtils.isBlank(s)) {
+            return false;
+        }
+
+        final String[] tmps = StringUtils.parsePeerId(s);
+        if (tmps.length < 2 || tmps.length > 4) {
+            return false;
+        }
+
+        // Parse into locals first, so that a malformed token cannot leave a half-updated peer behind.
+        final int port;
+        int parsedIdx = 0;
+        int parsedPriority = ElectionPriority.DISABLED;
+        try {
+            port = Integer.parseInt(tmps[1]);
+
+            switch (tmps.length) {
+                case 3:
+                    parsedIdx = Integer.parseInt(tmps[2]);
+                    break;
+                case 4:
+                    // An empty idx token ("a:b::d") stands for idx 0.
+                    if (!"".equals(tmps[2])) {
+                        parsedIdx = Integer.parseInt(tmps[2]);
+                    }
+                    parsedPriority = Integer.parseInt(tmps[3]);
+                    break;
+                default:
+                    break;
+            }
+        } catch (final NumberFormatException e) {
+            // LOG.error("Parse peer from string failed: {}.", s, e);
+            return false;
+        }
+
+        this.endpoint = new Endpoint(tmps[0], port);
+        this.idx = parsedIdx;
+        this.priority = parsedPriority;
+        this.str = null;
+        return true;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftError.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftError.java
new file mode 100644
index 0000000..a9635fc
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftError.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Raft error code.
+ */
+public enum RaftError {
+
+    /**
+     * Unknown error
+     */
+    UNKNOWN(-1),
+
+    /**
+     * Success, no error.
+     */
+    SUCCESS(0),
+
+    /**
+     * <pre>
+     * All Kinds of Timeout(Including Election_timeout, Timeout_now, Stepdown_timeout)
+     * </pre>
+     * <p>
+     * <code>ERAFTTIMEDOUT = 10001;</code>
+     */
+    ERAFTTIMEDOUT(10001),
+
+    /**
+     * <pre>
+     * Bad User State Machine
+     * </pre>
+     * <p>
+     * <code>ESTATEMACHINE = 10002;</code>
+     */
+    ESTATEMACHINE(10002),
+
+    /**
+     * <pre>
+     * Catchup Failed
+     * </pre>
+     * <p>
+     * <code>ECATCHUP = 10003;</code>
+     */
+    ECATCHUP(10003),
+
+    /**
+     * <pre>
+     * Trigger step_down(Not All)
+     * </pre>
+     * <p>
+     * <code>ELEADERREMOVED = 10004;</code>
+     */
+    ELEADERREMOVED(10004),
+
+    /**
+     * <pre>
+     * Leader Is Not In The New Configuration
+     * </pre>
+     * <p>
+     * <code>ESETPEER = 10005;</code>
+     */
+    ESETPEER(10005),
+
+    /**
+     * <pre>
+     * Shut_down
+     * </pre>
+     * <p>
+     * <code>ENODESHUTDOWN = 10006;</code>
+     */
+    ENODESHUTDOWN(10006),
+
+    /**
+     * <pre>
+     * Receive Higher Term Requests
+     * </pre>
+     * <p>
+     * <code>EHIGHERTERMREQUEST = 10007;</code>
+     */
+    EHIGHERTERMREQUEST(10007),
+
+    /**
+     * <pre>
+     * Receive Higher Term Response
+     * </pre>
+     * <p>
+     * <code>EHIGHERTERMRESPONSE = 10008;</code>
+     */
+    EHIGHERTERMRESPONSE(10008),
+
+    /**
+     * <pre>
+     * Node Is In Error
+     * </pre>
+     * <p>
+     * <code>EBADNODE = 10009;</code>
+     */
+    EBADNODE(10009),
+
+    /**
+     * <pre>
+     * Node Votes For Some Candidate
+     * </pre>
+     * <p>
+     * <code>EVOTEFORCANDIDATE = 10010;</code>
+     */
+    EVOTEFORCANDIDATE(10010),
+
+    /**
+     * <pre>
+     * Follower(without leader) or Candidate Receives
+     * </pre>
+     * <p>
+     * <code>ENEWLEADER = 10011;</code>
+     */
+    ENEWLEADER(10011),
+
+    /**
+     * <pre>
+     * Append_entries/Install_snapshot Request from a new leader
+     * </pre>
+     * <p>
+     * <code>ELEADERCONFLICT = 10012;</code>
+     */
+    ELEADERCONFLICT(10012),
+
+    /**
+     * <pre>
+     * Trigger on_leader_stop
+     * </pre>
+     * <p>
+     * <code>ETRANSFERLEADERSHIP = 10013;</code>
+     */
+    ETRANSFERLEADERSHIP(10013),
+
+    /**
+     * <pre>
+     * The log at the given index is deleted
+     * </pre>
+     * <p>
+     * <code>ELOGDELETED = 10014;</code>
+     */
+    ELOGDELETED(10014),
+
+    /**
+     * <pre>
+     * No available user log to read
+     * </pre>
+     * <p>
+     * <code>ENOMOREUSERLOG = 10015;</code>
+     */
+    ENOMOREUSERLOG(10015),
+
+    /* other non-raft error codes 1000~10000 */
+    /**
+     * Invalid rpc request
+     */
+    EREQUEST(1000),
+
+    /**
+     * Task is stopped
+     */
+    ESTOP(1001),
+
+    /**
+     * Retry again
+     */
+    EAGAIN(1002),
+
+    /**
+     * Interrupted
+     */
+    EINTR(1003),
+
+    /**
+     * Internal exception
+     */
+    EINTERNAL(1004),
+
+    /**
+     * Task is canceled
+     */
+    ECANCELED(1005),
+
+    /**
+     * Host is down
+     */
+    EHOSTDOWN(1006),
+
+    /**
+     * Service is shutdown
+     */
+    ESHUTDOWN(1007),
+
+    /**
+     * Permission issue
+     */
+    EPERM(1008),
+
+    /**
+     * Server is in busy state
+     */
+    EBUSY(1009),
+
+    /**
+     * Timed out
+     */
+    ETIMEDOUT(1010),
+
+    /**
+     * Data is stale
+     */
+    ESTALE(1011),
+
+    /**
+     * Something not found
+     */
+    ENOENT(1012),
+
+    /**
+     * File/folder already exists
+     */
+    EEXISTS(1013),
+
+    /**
+     * IO error
+     */
+    EIO(1014),
+
+    /**
+     * Invalid value.
+     */
+    EINVAL(1015),
+
+    /**
+     * Permission denied
+     */
+    EACCES(1016);
+
+    /** Lookup table from the numeric code to its constant, filled in the static initializer below. */
+    private static final Map<Integer, RaftError> RAFT_ERROR_MAP = new HashMap<>();
+
+    static {
+        for (final RaftError error : RaftError.values()) {
+            RAFT_ERROR_MAP.put(error.getNumber(), error);
+        }
+    }
+
+    /**
+     * @return the numeric code of this error
+     */
+    public final int getNumber() {
+        return this.value;
+    }
+
+    /**
+     * Resolves an error by its numeric code.
+     *
+     * @param value numeric error code
+     * @return the constant carrying this code, or {@link #UNKNOWN} when no constant carries it
+     */
+    public static RaftError forNumber(final int value) {
+        return RAFT_ERROR_MAP.getOrDefault(value, UNKNOWN);
+    }
+
+    /**
+     * Describes the given numeric error code.
+     *
+     * @param code numeric error code
+     * @return the constant name when the code is known, otherwise {@code "<Unknown:" + code + ">"}
+     */
+    public static String describeCode(final int code) {
+        // Query the map directly: forNumber() substitutes UNKNOWN for unmapped codes and
+        // never returns null, which made the fallback text below unreachable.
+        final RaftError e = RAFT_ERROR_MAP.get(code);
+        return e != null ? e.name() : "<Unknown:" + code + ">";
+    }
+
+    private final int value;
+
+    RaftError(final int value) {
+        this.value = value;
+    }
+}
\ No newline at end of file
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/Status.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/Status.java
new file mode 100644
index 0000000..82a9aa4
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/Status.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft;
+
+// A Status encapsulates the result of an operation. It may indicate success,
+// or it may indicate an error with an associated error message. It is suitable
+// for passing the status of functions with richer information than just an error code
+// in code that does not use exceptions. This utility is inspired by leveldb::Status.
+//
+// Multiple threads can invoke read-only methods on a Status without
+// external synchronization, but if any of the threads may call a
+// mutating method, all threads accessing the same Status must use
+// external synchronization.
+//
+// Since a failed status needs to allocate memory, you should be careful when
+// failed statuses are frequent.
+public class Status {
+
+    /**
+     * Status internal state.
+     *
+     * @author boyan (boyan@alibaba-inc.com)
+     *
+     * 2018-Apr-03 11:17:51 AM
+     */
+    private static class State {
+        /** error code */
+        int    code;
+        /** error msg*/
+        String msg;
+
+        State(int code, String msg) {
+            super();
+            this.code = code;
+            this.msg = msg;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + this.code;
+            result = prime * result + (this.msg == null ? 0 : this.msg.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            State other = (State) obj;
+            if (this.code != other.code) {
+                return false;
+            }
+            if (this.msg == null) {
+                return other.msg == null;
+            } else {
+                return this.msg.equals(other.msg);
+            }
+        }
+    }
+
+    private State state;
+
+    public Status() {
+        this.state = null;
+    }
+
+    /**
+     * Creates a OK status instance.
+     */
+    public static Status OK() {
+        return new Status();
+    }
+
+    public Status(Status s) {
+        if (s.state != null) {
+            this.state = new State(s.state.code, s.state.msg);
+        } else {
+            this.state = null;
+        }
+    }
+
+    public Status(RaftError raftError, String fmt, Object... args) {
+        this.state = new State(raftError.getNumber(), String.format(fmt, args));
+    }
+
+    public Status(int code, String fmt, Object... args) {
+        this.state = new State(code, String.format(fmt, args));
+    }
+
+    public Status(int code, String errorMsg) {
+        this.state = new State(code, errorMsg);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (this.state == null ? 0 : this.state.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        Status other = (Status) obj;
+        if (this.state == null) {
+            return other.state == null;
+        } else {
+            return this.state.equals(other.state);
+        }
+    }
+
+    /**
+     * Reset status to be OK state.
+     */
+    public void reset() {
+        this.state = null;
+    }
+
+    /**
+     * Returns true when status is in OK state.
+     */
+    public boolean isOk() {
+        return this.state == null || this.state.code == 0;
+    }
+
+    /**
+     * Set error code.
+     */
+    public void setCode(int code) {
+        if (this.state == null) {
+            this.state = new State(code, null);
+        } else {
+            this.state.code = code;
+        }
+    }
+
+    /**
+     * Get error code.
+     */
+    public int getCode() {
+        return this.state == null ? 0 : this.state.code;
+    }
+
+    /**
+     * Get raft error from error code.
+     */
+    public RaftError getRaftError() {
+        return this.state == null ? RaftError.SUCCESS : RaftError.forNumber(this.state.code);
+    }
+
+    /**
+     * Set error msg
+     */
+    public void setErrorMsg(String errMsg) {
+        if (this.state == null) {
+            this.state = new State(0, errMsg);
+        } else {
+            this.state.msg = errMsg;
+        }
+    }
+
+    /**
+     * Set error code and error msg.
+     */
+    public void setError(int code, String fmt, Object... args) {
+        this.state = new State(code, String.format(String.valueOf(fmt), args));
+    }
+
+    /**
+     * Set raft error and error msg.
+     */
+    public void setError(RaftError error, String fmt, Object... args) {
+        this.state = new State(error.getNumber(), String.format(String.valueOf(fmt), args));
+    }
+
+    @Override
+    public String toString() {
+        if (isOk()) {
+            return "Status[OK]";
+        } else {
+            return "Status[" + RaftError.describeCode(this.state.code) + "<" + this.state.code + ">: " + this.state.msg
+                   + "]";
+        }
+    }
+
+    /**
+     * Get the error msg.
+     */
+    public String getErrorMsg() {
+        return this.state == null ? null : this.state.msg;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/StringUtils.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/StringUtils.java
new file mode 100644
index 0000000..8609f44
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/StringUtils.java
@@ -0,0 +1,407 @@
+package org.apache.ignite.raft;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TODO asch Currently used only for parsing peer ids.
+ */
+public class StringUtils {
+    /** Opening bracket of a bracketed IPv6 literal. */
+    public static final String IPV6_START_MARK = "[";
+
+    /** Closing bracket of a bracketed IPv6 literal. */
+    public static final String IPV6_END_MARK = "]";
+
+    /** Length in bytes of a raw IPv6 address. */
+    private static final int IPV6_ADDRESS_LENGTH = 16;
+
+    /** The empty string. */
+    public static final String EMPTY = "";
+
+    /** Shared zero-length array returned when the input string is empty. */
+    public static final String[] EMPTY_STRING_ARRAY = new String[0];
+
+    /** Kept public for compatibility; all members of this class are static. */
+    public StringUtils() {
+    }
+
+    /**
+     * @param cs sequence to check, may be {@code null}
+     * @return {@code true} if {@code cs} is {@code null} or has zero length
+     */
+    public static boolean isEmpty(CharSequence cs) {
+        return cs == null || cs.length() == 0;
+    }
+
+    /**
+     * @param cs sequence to check, may be {@code null}
+     * @return negation of {@link #isEmpty(CharSequence)}
+     */
+    public static boolean isNotEmpty(CharSequence cs) {
+        return !isEmpty(cs);
+    }
+
+    /**
+     * @param cs sequence to check, may be {@code null}
+     * @return {@code true} if {@code cs} is {@code null}, empty, or consists only of characters for which
+     *         {@link Character#isWhitespace(char)} is true
+     */
+    public static boolean isBlank(CharSequence cs) {
+        if (cs == null) {
+            return true;
+        }
+        for (int i = 0, n = cs.length(); i < n; i++) {
+            if (!Character.isWhitespace(cs.charAt(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @param cs sequence to check, may be {@code null}
+     * @return negation of {@link #isBlank(CharSequence)}
+     */
+    public static boolean isNotBlank(CharSequence cs) {
+        return !isBlank(cs);
+    }
+
+    /**
+     * Splits a string on a single character; adjacent separators are treated as one, so no empty tokens
+     * are produced.
+     *
+     * @param str the String to parse, may be {@code null}
+     * @param separatorChar the delimiter
+     * @return an array of parsed Strings, {@code null} if null String input
+     */
+    public static String[] split(String str, char separatorChar) {
+        return splitWorker(str, separatorChar, false);
+    }
+
+    /**
+     * Single-character split shared by {@link #split(String, char)} and
+     * {@link #splitPreserveAllTokens(String, char)}.
+     *
+     * @param str the String to parse, may be {@code null}
+     * @param separatorChar the delimiter
+     * @param preserveAllTokens if {@code true}, adjacent, leading and trailing separators yield empty tokens
+     * @return an array of parsed Strings, {@code null} if null String input
+     */
+    private static String[] splitWorker(String str, char separatorChar, boolean preserveAllTokens) {
+        if (str == null) {
+            return null;
+        }
+        final int len = str.length();
+        if (len == 0) {
+            return EMPTY_STRING_ARRAY;
+        }
+        final List<String> tokens = new ArrayList<>();
+        int start = 0;
+        boolean match = false;     // a non-separator char was seen since the last emitted token
+        boolean lastMatch = false; // the previous char was a separator that emitted a token
+        for (int i = 0; i < len; i++) {
+            if (str.charAt(i) == separatorChar) {
+                if (match || preserveAllTokens) {
+                    tokens.add(str.substring(start, i));
+                    match = false;
+                    lastMatch = true;
+                }
+                start = i + 1;
+            } else {
+                lastMatch = false;
+                match = true;
+            }
+        }
+        if (match || (preserveAllTokens && lastMatch)) {
+            tokens.add(str.substring(start, len));
+        }
+        return tokens.toArray(new String[0]);
+    }
+
+    /**
+     * @param str the String to check, may be {@code null}
+     * @return {@code false} for {@code null}; otherwise {@code true} if every character satisfies
+     *         {@link Character#isDigit(char)} (which makes the empty string numeric)
+     */
+    public static boolean isNumeric(String str) {
+        if (str == null) {
+            return false;
+        }
+        for (int i = 0, n = str.length(); i < n; i++) {
+            if (!Character.isDigit(str.charAt(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Null-safe string equality.
+     *
+     * @param str1 first string, may be {@code null}
+     * @param str2 second string, may be {@code null}
+     * @return {@code true} if both are {@code null} or {@code str1.equals(str2)}
+     */
+    public static boolean equals(String str1, String str2) {
+        return str1 == null ? str2 == null : str1.equals(str2);
+    }
+
+    /**
+     * <p>Splits the provided text into an array with a maximum length,
+     * separators specified, preserving all tokens, including empty tokens
+     * created by adjacent separators.</p>
+     *
+     * <p>The separator is not included in the returned String array.
+     * Adjacent separators are treated as separators for empty tokens.</p>
+     *
+     * <p>A <code>null</code> input String returns <code>null</code>.
+     * A <code>null</code> separatorChars splits on whitespace.</p>
+     *
+     * <p>If more than <code>max</code> delimited substrings are found, the last
+     * returned string includes all characters after the first <code>max - 1</code>
+     * returned strings (including separator characters).</p>
+     *
+     * <pre>
+     * StringUtils.splitPreserveAllTokens(null, *, *)            = null
+     * StringUtils.splitPreserveAllTokens("", *, *)              = []
+     * StringUtils.splitPreserveAllTokens("ab de fg", null, 0)   = ["ab", "de", "fg"]
+     * StringUtils.splitPreserveAllTokens("ab   de fg", null, 0) = ["ab", "", "", "de", "fg"]
+     * StringUtils.splitPreserveAllTokens("ab:cd:ef", ":", 0)    = ["ab", "cd", "ef"]
+     * StringUtils.splitPreserveAllTokens("ab:cd:ef", ":", 2)    = ["ab", "cd:ef"]
+     * StringUtils.splitPreserveAllTokens("ab   de fg", null, 2) = ["ab", "  de fg"]
+     * StringUtils.splitPreserveAllTokens("ab   de fg", null, 3) = ["ab", "", " de fg"]
+     * StringUtils.splitPreserveAllTokens("ab   de fg", null, 4) = ["ab", "", "", "de fg"]
+     * </pre>
+     *
+     * @param str  the String to parse, may be <code>null</code>
+     * @param separatorChars  the characters used as the delimiters,
+     *  <code>null</code> splits on whitespace
+     * @param max  the maximum number of elements to include in the
+     *  array. A zero or negative value implies no limit
+     * @return an array of parsed Strings, <code>null</code> if null String input
+     * @since 2.1
+     */
+    public static String[] splitPreserveAllTokens(String str, String separatorChars, int max) {
+        return splitWorker(str, separatorChars, max, true);
+    }
+
+    /**
+     * Tells whether {@code c} is a separator for the multi-separator split.
+     *
+     * @param c character under test
+     * @param separatorChars set of separator characters; {@code null} means "any whitespace"
+     * @return {@code true} if {@code c} separates tokens
+     */
+    private static boolean isSeparator(char c, String separatorChars) {
+        return separatorChars == null ? Character.isWhitespace(c) : separatorChars.indexOf(c) >= 0;
+    }
+
+    /**
+     * Performs the logic for the <code>splitPreserveAllTokens</code> methods that
+     * take a separator set and a maximum array length.
+     *
+     * @param str  the String to parse, may be <code>null</code>
+     * @param separatorChars the separator characters, <code>null</code> splits on whitespace
+     * @param max  the maximum number of elements to include in the
+     *  array. A zero or negative value implies no limit.
+     * @param preserveAllTokens if <code>true</code>, adjacent separators are
+     * treated as empty token separators; if <code>false</code>, adjacent
+     * separators are treated as one separator.
+     * @return an array of parsed Strings, <code>null</code> if null String input
+     */
+    private static String[] splitWorker(String str, String separatorChars, int max, boolean preserveAllTokens) {
+        if (str == null) {
+            return null;
+        }
+        final int len = str.length();
+        if (len == 0) {
+            return EMPTY_STRING_ARRAY;
+        }
+        final List<String> tokens = new ArrayList<>();
+        int sizePlus1 = 1; // number of tokens emitted so far, plus one
+        int i = 0;
+        int start = 0;
+        boolean match = false;
+        boolean lastMatch = false;
+        while (i < len) {
+            if (isSeparator(str.charAt(i), separatorChars)) {
+                if (match || preserveAllTokens) {
+                    lastMatch = true;
+                    if (sizePlus1++ == max) {
+                        // Limit reached: the token being emitted swallows the rest of the string.
+                        i = len;
+                        lastMatch = false;
+                    }
+                    tokens.add(str.substring(start, i));
+                    match = false;
+                }
+                start = ++i;
+                continue;
+            }
+            lastMatch = false;
+            match = true;
+            i++;
+        }
+        if (match || (preserveAllTokens && lastMatch)) {
+            tokens.add(str.substring(start, i));
+        }
+        return tokens.toArray(new String[0]);
+    }
+
+    /**
+     * <p>Checks if String contains a search String irrespective of case,
+     * handling <code>null</code>. Case-insensitivity is defined as by
+     * {@link String#equalsIgnoreCase(String)}.
+     *
+     * <p>A <code>null</code> String will return <code>false</code>.</p>
+     *
+     * <pre>
+     * StringUtils.containsIgnoreCase(null, *) = false
+     * StringUtils.containsIgnoreCase(*, null) = false
+     * StringUtils.containsIgnoreCase("", "") = true
+     * StringUtils.containsIgnoreCase("abc", "") = true
+     * StringUtils.containsIgnoreCase("abc", "a") = true
+     * StringUtils.containsIgnoreCase("abc", "z") = false
+     * StringUtils.containsIgnoreCase("abc", "A") = true
+     * StringUtils.containsIgnoreCase("abc", "Z") = false
+     * </pre>
+     *
+     * @param str  the String to check, may be null
+     * @param searchStr  the String to find, may be null
+     * @return true if the String contains the search String irrespective of
+     * case or false if not or <code>null</code> string input
+     */
+    public static boolean containsIgnoreCase(String str, String searchStr) {
+        if (str == null || searchStr == null) {
+            return false;
+        }
+        final int len = searchStr.length();
+        final int max = str.length() - len;
+        for (int i = 0; i <= max; i++) {
+            if (str.regionMatches(true, i, searchStr, 0, len)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // -----------------------------------------------------------------------
+    /**
+     * <p>Splits the provided text into an array, using whitespace as the
+     * separator, preserving all tokens, including empty tokens created by
+     * adjacent separators. This is an alternative to using StringTokenizer.
+     * Whitespace is defined by {@link Character#isWhitespace(char)}.</p>
+     *
+     * <p>The separator is not included in the returned String array.
+     * Adjacent separators are treated as separators for empty tokens.</p>
+     *
+     * <p>A <code>null</code> input String returns <code>null</code>.</p>
+     *
+     * <pre>
+     * StringUtils.splitPreserveAllTokens(null)       = null
+     * StringUtils.splitPreserveAllTokens("")         = []
+     * StringUtils.splitPreserveAllTokens("abc def")  = ["abc", "def"]
+     * StringUtils.splitPreserveAllTokens("abc  def") = ["abc", "", "def"]
+     * StringUtils.splitPreserveAllTokens(" abc ")    = ["", "abc", ""]
+     * </pre>
+     *
+     * @param str  the String to parse, may be <code>null</code>
+     * @return an array of parsed Strings, <code>null</code> if null String input
+     * @since 2.1
+     */
+    public static String[] splitPreserveAllTokens(String str) {
+        return splitWorker(str, null, -1, true);
+    }
+
+    /**
+     * <p>Splits the provided text into an array, separator specified,
+     * preserving all tokens, including empty tokens created by adjacent
+     * separators. This is an alternative to using StringTokenizer.</p>
+     *
+     * <p>The separator is not included in the returned String array.
+     * Adjacent separators are treated as separators for empty tokens.</p>
+     *
+     * <p>A <code>null</code> input String returns <code>null</code>.</p>
+     *
+     * <pre>
+     * StringUtils.splitPreserveAllTokens(null, *)         = null
+     * StringUtils.splitPreserveAllTokens("", *)           = []
+     * StringUtils.splitPreserveAllTokens("a.b.c", '.')    = ["a", "b", "c"]
+     * StringUtils.splitPreserveAllTokens("a..b.c", '.')   = ["a", "", "b", "c"]
+     * StringUtils.splitPreserveAllTokens("a:b:c", '.')    = ["a:b:c"]
+     * StringUtils.splitPreserveAllTokens("a b c", ' ')    = ["a", "b", "c"]
+     * StringUtils.splitPreserveAllTokens("a b c ", ' ')   = ["a", "b", "c", ""]
+     * StringUtils.splitPreserveAllTokens("a b c  ", ' ')  = ["a", "b", "c", "", ""]
+     * StringUtils.splitPreserveAllTokens(" a b c", ' ')   = ["", "a", "b", "c"]
+     * StringUtils.splitPreserveAllTokens("  a b c", ' ')  = ["", "", "a", "b", "c"]
+     * StringUtils.splitPreserveAllTokens(" a b c ", ' ')  = ["", "a", "b", "c", ""]
+     * </pre>
+     *
+     * @param str  the String to parse, may be <code>null</code>
+     * @param separatorChar  the character used as the delimiter
+     * @return an array of parsed Strings, <code>null</code> if null String input
+     * @since 2.1
+     */
+    public static String[] splitPreserveAllTokens(String str, char separatorChar) {
+        return splitWorker(str, separatorChar, true);
+    }
+
+    /**
+     * Splits the textual form of a peer id into its colon-separated components, keeping empty components.
+     * A leading bracketed IPv6 literal is validated and returned as a single first element.
+     *
+     * <pre>
+     * StringUtils.parsePeerId("a:b")        = ["a", "b"]
+     * StringUtils.parsePeerId("a:b:c")      = ["a", "b", "c"]
+     * StringUtils.parsePeerId("a:b::d")     = ["a", "b", "", "d"]
+     * StringUtils.parsePeerId("a:b:c:d")    = ["a", "b", "c", "d"]
+     * StringUtils.parsePeerId("[::1]:8080") = ["[::1]", "8080"]
+     * </pre>
+     *
+     * @param s peer id string, must not be {@code null}
+     * @return the components of {@code s}
+     * @throws IllegalArgumentException if {@code s} starts with a bracketed part that is not a valid IPv6 address
+     */
+    public static String[] parsePeerId(String s) {
+        if (!(s.startsWith(IPV6_START_MARK) && s.contains(IPV6_END_MARK))) {
+            return splitPreserveAllTokens(s, ':');
+        }
+        // If the string ends with ']' the whole string is taken as the address,
+        // otherwise the address runs up to and including the first ']'.
+        final String ipv6Addr = s.endsWith(IPV6_END_MARK) ? s : s.substring(0, s.indexOf(IPV6_END_MARK) + 1);
+        if (!isIPv6(ipv6Addr)) {
+            throw new IllegalArgumentException("The IPv6 address(\"" + ipv6Addr + "\") is incorrect.");
+        }
+        // ipv6Addr is a prefix of s, so the remainder starts right after it.
+        String rest = s.substring(ipv6Addr.length());
+        if (rest.startsWith(":")) {
+            rest = rest.substring(1);
+        }
+        final String[] tail = splitPreserveAllTokens(rest, ':');
+        final String[] result = new String[1 + tail.length];
+        result[0] = ipv6Addr;
+        System.arraycopy(tail, 0, result, 1, tail.length);
+        return result;
+    }
+
+    /**
+     * Checks whether the address resolves to a 16-byte (IPv6) address.
+     *
+     * @param addr ip address
+     * @return {@code true} if {@link InetAddress#getByName(String)} yields a 16-byte address
+     * @throws IllegalArgumentException wrapping any failure raised while resolving {@code addr}
+     */
+    private static boolean isIPv6(String addr) {
+        try {
+            return InetAddress.getByName(addr).getAddress().length == IPV6_ADDRESS_LENGTH;
+        } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosure.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosure.java
new file mode 100644
index 0000000..d7f9b20
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosure.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.closure;
+
+import org.apache.ignite.raft.Closure;
+import org.apache.ignite.raft.rpc.Message;
+
+/**
+ * RPC response closure.
+ *
+ * @param <T> type of the response message
+ */
+public interface RpcResponseClosure<T extends Message> extends Closure {
+    /**
+     * Called by request handler to set response.
+     *
+     * @param resp rpc response
+     */
+    void setResponse(T resp);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosureAdapter.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosureAdapter.java
new file mode 100644
index 0000000..e3dfbf0
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/closure/RpcResponseClosureAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.closure;
+
+import org.apache.ignite.raft.rpc.Message;
+
+/**
+ * RpcResponseClosure adapter holds the response.
+ *
+ * @param <T> type of the response message
+ */
+public abstract class RpcResponseClosureAdapter<T extends Message> implements RpcResponseClosure<T> {
+    /** Response stored by the last {@link #setResponse(Message)} call; {@code null} until then. */
+    private T resp;
+
+    /**
+     * @return the stored response, or {@code null} if none has been set
+     */
+    public T getResponse() {
+        return this.resp;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void setResponse(T resp) {
+        this.resp = resp;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/CliRequests.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/CliRequests.java
new file mode 100644
index 0000000..8a3fa47
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/CliRequests.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: cli.proto
+
+package org.apache.ignite.raft.rpc;
+
+public final class CliRequests {
+    private CliRequests() {
+    }
+
+    /** Add-peer request message; instances are assembled with a {@link Builder}. */
+    public interface AddPeerRequest extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createAddPeerRequest();
+        }
+
+        /** @return the group id. */
+        String getGroupId();
+
+        /** @return the leader id. */
+        String getLeaderId();
+
+        /** @return the peer id. */
+        String getPeerId();
+
+        /** Builder for {@link AddPeerRequest}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder setGroupId(String groupId);
+
+            /** @return this builder type, for chaining. */
+            Builder setLeaderId(String leaderId);
+
+            /** @return this builder type, for chaining. */
+            Builder setPeerId(String peerId);
+
+            /** @return the built request. */
+            AddPeerRequest build();
+        }
+    }
+
+    /** Add-peer response message; instances are assembled with a {@link Builder}. */
+    public interface AddPeerResponse extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createAddPeerResponse();
+        }
+
+        /** @return the old peers as a list. */
+        java.util.List<String> getOldPeersList();
+
+        /** @return the old peers count. */
+        int getOldPeersCount();
+
+        /** @return the old peer at {@code index}. */
+        String getOldPeers(int index);
+
+        /** @return the new peers as a list. */
+        java.util.List<String> getNewPeersList();
+
+        /** @return the new peers count. */
+        int getNewPeersCount();
+
+        /** @return the new peer at {@code index}. */
+        String getNewPeers(int index);
+
+        /** Builder for {@link AddPeerResponse}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder addOldPeers(String oldPeersId);
+
+            /** @return this builder type, for chaining. */
+            Builder addNewPeers(String newPeersId);
+
+            /** @return the built response. */
+            AddPeerResponse build();
+        }
+    }
+
+    /** Remove-peer request message; instances are assembled with a {@link Builder}. */
+    public interface RemovePeerRequest extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createRemovePeerRequest();
+        }
+
+        /** @return the group id. */
+        String getGroupId();
+
+        /** @return the leader id. */
+        String getLeaderId();
+
+        /** @return the peer id. */
+        String getPeerId();
+
+        /** Builder for {@link RemovePeerRequest}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder setGroupId(String groupId);
+
+            /** @return this builder type, for chaining. */
+            Builder setLeaderId(String leaderId);
+
+            /** @return this builder type, for chaining. */
+            Builder setPeerId(String peerId);
+
+            /** @return the built request. */
+            RemovePeerRequest build();
+        }
+    }
+
+    /** Remove-peer response message; instances are assembled with a {@link Builder}. */
+    public interface RemovePeerResponse extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createRemovePeerResponse();
+        }
+
+        /** @return the old peers as a list. */
+        java.util.List<String> getOldPeersList();
+
+        /** @return the old peers count. */
+        int getOldPeersCount();
+
+        /** @return the old peer at {@code index}. */
+        String getOldPeers(int index);
+
+        /** @return the new peers as a list. */
+        java.util.List<String> getNewPeersList();
+
+        /** @return the new peers count. */
+        int getNewPeersCount();
+
+        /** @return the new peer at {@code index}. */
+        String getNewPeers(int index);
+
+        /** Builder for {@link RemovePeerResponse}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder addOldPeers(String oldPeerId);
+
+            /** @return this builder type, for chaining. */
+            Builder addNewPeers(String newPeerId);
+
+            /** @return the built response. */
+            RemovePeerResponse build();
+        }
+    }
+
+    /** Change-peers request message; instances are assembled with a {@link Builder}. */
+    public interface ChangePeersRequest extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createChangePeerRequest();
+        }
+
+        /** @return the group id. */
+        String getGroupId();
+
+        /** @return the leader id. */
+        String getLeaderId();
+
+        /** @return the new peers as a list. */
+        java.util.List<String> getNewPeersList();
+
+        /** @return the new peers count. */
+        int getNewPeersCount();
+
+        /** @return the new peer at {@code index}. */
+        String getNewPeers(int index);
+
+        /** Builder for {@link ChangePeersRequest}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder setGroupId(String groupId);
+
+            /** @return this builder type, for chaining. */
+            Builder setLeaderId(String leaderId);
+
+            /** @return this builder type, for chaining. */
+            Builder addNewPeers(String peerId);
+
+            /** @return the built request. */
+            ChangePeersRequest build();
+        }
+    }
+
+    /** Change-peers response message; instances are assembled with a {@link Builder}. */
+    public interface ChangePeersResponse extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createChangePeerResponse();
+        }
+
+        /** @return the old peers as a list. */
+        java.util.List<String> getOldPeersList();
+
+        /** @return the old peers count. */
+        int getOldPeersCount();
+
+        /** @return the old peer at {@code index}. */
+        String getOldPeers(int index);
+
+        /** @return the new peers as a list. */
+        java.util.List<String> getNewPeersList();
+
+        /** @return the new peers count. */
+        int getNewPeersCount();
+
+        /** @return the new peer at {@code index}. */
+        String getNewPeers(int index);
+
+        /** Builder for {@link ChangePeersResponse}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder addOldPeers(String oldPeerId);
+
+            /** @return this builder type, for chaining. */
+            Builder addNewPeers(String newPeerId);
+
+            /** @return the built response. */
+            ChangePeersResponse build();
+        }
+    }
+
+    /** Snapshot request message; instances are assembled with a {@link Builder}. */
+    public interface SnapshotRequest extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createSnapshotRequest();
+        }
+
+        /** @return the group id. */
+        String getGroupId();
+
+        /** @return the peer id. */
+        String getPeerId();
+
+        /** Builder for {@link SnapshotRequest}. */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder setGroupId(String groupId);
+
+            /** @return this builder type, for chaining. */
+            Builder setPeerId(String peerId);
+
+            /** @return the built request. */
+            SnapshotRequest build();
+        }
+    }
+
+    /** Reset-peer request message; instances are assembled with a {@link Builder}. */
+    public interface ResetPeerRequest extends Message {
+        /** @return a new builder supplied by {@code MessageBuilderFactory.DEFAULT}. */
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createResetPeerRequest();
+        }
+
+        /** @return the group id. */
+        String getGroupId();
+
+        /** @return the peer id. */
+        String getPeerId();
+
+        /** @return the old peers as a list. */
+        java.util.List<String> getOldPeersList();
+
+        /** @return the old peers count. */
+        int getOldPeersCount();
+
+        /** @return the old peer at {@code index}. */
+        String getOldPeers(int index);
+
+        /** @return the new peers as a list. */
+        java.util.List<String> getNewPeersList();
+
+        /** @return the new peers count. */
+        int getNewPeersCount();
+
+        /** @return the new peer at {@code index}. */
+        String getNewPeers(int index);
+
+        /**
+         * Builder for {@link ResetPeerRequest}.
+         * NOTE(review): the message exposes old-peers getters but the builder has no {@code addOldPeers};
+         * confirm whether that is intentional.
+         */
+        interface Builder {
+            /** @return this builder type, for chaining. */
+            Builder setGroupId(String groupId);
+
+            /** @return this builder type, for chaining. */
+            Builder setPeerId(String peerId);
+
+            /** @return this builder type, for chaining. */
+            Builder addNewPeers(String peerId);
+
+            /** @return the built request. */
+            ResetPeerRequest build();
+        }
+    }
+
+    public interface TransferLeaderRequest extends Message {
+        String getGroupId();
+
+        String getLeaderId();
+
+        String getPeerId();
+
+        boolean hasPeerId();
+
+        public interface Builder {
+            Builder setGroupId(String groupId);
+
+            Builder setLeaderId(String leaderId);
+
+            Builder setPeerId(String peerId);
+
+            TransferLeaderRequest build();
+        }
+
+        public static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createTransferLeaderRequest();
+        }
+    }
+
+    public interface GetLeaderRequest extends Message {
+        String getGroupId();
+
+        String getPeerId();
+
+        boolean hasPeerId();
+
+        public interface Builder {
+            Builder setGroupId(String groupId);
+
+            Builder setPeerId(String peerId);
+
+            GetLeaderRequest build();
+        }
+
+        public static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createGetLeaderRequest();
+        }
+    }
+
+    public interface GetLeaderResponse extends Message {
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
+        }
+
+        String getLeaderId();
+
+        public interface Builder {
+            GetLeaderResponse build();
+
+            Builder setLeaderId(String leaderId);
+        }
+    }
+
+    public interface GetPeersRequest extends Message {
+        String getGroupId();
+
+        String getLeaderId();
+
+        boolean getOnlyAlive();
+
+        public interface Builder {
+            Builder setGroupId(String groupId);
+
+            Builder setLeaderId(String leaderId);
+
+            Builder setOnlyAlive(boolean onlyGetAlive);
+
+            GetPeersRequest build();
+        }
+
+        public static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createGetPeersRequest();
+        }
+    }
+
+    public interface GetPeersResponse extends Message {
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createGetPeersResponse();
+        }
+
+        java.util.List<String> getPeersList();
+
+        int getPeersCount();
+
+        String getPeers(int index);
+
+        java.util.List<String> getLearnersList();
+
+        int getLearnersCount();
+
+        String getLearners(int index);
+
+        public interface Builder {
+            Builder addPeers(String peerId);
+
+            Builder addLearners(String learnerId);
+
+            GetPeersResponse build();
+        }
+    }
+
+    public interface AddLearnersRequest extends Message {
+        String getGroupId();
+
+        String getLeaderId();
+
+        java.util.List<String> getLearnersList();
+
+        int getLearnersCount();
+
+        String getLearners(int index);
+
+        public interface Builder {
+            Builder setGroupId(String groupId);
+
+            Builder setLeaderId(String leaderId);
+
+            Builder addLearners(String learnerId);
+
+            AddLearnersRequest build();
+        }
+
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createAddLearnersRequest();
+        }
+    }
+
+    public interface RemoveLearnersRequest extends Message {
+        String getGroupId();
+
+        String getLeaderId();
+
+        java.util.List<String> getLearnersList();
+
+        int getLearnersCount();
+
+        String getLearners(int index);
+        /** Builder for {@link RemoveLearnersRequest} instances. */
+        public interface Builder {
+            Builder setGroupId(String groupId);
+
+            Builder setLeaderId(String leaderId);
+
+            Builder addLearners(String learnerId);
+
+            RemoveLearnersRequest build();
+        }
+
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createRemoveLearnersRequest();
+        }
+    }
+
+    public interface ResetLearnersRequest extends Message {
+        String getGroupId();
+
+        String getLeaderId();
+
+        java.util.List<String> getLearnersList();
+
+        /**
+         * <code>repeated string learners = 3;</code>
+         */
+        int getLearnersCount();
+
+        /**
+         * <code>repeated string learners = 3;</code>
+         */
+        String getLearners(int index);
+
+        public interface Builder {
+            Builder setGroupId(String groupId);
+
+            Builder setLeaderId(String leaderId);
+
+            Builder addLearners(String learnerId);
+
+            ResetLearnersRequest build();
+        }
+
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createResetLearnersRequest();
+        }
+    }
+
+    public interface LearnersOpResponse extends Message {
+        java.util.List<String> getOldLearnersList();
+
+        int getOldLearnersCount();
+
+        String getOldLearners(int index);
+
+        java.util.List<String> getNewLearnersList();
+
+        int getNewLearnersCount();
+
+        String getNewLearners(int index);
+
+        static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createLearnersOpResponse();
+        }
+
+        public interface Builder {
+            Builder addOldLearners(String oldLearnersId);
+
+            Builder addNewLearners(String newLearnersId);
+
+            LearnersOpResponse build();
+        }
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Connection.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Connection.java
new file mode 100644
index 0000000..fd88be2
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Connection.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * RPC connection that carries keyed attributes and can be closed.
+ */
+public interface Connection {
+    /**
+     * Get the attribute bound to the connection under the given key.
+     *
+     * @param key the attribute key
+     * @return the attribute value
+     */
+    Object getAttribute(final String key);
+
+    /**
+     * Set the attribute on the connection.
+     *
+     * @param key   the attribute key
+     * @param value the attribute value
+     */
+    void setAttribute(final String key, final Object value);
+
+    /**
+     * Set the attribute on the connection if the key has no item yet, otherwise return the present item.
+     *
+     * @param key   the attribute key
+     * @param value the attribute value
+     * @return the previous value associated with the specified key, or
+     *         {@code null} if there was no mapping for the key.
+     */
+    Object setAttributeIfAbsent(final String key, final Object value);
+
+    /**
+     * Close the connection.
+     */
+    void close();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionClosedEventListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionClosedEventListener.java
new file mode 100644
index 0000000..176fa84
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionClosedEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * Listener for connection-closed events; it receives the remote address and the {@link Connection} concerned.
+ */
+public interface ConnectionClosedEventListener {
+    void onClosed(final String remoteAddress, final Connection conn);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
new file mode 100644
index 0000000..010ce51
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Callback handed the result or the error of an invocation; {@link #executor()} returns {@code null} by default.
+ */
+public interface InvokeCallback {
+    void complete(final Object result, final Throwable err);
+
+    default Executor executor() {
+        return null;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeContext.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeContext.java
new file mode 100644
index 0000000..0633ddd
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * RPC invoke context.
+ */
+public class InvokeContext {
+    private final ConcurrentMap<String, Object> ctx = new ConcurrentHashMap<>();
+
+    public Object put(final String key, final Object value) {
+        return this.ctx.put(key, value);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T get(final String key) {
+        return (T) this.ctx.get(key);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T getOrDefault(final String key, final T defaultValue) {
+        return (T) this.ctx.getOrDefault(key, defaultValue);
+    }
+
+    public void clear() {
+        this.ctx.clear();
+    }
+
+    public Set<Map.Entry<String, Object>> entrySet() {
+        return this.ctx.entrySet();
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeTimeoutException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeTimeoutException.java
new file mode 100644
index 0000000..135de0a
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeTimeoutException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * {@link RemotingException} that denotes an invocation timeout; every constructor delegates to the superclass.
+ */
+public class InvokeTimeoutException extends RemotingException {
+    private static final long serialVersionUID = -4710810309766380565L;
+
+    public InvokeTimeoutException() {
+    }
+
+    public InvokeTimeoutException(String message) {
+        super(message);
+    }
+
+    public InvokeTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public InvokeTimeoutException(Throwable cause) {
+        super(cause);
+    }
+
+    public InvokeTimeoutException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
new file mode 100644
index 0000000..0e36975
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
@@ -0,0 +1,11 @@
+package org.apache.ignite.raft.rpc;
+
+import java.io.Serializable;
+
+/**
+ * The base message.
+ * <p>
+ * Extends Serializable for compatibility with JDK serialization.
+ */
+public interface Message extends Serializable {
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/MessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/MessageBuilderFactory.java
new file mode 100644
index 0000000..03b70da
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/MessageBuilderFactory.java
@@ -0,0 +1,50 @@
+package org.apache.ignite.raft.rpc;
+
+import org.apache.ignite.raft.rpc.message.DefaultMessageBuilderFactory;
+
+/** Factory of the builders for RPC and CLI messages. */
+public interface MessageBuilderFactory {
+    // Shared instance the static newBuilder() helpers of the messages delegate to. TODO asch must be injected.
+    public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory();
+
+    // Ping.
+    RpcRequests.PingRequest.Builder createPingRequest();
+
+    // Error.
+    RpcRequests.ErrorResponse.Builder createErrorResponse();
+
+    // CLI.
+    CliRequests.AddPeerRequest.Builder createAddPeerRequest();
+
+    CliRequests.AddPeerResponse.Builder createAddPeerResponse();
+
+    CliRequests.RemovePeerRequest.Builder createRemovePeerRequest();
+
+    CliRequests.RemovePeerResponse.Builder createRemovePeerResponse();
+
+    CliRequests.ChangePeersRequest.Builder createChangePeerRequest();
+
+    CliRequests.ChangePeersResponse.Builder createChangePeerResponse();
+
+    CliRequests.SnapshotRequest.Builder createSnapshotRequest();
+
+    CliRequests.ResetPeerRequest.Builder createResetPeerRequest();
+
+    CliRequests.TransferLeaderRequest.Builder createTransferLeaderRequest();
+
+    CliRequests.GetLeaderRequest.Builder createGetLeaderRequest();
+
+    CliRequests.GetLeaderResponse.Builder createGetLeaderResponse();
+
+    CliRequests.GetPeersRequest.Builder createGetPeersRequest();
+
+    CliRequests.GetPeersResponse.Builder createGetPeersResponse();
+
+    CliRequests.AddLearnersRequest.Builder createAddLearnersRequest();
+
+    CliRequests.RemoveLearnersRequest.Builder createRemoveLearnersRequest();
+
+    CliRequests.ResetLearnersRequest.Builder createResetLearnersRequest();
+
+    CliRequests.LearnersOpResponse.Builder createLearnersOpResponse();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NamedThreadFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NamedThreadFactory.java
new file mode 100644
index 0000000..f87e993
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NamedThreadFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Named thread factory with prefix.
+ *
+ * @author boyan (boyan@alibaba-inc.com)
+ * <p>
+ * 2018-Mar-21 11:32:02 AM
+ */
+public class NamedThreadFactory implements ThreadFactory {
+    private static System.Logger LOG = System.getLogger(NamedThreadFactory.class.getName());
+
+    private static final LogUncaughtExceptionHandler UNCAUGHT_EX_HANDLER = new LogUncaughtExceptionHandler();
+
+    private final String prefix;
+
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final boolean daemon;
+
+    public NamedThreadFactory(String prefix) {
+        this(prefix, false);
+    }
+
+    public NamedThreadFactory(String prefix, boolean daemon) {
+        super();
+        this.prefix = prefix;
+        this.daemon = daemon;
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setDaemon(this.daemon);
+        t.setUncaughtExceptionHandler(UNCAUGHT_EX_HANDLER);
+        t.setName(this.prefix + counter.getAndIncrement());
+        return t;
+    }
+
+    private static final class LogUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            LOG.log(System.Logger.Level.ERROR, "Uncaught exception in thread {}", t, e);
+        }
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RemotingException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RemotingException.java
new file mode 100644
index 0000000..8bf6db2
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RemotingException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * Exception for default remoting problems, like the endpoint is unreachable.
+ */
+public class RemotingException extends Exception {
+    private static final long serialVersionUID = -6326244159775972292L;
+
+    public RemotingException() {
+    }
+
+    public RemotingException(String message) {
+        super(message);
+    }
+
+    public RemotingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RemotingException(Throwable cause) {
+        super(cause);
+    }
+
+    public RemotingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
new file mode 100644
index 0000000..8caee80
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.Lifecycle;
+
+/**
+ * RPC client: connection checks and management plus synchronous and asynchronous invocations.
+ */
+public interface RpcClient extends Lifecycle<RpcOptions> {
+    /**
+     * Check the connection for the given address.
+     *
+     * @param endpoint target address
+     * @return true if there is a connection and the connection is active and writable.
+     */
+    boolean checkConnection(final Endpoint endpoint);
+
+    /**
+     * Check the connection for the given address and, if requested, asynchronously create one when there is none.
+     * @param endpoint       target address
+     * @param createIfAbsent create a new one if there is no connection
+     * @return true if there is a connection and the connection is active and writable.
+     */
+    boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
+
+    /**
+     * Close all connections of an address.
+     *
+     * @param endpoint target address
+     */
+    void closeConnection(final Endpoint endpoint);
+
+    /**
+     * Register a connect event listener.
+     *
+     * @param listener listener.
+     */
+    void registerConnectEventListener(final ConnectionOpenedEventListener listener);
+
+    /**
+     * Synchronous invocation; delegates to the context-aware overload with a {@code null} invoke context.
+     *
+     * @param endpoint  target address
+     * @param request   request object
+     * @param timeoutMs timeout in milliseconds
+     * @return invoke result
+     */
+    default Object invokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
+            throws InterruptedException, RemotingException {
+        return invokeSync(endpoint, request, null, timeoutMs);
+    }
+
+    /**
+     * Synchronous invocation using an invoke context.
+     *
+     * @param endpoint  target address
+     * @param request   request object
+     * @param ctx       invoke context
+     * @param timeoutMs timeout in milliseconds
+     * @return invoke result
+     */
+    Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
+                      final long timeoutMs) throws InterruptedException, RemotingException;
+
+    /**
+     * Asynchronous invocation with a callback; delegates to the context-aware overload with a {@code null} context.
+     *
+     * @param endpoint  target address
+     * @param request   request object
+     * @param callback  invoke callback
+     * @param timeoutMs timeout in milliseconds
+     */
+    default void invokeAsync(final Endpoint endpoint, final Object request, final InvokeCallback callback,
+                             final long timeoutMs) throws InterruptedException, RemotingException {
+        invokeAsync(endpoint, request, null, callback, timeoutMs);
+    }
+
+    /**
+     * Asynchronous invocation with a callback, using an invoke context.
+     *
+     * @param endpoint  target address
+     * @param request   request object
+     * @param ctx       invoke context
+     * @param callback  invoke callback
+     * @param timeoutMs timeout in milliseconds
+     */
+    void invokeAsync(final Endpoint endpoint, final Object request, final InvokeContext ctx, final InvokeCallback callback,
+                     final long timeoutMs) throws InterruptedException, RemotingException;
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcContext.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcContext.java
new file mode 100644
index 0000000..1f7fcf8
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+/**
+ * @author jiachun.fjc
+ */
+public interface RpcContext {
+
+    /**
+     * Send a response back.
+     *
+     * @param responseObj the response object
+     */
+    void sendResponse(final Object responseObj);
+
+    /**
+     * Get current connection.
+     *
+     * @return current connection
+     */
+    Connection getConnection();
+
+    /**
+     * Get the remote address.
+     *
+     * @return remote address
+     */
+    String getRemoteAddress();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcOptions.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcOptions.java
new file mode 100644
index 0000000..e3135c7
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcOptions.java
@@ -0,0 +1,25 @@
+package org.apache.ignite.raft.rpc;
+
+/**
+ * Basic RPC options.
+ */
+public class RpcOptions {
+    private int rpcConnectTimeoutMs = 5_000;
+    private int rpcDefaultTimeout = 5_000;
+
+    public int getRpcConnectTimeoutMs() {
+        return rpcConnectTimeoutMs;
+    }
+
+    public void setRpcConnectTimeoutMs(int rpcConnectTimeoutMs) {
+        this.rpcConnectTimeoutMs = rpcConnectTimeoutMs;
+    }
+
+    public int getRpcDefaultTimeout() {
+        return rpcDefaultTimeout;
+    }
+
+    public void setRpcDefaultTimeout(int rpcDefaultTimeout) {
+        this.rpcDefaultTimeout = rpcDefaultTimeout;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcProcessor.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcProcessor.java
new file mode 100644
index 0000000..c6dedd0
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Defines the functions for processing a user-defined request of type {@code T}.
+ *
+ * @author jiachun.fjc
+ */
+public interface RpcProcessor<T> {
+
+    /**
+     * Handle the request asynchronously with the given {@link RpcContext}.
+     *
+     * @param rpcCtx  the rpc context
+     * @param request the request
+     */
+    void handleRequest(final RpcContext rpcCtx, final T request);
+
+    /**
+     * The class name of the user request.
+     * A String is used to avoid loading the class.
+     *
+     * @return class name of the request of interest
+     */
+    String interest();
+
+    /**
+     * Get the user's executor.
+     *
+     * @return executor, {@code null} unless overridden
+     */
+    default Executor executor() {
+        return null;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcRequests.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcRequests.java
new file mode 100644
index 0000000..048065c
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcRequests.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: rpc.proto
+
+package org.apache.ignite.raft.rpc;
+
+public final class RpcRequests {
+    private RpcRequests() {
+    }
+
+    public interface ErrorResponse extends Message {
+        int getErrorCode();
+
+        String getErrorMsg();
+
+        interface Builder {
+            Builder setErrorCode(int code);
+
+            Builder setErrorMsg(String msg);
+
+            ErrorResponse build();
+        }
+
+        public static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createErrorResponse();
+        }
+    }
+
+    public interface PingRequest extends Message {
+        /**
+         * <code>required int64 send_timestamp = 1;</code>
+         */
+        long getSendTimestamp();
+
+        interface Builder {
+            Builder setSendTimestamp(long timestamp);
+
+            PingRequest build();
+        }
+
+        public static Builder newBuilder() {
+            return MessageBuilderFactory.DEFAULT.createPingRequest();
+        }
+    }
+}
+
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcServer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcServer.java
new file mode 100644
index 0000000..ce17a56
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcServer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import org.apache.ignite.raft.Lifecycle;
+
+/**
+ *
+ * @author jiachun.fjc
+ */
+public interface RpcServer extends Lifecycle<RpcOptions> {
+
+    /**
+     * Register a connection closed event listener.
+     *
+     * @param listener the event listener.
+     */
+    void registerConnectionClosedEventListener(final ConnectionClosedEventListener listener);
+
+    /**
+     * Register user processor.
+     *
+     * @param processor the user processor which has an interest
+     */
+    void registerProcessor(final RpcProcessor<?> processor);
+
+    /**
+     *
+     * @return bound port
+     */
+    int boundPort();
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
new file mode 100644
index 0000000..9bbaaa3
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.raft.Closure;
+import org.apache.ignite.raft.Status;
+
+/**
+ * RPC utilities.
+ */
+public final class RpcUtils {
+    private static final System.Logger LOG = System.getLogger(RpcUtils.class.getName());
+
+    /**
+     * Default jraft closure executor pool minimum size.
+     */
+    public static final int MIN_RPC_CLOSURE_EXECUTOR_POOL_SIZE = 1;
+
+    /**
+     * Default jraft closure executor pool maximum size.
+     */
+    public static final int MAX_RPC_CLOSURE_EXECUTOR_POOL_SIZE = Runtime.getRuntime().availableProcessors();
+
+    /**
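+     * Global thread pool to run rpc closures. Tasks are handed off directly (no queueing), so a submission is rejected once all threads are busy.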
+     */
+    public static ThreadPoolExecutor RPC_CLOSURE_EXECUTOR = new ThreadPoolExecutor(MIN_RPC_CLOSURE_EXECUTOR_POOL_SIZE,
+        MAX_RPC_CLOSURE_EXECUTOR_POOL_SIZE,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new NamedThreadFactory("JRaft-Rpc-Closure-Executor-", true));
+
+    /**
+     * Run closure with OK status in thread pool.
+     */
+    public static Future<?> runClosureInThread(final Closure done) {
+        if (done == null) {
+            return null;
+        }
+        return runClosureInThread(done, Status.OK());
+    }
+
+    /**
+     * Run a task in thread pool, returns the future object.
+     */
+    public static Future<?> runInThread(final Runnable runnable) {
+        return RPC_CLOSURE_EXECUTOR.submit(runnable);
+    }
+
+    /**
+     * Run closure with status in thread pool.
+     */
+    public static Future<?> runClosureInThread(final Closure done, final Status status) {
+        if (done == null) {
+            return null;
+        }
+
+        return runInThread(() -> {
+            try {
+                done.run(status);
+            } catch (final Throwable t) {
+                LOG.log(System.Logger.Level.WARNING, "Fail to run done closure.", t);
+            }
+        });
+    }
+
+    /**
+     * Run closure with status in the specified executor, or in the global closure pool if the executor is {@code null}.
+     */
+    public static void runClosureInExecutor(final Executor executor, final Closure done, final Status status) {
+        if (done == null) {
+            return;
+        }
+
+        if (executor == null) {
+            runClosureInThread(done, status);
+            return;
+        }
+
+        executor.execute(() -> {
+            try {
+                done.run(status);
+            } catch (final Throwable t) {
+                LOG.log(System.Logger.Level.WARNING, "Fail to run done closure.", t);
+            }
+        });
+    }
+
+    private RpcUtils() {
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalConnection.java
similarity index 86%
copy from modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalConnection.java
index db63614..4041316 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalConnection.java
@@ -1,16 +1,13 @@
-package com.alipay.sofa.jraft.rpc.impl;
+package org.apache.ignite.raft.rpc.impl;
 
-import com.alipay.sofa.jraft.rpc.Connection;
-import com.alipay.sofa.jraft.rpc.Message;
-import com.alipay.sofa.jraft.rpc.RpcRequests;
-import com.alipay.sofa.jraft.util.Endpoint;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Predicate;
+import org.apache.ignite.raft.rpc.Connection;
+import org.apache.ignite.raft.rpc.Message;
 
 public class LocalConnection implements Connection {
     private static boolean RECORD_ALL_MESSAGES = false;
@@ -41,7 +38,8 @@ public class LocalConnection implements Connection {
 
     private void send(Message request, Future fut) {
         Object[] tuple = {client, request, fut};
-        assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+
+        srv.incoming.offer(tuple);
     }
 
     public void onBeforeRequestSend(Message request, Future fut) {
@@ -62,10 +60,11 @@ public class LocalConnection implements Connection {
     }
 
     public void onAfterResponseSend(Message msg, Throwable err) {
-        assert err == null : err;
+        if (msg == null) // Ignore timeouts.
+            return;
 
         if (RECORD_ALL_MESSAGES || recordPred != null && recordPred.test(msg))
-            recordedMsgs.add(new Object[] {System.currentTimeMillis(), msg});
+            recordedMsgs.add(new Object[] {System.currentTimeMillis(), msg, err});
     }
 
     @Override public Object getAttribute(String key) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcClient.java
new file mode 100644
index 0000000..4c9a98f
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcClient.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.rpc.ConnectionOpenedEventListener;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.InvokeContext;
+import org.apache.ignite.raft.rpc.InvokeTimeoutException;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RemotingException;
+import org.apache.ignite.raft.rpc.RpcClient;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcUtils;
+
+/**
+ * Local rpc client impl.
+ *
+ * @author ascherbakov.
+ */
+public class LocalRpcClient implements RpcClient {
+    /** Listeners invoked from {@link #onCreated(LocalConnection)}. */
+    private final List<ConnectionOpenedEventListener> listeners = new CopyOnWriteArrayList<>();
+
+    /** {@inheritDoc} */
+    @Override public boolean checkConnection(Endpoint endpoint) {
+        return LocalRpcServer.connect(this, endpoint, false, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) {
+        return LocalRpcServer.connect(this, endpoint, createIfAbsent, this::onCreated);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeConnection(Endpoint endpoint) {
+        LocalRpcServer.closeConnection(this, endpoint);
+    }
+
+    /**
+     * Registers a connection opened listener; a listener that is already registered is not added again.
+     *
+     * @param listener The listener.
+     */
+    @Override public void registerConnectEventListener(ConnectionOpenedEventListener listener) {
+        // Skip listeners that are already registered.
+        if (!listeners.contains(listener))
+            listeners.add(listener);
+    }
+
+    /**
+     * Notifies every registered listener about the given connection.
+     *
+     * @param conn The connection passed to the listeners.
+     */
+    private void onCreated(LocalConnection conn) {
+        for (ConnectionOpenedEventListener listener : listeners)
+            listener.onOpened(conn.srv.local.toString(), conn);
+    }
+
+    /**
+     * Looks up the connection from this client to the server at the given endpoint, requesting its creation if absent.
+     *
+     * @param endpoint The server endpoint.
+     * @return The connection.
+     * @throws RemotingException If the connection check fails, or the server or the connection is not registered.
+     */
+    private LocalConnection connection(Endpoint endpoint) throws RemotingException {
+        if (!checkConnection(endpoint, true))
+            throw new RemotingException("Server is dead " + endpoint);
+
+        LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+
+        if (srv == null)
+            throw new RemotingException("Server is dead " + endpoint);
+
+        LocalConnection locConn = srv.conns.get(this);
+
+        if (locConn == null)
+            throw new RemotingException("Server is dead " + endpoint);
+
+        return locConn;
+    }
+
+    /**
+     * Sends a request and blocks until the response future completes or the timeout elapses.
+     *
+     * @param endpoint The server endpoint.
+     * @param request The request; it is cast to {@link Message}.
+     * @param ctx Invoke context (not used by this implementation).
+     * @param timeoutMs Maximum time to wait for the response, in milliseconds.
+     * @return The response.
+     * @throws InterruptedException If the calling thread is interrupted while waiting.
+     * @throws RemotingException If the connection cannot be resolved or the response future fails.
+     * @throws InvokeTimeoutException If the response future does not complete within the timeout.
+     */
+    @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
+        LocalConnection locConn = connection(endpoint);
+
+        CompletableFuture<Object> fut = new CompletableFuture<>();
+
+        locConn.onBeforeRequestSend((Message) request, fut);
+
+        try {
+            return fut.whenComplete((res, err) -> locConn.onAfterResponseSend((Message) res, err)).get(timeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw new RemotingException(e);
+        } catch (TimeoutException e) {
+            throw new InvokeTimeoutException(e);
+        }
+    }
+
+    /**
+     * Sends a request and reports the outcome to the callback once the response future completes or times out.
+     *
+     * @param endpoint The server endpoint.
+     * @param request The request; it is cast to {@link Message}.
+     * @param ctx Invoke context (not used by this implementation).
+     * @param callback Callback receiving the response or the error; it is run through {@link RpcUtils#runInThread(Runnable)}.
+     * @param timeoutMs Time after which the response future is failed with a timeout, in milliseconds.
+     * @throws RemotingException If the connection cannot be resolved.
+     */
+    @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback, long timeoutMs) throws InterruptedException, RemotingException {
+        LocalConnection locConn = connection(endpoint);
+
+        CompletableFuture<Object> fut = new CompletableFuture<>();
+
+        locConn.onBeforeRequestSend((Message) request, fut);
+
+        fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).whenComplete((res, err) -> {
+            locConn.onAfterResponseSend((Message) res, err);
+
+            RpcUtils.runInThread(() -> callback.complete(res,
+                err instanceof TimeoutException ? new InvokeTimeoutException(err) : err)); // Avoid deadlocks if a closure has completed in the same thread.
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean init(RpcOptions opts) {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        // Close all connection from this peer.
+        for (LocalRpcServer value : LocalRpcServer.servers.values())
+            LocalRpcServer.closeConnection(this, value.local);
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcServer.java
similarity index 77%
copy from modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcServer.java
index db9648a..18b1909 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/impl/LocalRpcServer.java
@@ -14,15 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.alipay.sofa.jraft.rpc.impl;
-
-import com.alipay.sofa.jraft.rpc.Connection;
-import com.alipay.sofa.jraft.rpc.Message;
-import com.alipay.sofa.jraft.rpc.RpcContext;
-import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.alipay.sofa.jraft.rpc.RpcServer;
-import com.alipay.sofa.jraft.util.Endpoint;
-import com.alipay.sofa.jraft.util.NamedThreadFactory;
+package org.apache.ignite.raft.rpc.impl;
+
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -31,13 +24,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.rpc.Connection;
+import org.apache.ignite.raft.rpc.ConnectionClosedEventListener;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcContext;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcProcessor;
+import org.apache.ignite.raft.rpc.RpcServer;
+import org.apache.ignite.raft.rpc.RpcUtils;
 
 /**
  * Local RPC server impl.
@@ -45,8 +42,6 @@ import org.slf4j.LoggerFactory;
  * @author ascherbakov.
  */
 public class LocalRpcServer implements RpcServer {
-    private static final Logger LOG                    = LoggerFactory.getLogger(LocalRpcServer.class);
-
     /** Running servers. */
     public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>();
 
@@ -65,9 +60,6 @@ public class LocalRpcServer implements RpcServer {
 
     BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that.
 
-    // TODO FIXME asch Or better use com.alipay.sofa.jraft.rpc.RpcUtils.RPC_CLOSURE_EXECUTOR ?
-    private ExecutorService defaultExecutor;
-
     public LocalRpcServer(Endpoint local) {
         this.local = local;
     }
@@ -122,7 +114,7 @@ public class LocalRpcServer implements RpcServer {
         return local.getPort();
     }
 
-    @Override public synchronized boolean init(Void opts) {
+    @Override public synchronized boolean init(RpcOptions opts) {
         if (started)
             return false;
 
@@ -154,16 +146,10 @@ public class LocalRpcServer implements RpcServer {
                             }
                         }
 
-                        RpcProcessor.ExecutorSelector selector = prc.executorSelector();
-
-                        Executor executor = null;
-
-                        if (selector != null) {
-                            executor = selector.select(null, msg);
-                        }
+                        Executor executor = prc.executor();
 
                         if (executor == null)
-                            executor = defaultExecutor;
+                            executor = RpcUtils.RPC_CLOSURE_EXECUTOR;
 
                         RpcProcessor finalPrc = prc;
 
@@ -189,14 +175,14 @@ public class LocalRpcServer implements RpcServer {
             }
         });
 
-        defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("LocalRPCServer-Default-Executor-Thread: " + local.toString()));
+        started = true;
 
-        worker.setName("LocalRPCServer-Dispatch-Thread: "  + local.toString());
+        worker.setName("LocalRPCServer-Dispatch-Thread: "  + local.toString()); // TODO asch use MPSC pattern ?
         worker.start();
 
         servers.put(local, this);
 
-        started = true;
+
 
         return true;
     }
@@ -214,17 +200,6 @@ public class LocalRpcServer implements RpcServer {
             throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
         }
 
-        defaultExecutor.shutdownNow();
-
-        try {
-            boolean stopped = defaultExecutor.awaitTermination(60_000, TimeUnit.MILLISECONDS);
-
-            if (!stopped) // TODO asch make thread dump.
-                LOG.error("Failed to wait for graceful executor shutdown, probably some task is hanging.");
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
-        }
-
         // Close all connections to this server.
         for (LocalRpcClient client : conns.keySet())
             closeConnection(client, local);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/AddLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/AddLearnersRequestImpl.java
new file mode 100644
index 0000000..940c653
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/AddLearnersRequestImpl.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+public class AddLearnersRequestImpl implements CliRequests.AddLearnersRequest, CliRequests.AddLearnersRequest.Builder {
+    private String groupId;
+    private String leaderId;
+    private List<String> learnersList = new ArrayList<>();
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public String getLeaderId() {
+        return leaderId;
+    }
+
+    @Override public List<String> getLearnersList() {
+        return learnersList;
+    }
+
+    @Override public int getLearnersCount() {
+        return learnersList.size();
+    }
+
+    @Override public String getLearners(int index) {
+        return learnersList.get(index);
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+
+        return this;
+    }
+
+    @Override public Builder setLeaderId(String leaderId) {
+        this.leaderId = leaderId;
+
+        return this;
+    }
+
+    @Override public Builder addLearners(String learnerId) {
+        learnersList.add(learnerId);
+
+        return this;
+    }
+
+    @Override public CliRequests.AddLearnersRequest build() {
+        return this;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderRequestImpl.java
new file mode 100644
index 0000000..cde6e81
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderRequestImpl.java
@@ -0,0 +1,37 @@
+package org.apache.ignite.raft.rpc.message;
+
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+public class CreateGetLeaderRequestImpl implements CliRequests.GetLeaderRequest, CliRequests.GetLeaderRequest.Builder {
+    private String groupId;
+    private String peerId;
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public boolean hasPeerId() {
+        return peerId != null;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+
+        return this;
+    }
+
+    @Override public CliRequests.GetLeaderRequest build() {
+        return this;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderResponseImpl.java
new file mode 100644
index 0000000..517eb47
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/CreateGetLeaderResponseImpl.java
@@ -0,0 +1,22 @@
+package org.apache.ignite.raft.rpc.message;
+
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+public class CreateGetLeaderResponseImpl implements CliRequests.GetLeaderResponse, CliRequests.GetLeaderResponse.Builder {
+    private String leaderId;
+
+    @Override public String getLeaderId() {
+        return leaderId;
+    }
+
+    @Override public CliRequests.GetLeaderResponse build() {
+        return this;
+    }
+
+    @Override public Builder setLeaderId(String leaderId) {
+        this.leaderId = leaderId;
+
+        return this;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/DefaultMessageBuilderFactory.java
new file mode 100644
index 0000000..9f3ef49
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/DefaultMessageBuilderFactory.java
@@ -0,0 +1,86 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.MessageBuilderFactory;
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * Default message builders factory.
+ */
+public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
+    @Override public RpcRequests.PingRequest.Builder createPingRequest() {
+        return new PingRequestImpl();
+    }
+
+    @Override public RpcRequests.ErrorResponse.Builder createErrorResponse() {
+        return new ErrorResponseImpl();
+    }
+
+    @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() {
+        return new AddPeerRequestImpl();
+    }
+
+    @Override public CliRequests.AddPeerResponse.Builder createAddPeerResponse() {
+        return new AddPeerResponseImpl();
+    }
+
+    @Override public CliRequests.RemovePeerRequest.Builder createRemovePeerRequest() {
+        return new RemovePeerRequestImpl();
+    }
+
+    @Override public CliRequests.RemovePeerResponse.Builder createRemovePeerResponse() {
+        return new RemovePeerResponseImpl();
+    }
+
+    @Override public CliRequests.ChangePeersRequest.Builder createChangePeerRequest() {
+        return new ChangePeerRequestImpl();
+    }
+
+    @Override public CliRequests.ChangePeersResponse.Builder createChangePeerResponse() {
+        return new ChangePeersResponseImpl();
+    }
+
+    @Override public CliRequests.SnapshotRequest.Builder createSnapshotRequest() {
+        return new SnapshotRequestImpl();
+    }
+
+    @Override public CliRequests.ResetPeerRequest.Builder createResetPeerRequest() {
+        return new ResetPeerRequestImpl();
+    }
+
+    @Override public CliRequests.TransferLeaderRequest.Builder createTransferLeaderRequest() {
+        return new TransferLeaderRequestImpl();
+    }
+
+    @Override public CliRequests.GetLeaderRequest.Builder createGetLeaderRequest() {
+        return new CreateGetLeaderRequestImpl();
+    }
+
+    @Override public CliRequests.GetLeaderResponse.Builder createGetLeaderResponse() {
+        return new CreateGetLeaderResponseImpl();
+    }
+
+    @Override public CliRequests.GetPeersRequest.Builder createGetPeersRequest() {
+        return new GetPeersRequestImpl();
+    }
+
+    @Override public CliRequests.GetPeersResponse.Builder createGetPeersResponse() {
+        return new GetPeersResponseImpl();
+    }
+
+    @Override public CliRequests.AddLearnersRequest.Builder createAddLearnersRequest() {
+        return new AddLearnersRequestImpl();
+    }
+
+    @Override public CliRequests.RemoveLearnersRequest.Builder createRemoveLearnersRequest() {
+        return new RemoveLearnersRequestImpl();
+    }
+
+    @Override public CliRequests.ResetLearnersRequest.Builder createResetLearnersRequest() {
+        return new ResetLearnersRequestImpl();
+    }
+
+    @Override public CliRequests.LearnersOpResponse.Builder createLearnersOpResponse() {
+        return new LearnersOpResponseImpl();
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
new file mode 100644
index 0000000..ad1f2b7
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
@@ -0,0 +1,38 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * Error response which also acts as its own builder: setters mutate this instance and {@link #build()} returns it.
+ */
+class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder {
+    /** Error code; {@code 0} until set through the builder. */
+    private int errorCode;
+
+    /** Error message; {@code null} until set through the builder. */
+    private String errorMsg;
+
+    @Override public int getErrorCode() {
+        return errorCode;
+    }
+
+    @Override public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    @Override public Builder setErrorCode(int code) {
+        this.errorCode = code;
+
+        return this;
+    }
+
+    @Override public Builder setErrorMsg(String msg) {
+        this.errorMsg = msg;
+
+        return this;
+    }
+
+    @Override public RpcRequests.ErrorResponse build() {
+        return this;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/LearnersOpResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/LearnersOpResponseImpl.java
new file mode 100644
index 0000000..06ff501
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/LearnersOpResponseImpl.java
@@ -0,0 +1,50 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class LearnersOpResponseImpl implements CliRequests.LearnersOpResponse, CliRequests.LearnersOpResponse.Builder {
+    private List<String> oldLearnersList = new ArrayList<>();
+    private List<String> newLearnersList = new ArrayList<>();
+
+    @Override public List<String> getOldLearnersList() {
+        return oldLearnersList;
+    }
+
+    @Override public int getOldLearnersCount() {
+        return oldLearnersList.size();
+    }
+
+    @Override public String getOldLearners(int index) {
+        return oldLearnersList.get(index);
+    }
+
+    @Override public List<String> getNewLearnersList() {
+        return newLearnersList;
+    }
+
+    @Override public int getNewLearnersCount() {
+        return newLearnersList.size();
+    }
+
+    @Override public String getNewLearners(int index) {
+        return newLearnersList.get(index);
+    }
+
+    @Override public Builder addOldLearners(String oldLearnersId) {
+        oldLearnersList.add(oldLearnersId);
+
+        return this;
+    }
+
+    @Override public Builder addNewLearners(String newLearnersId) {
+        newLearnersList.add(newLearnersId);
+
+        return this;
+    }
+
+    @Override public CliRequests.LearnersOpResponse build() {
+        return this;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/PingRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/PingRequestImpl.java
new file mode 100644
index 0000000..b4eba88
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/PingRequestImpl.java
@@ -0,0 +1,21 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+class PingRequestImpl implements RpcRequests.PingRequest, RpcRequests.PingRequest.Builder {
+    private long sendTimestamp;
+
+    @Override public long getSendTimestamp() {
+        return sendTimestamp;
+    }
+
+    @Override public Builder setSendTimestamp(long timestamp) {
+        this.sendTimestamp = timestamp;
+
+        return this;
+    }
+
+    @Override public RpcRequests.PingRequest build() {
+        return this;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/RemoveLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/RemoveLearnersRequestImpl.java
new file mode 100644
index 0000000..ef4005c
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/RemoveLearnersRequestImpl.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class RemoveLearnersRequestImpl implements CliRequests.RemoveLearnersRequest, CliRequests.RemoveLearnersRequest.Builder {
+    /** Group id; {@code null} until set through {@link #setGroupId(String)}. */
+    private String groupId;
+    /** Leader id; {@code null} until set through {@link #setLeaderId(String)}. */
+    private String leaderId;
+    /** Learner ids in insertion order; {@link #getLearnersList()} hands out this very list. */
+    private final List<String> learnersList = new ArrayList<>();
+
+    @Override public String getGroupId() {
+        return this.groupId;
+    }
+
+    @Override public String getLeaderId() {
+        return this.leaderId;
+    }
+
+    @Override public List<String> getLearnersList() {
+        return this.learnersList;
+    }
+
+    @Override public int getLearnersCount() {
+        return this.learnersList.size();
+    }
+
+    @Override public String getLearners(int index) {
+        return this.learnersList.get(index);
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public Builder setLeaderId(String leaderId) {
+        this.leaderId = leaderId;
+        return this;
+    }
+
+    @Override public Builder addLearners(String learnerId) {
+        this.learnersList.add(learnerId);
+        return this;
+    }
+
+    @Override public CliRequests.RemoveLearnersRequest build() {
+        return this; // This object is both the builder and the built request.
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ResetLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ResetLearnersRequestImpl.java
new file mode 100644
index 0000000..80ea2cb
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ResetLearnersRequestImpl.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.raft.rpc.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class ResetLearnersRequestImpl implements CliRequests.ResetLearnersRequest, CliRequests.ResetLearnersRequest.Builder {
+    /** Group id; {@code null} until set through {@link #setGroupId(String)}. */
+    private String groupId;
+    /** Leader id; {@code null} until set through {@link #setLeaderId(String)}. */
+    private String leaderId;
+    /** Learner ids in insertion order; {@link #getLearnersList()} hands out this very list. */
+    private final List<String> learnersList = new ArrayList<>();
+
+    @Override public String getGroupId() {
+        return this.groupId;
+    }
+
+    @Override public String getLeaderId() {
+        return this.leaderId;
+    }
+
+    @Override public List<String> getLearnersList() {
+        return this.learnersList;
+    }
+
+    @Override public int getLearnersCount() {
+        return this.learnersList.size();
+    }
+
+    @Override public String getLearners(int index) {
+        return this.learnersList.get(index);
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public Builder setLeaderId(String leaderId) {
+        this.leaderId = leaderId;
+        return this;
+    }
+
+    @Override public Builder addLearners(String learnerId) {
+        this.learnersList.add(learnerId);
+        return this;
+    }
+
+    @Override public CliRequests.ResetLearnersRequest build() {
+        return this; // This object is both the builder and the built request.
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/SnapshotRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/SnapshotRequestImpl.java
new file mode 100644
index 0000000..00dfb47
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/SnapshotRequestImpl.java
@@ -0,0 +1,32 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class SnapshotRequestImpl implements CliRequests.SnapshotRequest, CliRequests.SnapshotRequest.Builder {
+    /** Group id; {@code null} until set through {@link #setGroupId(String)}. */
+    private String groupId;
+    /** Peer id; {@code null} until set through {@link #setPeerId(String)}. */
+    private String peerId;
+
+    @Override public String getGroupId() {
+        return this.groupId;
+    }
+
+    @Override public String getPeerId() {
+        return this.peerId;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+        return this;
+    }
+
+    @Override public CliRequests.SnapshotRequest build() {
+        return this; // This object is both the builder and the built request.
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/TransferLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/TransferLeaderRequestImpl.java
new file mode 100644
index 0000000..5796ee0
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/TransferLeaderRequestImpl.java
@@ -0,0 +1,47 @@
+package org.apache.ignite.raft.rpc.message;
+
+import org.apache.ignite.raft.rpc.CliRequests;
+
+class TransferLeaderRequestImpl implements CliRequests.TransferLeaderRequest, CliRequests.TransferLeaderRequest.Builder {
+    /** Group id; {@code null} until set through {@link #setGroupId(String)}. */
+    private String groupId;
+    /** Leader id; {@code null} until set through {@link #setLeaderId(String)}. */
+    private String leaderId;
+    /** Peer id; {@code null} means "not set", which is what {@link #hasPeerId()} reports. */
+    private String peerId;
+
+    @Override public String getGroupId() {
+        return this.groupId;
+    }
+
+    @Override public String getLeaderId() {
+        return this.leaderId;
+    }
+
+    @Override public String getPeerId() {
+        return this.peerId;
+    }
+
+    @Override public boolean hasPeerId() {
+        return this.peerId != null;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+        return this;
+    }
+
+    @Override public Builder setLeaderId(String leaderId) {
+        this.leaderId = leaderId;
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+        return this;
+    }
+
+    @Override public CliRequests.TransferLeaderRequest build() {
+        return this; // This object is both the builder and the built request.
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
new file mode 100644
index 0000000..da09246
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.RaftError;
+import org.apache.ignite.raft.Status;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.InvokeContext;
+import org.apache.ignite.raft.rpc.InvokeTimeoutException;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RemotingException;
+import org.apache.ignite.raft.rpc.RpcClient;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
+import org.apache.ignite.raft.rpc.RpcUtils;
+import org.apache.ignite.raft.rpc.impl.LocalRpcClient;
+
+/**
+ * Base {@link ClientService} implementation that delegates connections and calls to an {@link RpcClient}.
+ */
+public abstract class AbstractClientService implements ClientService {
+    protected static final System.Logger LOG = System.getLogger(AbstractClientService.class.getName());
+
+    /** Underlying client; {@code null} before {@link #init(RpcOptions)} and after {@link #shutdown()}. */
+    protected volatile RpcClient rpcClient;
+    /** Options given to {@link #init(RpcOptions)}; source of the connect and default call timeouts. */
+    protected RpcOptions rpcOptions;
+
+    public RpcClient getRpcClient() {
+        return this.rpcClient;
+    }
+
+    @Override public boolean isConnected(final Endpoint endpoint) {
+        final RpcClient rc = this.rpcClient;
+        return rc != null && isConnected(rc, endpoint);
+    }
+
+    private static boolean isConnected(final RpcClient rpcClient, final Endpoint endpoint) {
+        return rpcClient.checkConnection(endpoint);
+    }
+
+    /** @throws IllegalStateException if the service is not initialized. */
+    @Override public boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent) {
+        final RpcClient rc = this.rpcClient;
+        if (rc == null) {
+            throw new IllegalStateException("Client service is uninitialized.");
+        }
+        return rc.checkConnection(endpoint, createIfAbsent);
+    }
+
+    /** Does nothing and returns {@code true} when a client already exists. */
+    @Override public synchronized boolean init(final RpcOptions rpcOptions) {
+        if (this.rpcClient != null) {
+            return true;
+        }
+        this.rpcOptions = rpcOptions;
+        return initRpcClient();
+    }
+
+    /** Creates a {@link LocalRpcClient} and initializes it with the stored options. */
+    protected boolean initRpcClient() {
+        this.rpcClient = new LocalRpcClient();
+        this.rpcClient.init(rpcOptions);
+
+        return true;
+    }
+
+    @Override public synchronized void shutdown() {
+        if (this.rpcClient != null) {
+            this.rpcClient.shutdown();
+            this.rpcClient = null;
+        }
+    }
+
+    /** Pings {@code endpoint} unless it is already connected; returns {@code false} when interrupted,
+     *  on a remoting failure, or when the reply carries a non-zero error code. */
+    @Override public boolean connect(final Endpoint endpoint) {
+        final RpcClient rc = this.rpcClient;
+        if (rc == null) {
+            throw new IllegalStateException("Client service is uninitialized.");
+        }
+        if (isConnected(rc, endpoint)) {
+            return true;
+        }
+        try {
+            final PingRequest req = PingRequest.newBuilder() //
+                .setSendTimestamp(System.currentTimeMillis()) //
+                .build();
+            final ErrorResponse resp = (ErrorResponse) rc.invokeSync(endpoint, req,
+                this.rpcOptions.getRpcConnectTimeoutMs());
+            return resp.getErrorCode() == 0;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+        } catch (final RemotingException e) {
+            LOG.log(System.Logger.Level.WARNING, "Fail to connect {0}, remoting exception: {1}.", endpoint, e.getMessage());
+            return false;
+        }
+    }
+
+    @Override public boolean disconnect(final Endpoint endpoint) {
+        final RpcClient rc = this.rpcClient;
+        if (rc == null) {
+            return true;
+        }
+        // System.Logger patterns follow java.text.MessageFormat, so the placeholder must be indexed ({0}, not {}).
+        LOG.log(System.Logger.Level.INFO, "Disconnect from {0}.", endpoint);
+        rc.closeConnection(endpoint);
+        return true;
+    }
+
+    @Override public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint,
+        final Message request, final RpcResponseClosure<T> done, final int timeoutMs) {
+        return invokeWithDone(endpoint, request, null, done, timeoutMs, null);
+    }
+
+    /**
+     * Sends {@code request} asynchronously; the outcome is reported to {@code done} and to the returned future.
+     * The future completes with the received message (an {@link ErrorResponse} included) or exceptionally.
+     * {@code timeoutMs <= 0} means the default RPC timeout; a {@code null} {@code rpcExecutor} means the shared one.
+     */
+    public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+                                                              final InvokeContext ctx,
+                                                              final RpcResponseClosure<T> done, final int timeoutMs,
+                                                              final Executor rpcExecutor) {
+        final RpcClient rc = this.rpcClient;
+        final CompletableFuture<Message> future = new CompletableFuture<>();
+        final Executor currExecutor = rpcExecutor != null ? rpcExecutor : RpcUtils.RPC_CLOSURE_EXECUTOR;
+        try {
+            if (rc == null) {
+                future.completeExceptionally(new IllegalStateException("Client service is uninitialized."));
+                // should be in another thread to avoid dead locking.
+                RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
+                    "Client service is uninitialized."));
+                return future;
+            }
+
+            rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {
+                @SuppressWarnings({"unchecked", "ConstantConditions"})
+                @Override public void complete(final Object result, final Throwable err) {
+                    if (future.isCancelled()) {
+                        onCanceled(request, done);
+                        return;
+                    }
+
+                    if (err == null) {
+                        final Message msg = (Message) result;
+                        final Status status = result instanceof ErrorResponse
+                            ? handleErrorResponse((ErrorResponse) result) : Status.OK();
+                        if (done != null) {
+                            try {
+                                if (status.isOk()) {
+                                    done.setResponse((T) msg);
+                                }
+                                done.run(status);
+                            } catch (final Throwable t) {
+                                // (level, msg, thrown) overload: the varargs form would treat t as a format argument.
+                                LOG.log(System.Logger.Level.WARNING, "Fail to run RpcResponseClosure, the request is " + request + ".", t);
+                            }
+                        }
+                        if (!future.isDone()) {
+                            future.complete(msg);
+                        }
+                    } else {
+                        if (done != null) {
+                            try {
+                                done.run(new Status(err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT
+                                    : RaftError.EINTERNAL, "RPC exception:" + err.getMessage()));
+                            } catch (final Throwable t) {
+                                LOG.log(System.Logger.Level.WARNING, "Fail to run RpcResponseClosure, the request is " + request + ".", t);
+                            }
+                        }
+                        if (!future.isDone()) {
+                            future.completeExceptionally(err);
+                        }
+                    }
+                }
+
+                @Override public Executor executor() {
+                    return currExecutor;
+                }
+            }, timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            future.completeExceptionally(e);
+            // should be in another thread to avoid dead locking.
+            RpcUtils.runClosureInExecutor(currExecutor, done,
+                new Status(RaftError.EINTR, "Sending rpc was interrupted"));
+        } catch (final RemotingException e) {
+            future.completeExceptionally(e);
+            // should be in another thread to avoid dead locking.
+            RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
+                "Fail to send a RPC request:" + e.getMessage()));
+        }
+
+        return future;
+    }
+
+    /** Converts an error response into a {@link Status} carrying the same code and message. */
+    private static Status handleErrorResponse(final ErrorResponse eResp) {
+        final Status status = new Status();
+        status.setCode(eResp.getErrorCode());
+        status.setErrorMsg(eResp.getErrorMsg());
+        return status;
+    }
+
+    /** Runs {@code done}, if any, with an {@code ECANCELED} status; failures of the closure are only logged. */
+    private <T extends Message> void onCanceled(final Message request, final RpcResponseClosure<T> done) {
+        if (done != null) {
+            try {
+                done.run(new Status(RaftError.ECANCELED, "RPC request was canceled by future."));
+            } catch (final Throwable t) {
+                LOG.log(System.Logger.Level.WARNING, "Fail to run RpcResponseClosure, the request is " + request + ".", t);
+            }
+        }
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
new file mode 100644
index 0000000..8082191
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * RPC client service for CLI requests: peer and learner changes, snapshots, leader and peer queries.
+ */
+public interface CliClientService extends ClientService {
+    /**
+     * Adds a peer.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> addPeer(Endpoint endpoint, CliRequests.AddPeerRequest request,
+                            RpcResponseClosure<CliRequests.AddPeerResponse> done);
+
+    /**
+     * Removes a peer.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> removePeer(Endpoint endpoint, CliRequests.RemovePeerRequest request,
+                               RpcResponseClosure<CliRequests.RemovePeerResponse> done);
+
+    /**
+     * Resets a peer.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> resetPeer(Endpoint endpoint, CliRequests.ResetPeerRequest request,
+                              RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+    /**
+     * Takes a snapshot.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> snapshot(Endpoint endpoint, CliRequests.SnapshotRequest request,
+                             RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+    /**
+     * Changes peers.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> changePeers(Endpoint endpoint, CliRequests.ChangePeersRequest request,
+                                RpcResponseClosure<CliRequests.ChangePeersResponse> done);
+
+    /**
+     * Adds learners.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     * @since 1.3.0
+     */
+    Future<Message> addLearners(Endpoint endpoint, CliRequests.AddLearnersRequest request,
+                                RpcResponseClosure<CliRequests.LearnersOpResponse> done);
+
+    /**
+     * Removes learners.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     * @since 1.3.0
+     */
+    Future<Message> removeLearners(Endpoint endpoint, CliRequests.RemoveLearnersRequest request,
+                                   RpcResponseClosure<CliRequests.LearnersOpResponse> done);
+
+    /**
+     * Resets learners.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     * @since 1.3.0
+     */
+    Future<Message> resetLearners(Endpoint endpoint, CliRequests.ResetLearnersRequest request,
+                                  RpcResponseClosure<CliRequests.LearnersOpResponse> done);
+
+    /**
+     * Gets the group leader.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> getLeader(Endpoint endpoint, CliRequests.GetLeaderRequest request,
+                              RpcResponseClosure<CliRequests.GetLeaderResponse> done);
+
+    /**
+     * Transfers leadership to another peer.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> transferLeader(Endpoint endpoint, CliRequests.TransferLeaderRequest request,
+                                   RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+    /**
+     * Gets all peers of the replication group.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with the result
+     */
+    Future<Message> getPeers(Endpoint endpoint, CliRequests.GetPeersRequest request,
+                             RpcResponseClosure<CliRequests.GetPeersResponse> done);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
new file mode 100644
index 0000000..94ec3a2
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.CliRequests.AddLearnersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.AddPeerRequest;
+import org.apache.ignite.raft.rpc.CliRequests.AddPeerResponse;
+import org.apache.ignite.raft.rpc.CliRequests.ChangePeersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.ChangePeersResponse;
+import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.rpc.CliRequests.GetLeaderResponse;
+import org.apache.ignite.raft.rpc.CliRequests.GetPeersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.GetPeersResponse;
+import org.apache.ignite.raft.rpc.CliRequests.LearnersOpResponse;
+import org.apache.ignite.raft.rpc.CliRequests.RemoveLearnersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.RemovePeerRequest;
+import org.apache.ignite.raft.rpc.CliRequests.RemovePeerResponse;
+import org.apache.ignite.raft.rpc.CliRequests.ResetLearnersRequest;
+import org.apache.ignite.raft.rpc.CliRequests.ResetPeerRequest;
+import org.apache.ignite.raft.rpc.CliRequests.TransferLeaderRequest;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+
+/**
+ * {@link CliClientService} implementation that sends every CLI request with the default RPC timeout.
+ */
+public class CliClientServiceImpl extends AbstractClientService implements CliClientService {
+    @Override public Future<Message> addPeer(final Endpoint endpoint, final AddPeerRequest request,
+        final RpcResponseClosure<AddPeerResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> removePeer(final Endpoint endpoint, final RemovePeerRequest request,
+        final RpcResponseClosure<RemovePeerResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> resetPeer(final Endpoint endpoint, final ResetPeerRequest request,
+        final RpcResponseClosure<ErrorResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> snapshot(final Endpoint endpoint, final CliRequests.SnapshotRequest request,
+        final RpcResponseClosure<ErrorResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> changePeers(final Endpoint endpoint, final ChangePeersRequest request,
+        final RpcResponseClosure<ChangePeersResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> addLearners(final Endpoint endpoint, final AddLearnersRequest request,
+        final RpcResponseClosure<LearnersOpResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> removeLearners(final Endpoint endpoint, final RemoveLearnersRequest request,
+        final RpcResponseClosure<LearnersOpResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> resetLearners(final Endpoint endpoint, final ResetLearnersRequest request,
+        final RpcResponseClosure<LearnersOpResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> getLeader(final Endpoint endpoint, final GetLeaderRequest request,
+        final RpcResponseClosure<GetLeaderResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> transferLeader(final Endpoint endpoint, final TransferLeaderRequest request,
+        final RpcResponseClosure<ErrorResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    @Override public Future<Message> getPeers(final Endpoint endpoint, final GetPeersRequest request,
+        final RpcResponseClosure<GetPeersResponse> done) {
+        return invokeDefault(endpoint, request, done);
+    }
+
+    /**
+     * Sends {@code request} to {@code endpoint} with the default RPC timeout taken from the RPC options.
+     *
+     * @param done callback, passed through unchanged
+     * @return the future produced by the four-argument {@code invokeWithDone}
+     */
+    private <T extends Message> Future<Message> invokeDefault(final Endpoint endpoint, final Message request,
+        final RpcResponseClosure<T> done) {
+        return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
new file mode 100644
index 0000000..ef4976d
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.Lifecycle;
+import org.apache.ignite.raft.closure.RpcResponseClosure;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcOptions;
+
+/**
+ * RPC client service.
+ * TODO not needed
+ */
+public interface ClientService extends Lifecycle<RpcOptions> {
+
+    /**
+     * Connects to the endpoint.
+     *
+     * @param endpoint server address
+     * @return true on connect success
+     */
+    boolean connect(final Endpoint endpoint);
+
+    /**
+     * Checks the connection for the given address; may asynchronously create one if there is no connection.
+     * @param endpoint       target address
+     * @param createIfAbsent create a new one if there is no connection
+     * @return true if there is a connection and the connection is active and writable.
+     */
+    boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
+
+    /**
+     * Disconnects from the endpoint.
+     *
+     * @param endpoint server address
+     * @return true on disconnect success
+     */
+    boolean disconnect(final Endpoint endpoint);
+
+    /**
+     * Returns true when the endpoint's connection is active.
+     *
+     * @param endpoint server address
+     * @return true if the connection is active
+     */
+    boolean isConnected(final Endpoint endpoint);
+
+    /**
+     * Sends a request and reports the response through the callback; returns the request future.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @param timeoutMs timeout millis
+     * @return a future with operation result
+     */
+    <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+                                                       final RpcResponseClosure<T> done, final int timeoutMs);
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
new file mode 100644
index 0000000..916a768
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.service;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.StampedLock;
+import org.apache.ignite.raft.Configuration;
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.RaftError;
+import org.apache.ignite.raft.Status;
+import org.apache.ignite.raft.rpc.CliRequests;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests;
+
+/**
+ * Maintains the routes of raft groups: the configuration and the cached leader of each group.
+ */
+public class RouteTable {
+    private static final System.Logger LOG = System.getLogger(RouteTable.class.getName());
+
+    private static final RouteTable INSTANCE = new RouteTable();
+
+    // Map<groupId, groupConf>
+    private final ConcurrentMap<String, GroupConf> groupConfTable = new ConcurrentHashMap<>();
+
+    public static RouteTable getInstance() {
+        return INSTANCE; // the shared instance created once by the INSTANCE field initializer
+    }
+
+    /**
+     * Stores a new configuration for a group; a cached leader absent from it is dropped.
+     *
+     * @param groupId raft group id
+     * @param conf    configuration to store for the group
+     * @return always {@code true}
+     */
+    public boolean updateConfiguration(final String groupId, final Configuration conf) {
+        final GroupConf groupConf = getOrCreateGroupConf(groupId);
+        final long writeStamp = groupConf.stampedLock.writeLock();
+        try {
+            groupConf.conf = conf;
+            final PeerId cachedLeader = groupConf.leader;
+            if (cachedLeader != null && !conf.contains(cachedLeader)) {
+                groupConf.leader = null;
+            }
+        } finally {
+            groupConf.stampedLock.unlockWrite(writeStamp);
+        }
+        return true;
+    }
+
+    /**
+     * Looks up the routing state of a group, registering an empty one on first use.
+     * Relies on the map's {@code computeIfAbsent}, so racing callers all end up with
+     * the same {@code GroupConf} instance for a given group id.
+     *
+     * @param groupId raft group id
+     * @return the {@code GroupConf} registered for {@code groupId}
+     */
+    private GroupConf getOrCreateGroupConf(final String groupId) {
+        return this.groupConfTable.computeIfAbsent(groupId, id -> new GroupConf());
+    }
+
+    /**
+     * Returns the leader currently cached for the group, or {@code null} when none is known.
+     * Call {@link #refreshLeader(CliClientService, String, int)} beforehand so that the
+     * cache has been populated.
+     *
+     * @param groupId raft group id
+     * @return the cached leader peer, or {@code null}
+     */
+    public PeerId selectLeader(final String groupId) {
+        final GroupConf groupConf = this.groupConfTable.get(groupId);
+        if (groupConf == null) {
+            return null;
+        }
+        final StampedLock lock = groupConf.stampedLock;
+        final long optimisticStamp = lock.tryOptimisticRead();
+        final PeerId optimisticLeader = groupConf.leader;
+        if (lock.validate(optimisticStamp)) {
+            return optimisticLeader;
+        }
+        final long readStamp = lock.readLock(); // the optimistic read was invalidated by a writer
+        try {
+            return groupConf.leader;
+        } finally {
+            lock.unlockRead(readStamp);
+        }
+    }
+
+    /**
+     * Replaces the cached leader of the given group, registering the group if needed.
+     *
+     * @param groupId raft group id
+     * @param leader  peer to cache as the group leader
+     * @return always {@code true}
+     */
+    public boolean updateLeader(final String groupId, final PeerId leader) {
+        final GroupConf groupConf = getOrCreateGroupConf(groupId);
+        final StampedLock lock = groupConf.stampedLock;
+        final long writeStamp = lock.writeLock();
+        try {
+            groupConf.leader = leader;
+        } finally {
+            lock.unlockWrite(writeStamp);
+        }
+        return true;
+    }
+
+    /**
+     * Parses a peer string and caches the resulting peer as the leader of the group.
+     *
+     * @param groupId   raft group id
+     * @param leaderStr leader peer in its string form
+     * @return {@code true} if the string was parsed and the leader updated
+     */
+    public boolean updateLeader(final String groupId, final String leaderStr) {
+        final PeerId parsedLeader = new PeerId();
+        if (!parsedLeader.parse(leaderStr)) {
+            // Unparsable peer string: the cached leader is left untouched.
+            return false;
+        }
+        return updateLeader(groupId, parsedLeader);
+    }
+
+    /**
+     * Returns the configuration currently stored for the group, or {@code null} if none is stored.
+     *
+     * @param groupId raft group id
+     * @return the stored configuration, or {@code null}
+     */
+    public Configuration getConfiguration(final String groupId) {
+        final GroupConf groupConf = this.groupConfTable.get(groupId);
+        if (groupConf == null) {
+            return null;
+        }
+        final StampedLock lock = groupConf.stampedLock;
+        final long optimisticStamp = lock.tryOptimisticRead();
+        final Configuration optimisticConf = groupConf.conf;
+        if (lock.validate(optimisticStamp)) {
+            return optimisticConf;
+        }
+        final long readStamp = lock.readLock(); // the optimistic read was invalidated by a writer
+        try {
+            return groupConf.conf;
+        } finally {
+            lock.unlockRead(readStamp);
+        }
+    }
+
+    /**
+     * Blocking the thread until query_leader finishes.
+     *
+     * @param groupId   raft group id
+     * @param timeoutMs timeout millis
+     * @return operation status
+     */
+    public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
+        throws InterruptedException,
+        TimeoutException {
+        final Configuration conf = getConfiguration(groupId);
+        if (conf == null) {
+            return new Status(RaftError.ENOENT,
+                "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
+        }
+        final Status st = Status.OK();
+        final CliRequests.GetLeaderRequest.Builder rb = CliRequests.GetLeaderRequest.newBuilder();
+        rb.setGroupId(groupId);
+        final CliRequests.GetLeaderRequest request = rb.build();
+        TimeoutException timeoutException = null;
+        for (final PeerId peer : conf) {
+            if (!cliClientService.connect(peer.getEndpoint())) {
+                if (st.isOk()) {
+                    st.setError(-1, "Fail to init channel to %s", peer);
+                } else {
+                    final String savedMsg = st.getErrorMsg();
+                    st.setError(-1, "%s, Fail to init channel to %s", savedMsg, peer);
+                }
+                continue;
+            }
+            final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
+            try {
+                final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
+                if (msg instanceof RpcRequests.ErrorResponse) {
+                    if (st.isOk()) {
+                        st.setError(-1, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
+                    } else {
+                        final String savedMsg = st.getErrorMsg();
+                        st.setError(-1, "%s, %s", savedMsg, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
+                    }
+                } else {
+                    final CliRequests.GetLeaderResponse response = (CliRequests.GetLeaderResponse) msg;
+                    updateLeader(groupId, response.getLeaderId());
+                    return Status.OK();
+                }
+            } catch (final TimeoutException e) {
+                timeoutException = e;
+            } catch (final ExecutionException e) {
+                if (st.isOk()) {
+                    st.setError(-1, e.getMessage());
+                } else {
+                    final String savedMsg = st.getErrorMsg();
+                    st.setError(-1, "%s, %s", savedMsg, e.getMessage());
+                }
+            }
+        }
+        if (timeoutException != null) {
+            throw timeoutException;
+        }
+
+        return st;
+    }
+
+    /**
+     * Fetches the current peer list of the group from its leader and stores it as the group
+     * configuration. When no leader is cached, {@code refreshLeader} is called first.
+     *
+     * @param cliClientService client used to reach the leader
+     * @param groupId          raft group id
+     * @param timeoutMs        timeout in milliseconds
+     * @return OK when the configuration was replaced, otherwise a status describing the failure
+     * @throws InterruptedException if interrupted while the leader is being refreshed
+     * @throws TimeoutException     if refreshing the leader timed out
+     */
+    public Status refreshConfiguration(final CliClientService cliClientService, final String groupId,
+                                       final int timeoutMs) throws InterruptedException, TimeoutException {
+        final Configuration conf = getConfiguration(groupId);
+        if (conf == null) {
+            return new Status(RaftError.ENOENT,
+                "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
+        }
+        final Status st = Status.OK();
+        PeerId leaderId = selectLeader(groupId);
+        if (leaderId == null) {
+            refreshLeader(cliClientService, groupId, timeoutMs);
+            leaderId = selectLeader(groupId);
+        }
+        if (leaderId == null) {
+            st.setError(-1, "Fail to get leader of group %s", groupId);
+            return st;
+        }
+        if (!cliClientService.connect(leaderId.getEndpoint())) {
+            st.setError(-1, "Fail to init channel to %s", leaderId);
+            return st;
+        }
+        final CliRequests.GetPeersRequest.Builder rb = CliRequests.GetPeersRequest.newBuilder();
+        rb.setGroupId(groupId);
+        rb.setLeaderId(leaderId.toString());
+        try {
+            final Message result = cliClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs,
+                TimeUnit.MILLISECONDS);
+            if (result instanceof CliRequests.GetPeersResponse) {
+                final CliRequests.GetPeersResponse resp = (CliRequests.GetPeersResponse) result;
+                final Configuration newConf = new Configuration();
+                for (final String peerIdStr : resp.getPeersList()) {
+                    final PeerId newPeer = new PeerId();
+                    if (!newPeer.parse(peerIdStr)) {
+                        // Never install a configuration that contains an unparsed peer.
+                        st.setError(-1, "Fail to parse peer %s of group %s", peerIdStr, groupId);
+                        return st;
+                    }
+                    newConf.addPeer(newPeer);
+                }
+                updateConfiguration(groupId, newConf);
+            } else {
+                final RpcRequests.ErrorResponse resp = (RpcRequests.ErrorResponse) result;
+                // The remote text is an argument, not the format string, so '%' in it is harmless.
+                st.setError(resp.getErrorCode(), "%s", resp.getErrorMsg());
+            }
+        } catch (final InterruptedException e) {
+            // Restore the interrupt flag so the caller can still observe the interruption.
+            Thread.currentThread().interrupt();
+            st.setError(-1, "%s", e.getMessage());
+        } catch (final Exception e) {
+            st.setError(-1, "%s", e.getMessage());
+        }
+        return st;
+    }
+
+    /**
+     * Forgets every group, dropping all cached configurations and leaders.
+     */
+    public void reset() {
+        this.groupConfTable.clear();
+    }
+
+    /**
+     * Removes the group together with its cached configuration and leader.
+     * @param groupId raft group id
+     * @return {@code true} if the group was registered and has been removed
+     */
+    public boolean removeGroup(final String groupId) {
+        final GroupConf removed = this.groupConfTable.remove(groupId);
+        return removed != null;
+    }
+
+    @Override
+    public String toString() {
+        return "RouteTable{groupConfTable=" + this.groupConfTable + '}';
+    }
+
+    private RouteTable() { // private: the only instance is the one held in INSTANCE
+    }
+
+    /** Routing state of a single group; RouteTable reads and writes it through {@code stampedLock}. */
+    private static class GroupConf {
+        private final StampedLock stampedLock = new StampedLock();
+
+        private Configuration conf;
+        private PeerId leader;
+
+        @Override
+        public String toString() {
+            return "GroupConf{conf=" + this.conf + ", leader=" + this.leader + '}';
+        }
+    }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
new file mode 100644
index 0000000..0b1e8e4
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
@@ -0,0 +1,39 @@
+package org.apache.ignite.raft;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.service.CliClientServiceImpl;
+import org.apache.ignite.raft.service.RouteTable;
+import org.junit.Test;
+
+public class RaftClientTest {
+    private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName());
+
+    /** Registers a three-peer group, refreshes its leader through the client and logs the result. */
+    @Test
+    public void testClient() throws TimeoutException, InterruptedException {
+        final String groupId = "unittest";
+        final RouteTable routeTable = RouteTable.getInstance();
+
+        final List<PeerId> peers = Arrays.asList(
+            new PeerId("127.0.0.1", 8080),
+            new PeerId("127.0.0.1", 8081),
+            new PeerId("127.0.0.1", 8082)
+        );
+
+        routeTable.updateConfiguration(groupId, new Configuration(peers));
+
+        final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+        cliClientService.init(new RpcOptions());
+
+        final boolean refreshed = routeTable.refreshLeader(cliClientService, groupId, 1000).isOk();
+        if (!refreshed) {
+            throw new IllegalStateException("Refresh leader failed");
+        }
+
+        // NOTE(review): the leader is only logged, not asserted; presumably needs live peers — confirm.
+        LOG.log(System.Logger.Level.INFO, "Leader is " + routeTable.selectLeader(groupId));
+    }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/LocalRpcTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/LocalRpcTest.java
new file mode 100644
index 0000000..93b12a5
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/LocalRpcTest.java
@@ -0,0 +1,321 @@
+package org.apache.ignite.raft.rpc;
+
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.raft.Endpoint;
+import org.apache.ignite.raft.rpc.impl.LocalConnection;
+import org.apache.ignite.raft.rpc.impl.LocalRpcClient;
+import org.apache.ignite.raft.rpc.impl.LocalRpcServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * TODO add test for localconn.close.
+ */
+public class LocalRpcTest {
+    private Endpoint endpoint;
+    private LocalRpcServer server;
+
+    @Before
+    public void setup() {
+        endpoint = new Endpoint();
+        server = new LocalRpcServer(endpoint);
+        server.registerProcessor(new Request1RpcProcessor()); // handles Request1
+        server.registerProcessor(new Request2RpcProcessor()); // handles Request2
+        server.init(null); // NOTE(review): no options are passed — confirm the local server needs none
+    }
+
+    @After
+    public void teardown() {
+        server.shutdown();
+        // After shutdown the server must no longer be listed in the static registry.
+        assertNull(LocalRpcServer.servers.get(endpoint));
+    }
+
+    @Test
+    public void testStartStopServer() {
+        assertNotNull(LocalRpcServer.servers.get(endpoint)); // started in setup(); teardown() checks the stop side
+    }
+
+    @Test
+    public void testConnection() {
+        LocalRpcClient client = new LocalRpcClient();
+        // A plain check on a fresh client is expected to report no connection...
+        assertFalse(client.checkConnection(endpoint));
+        // ...whereas a check that may create the connection is expected to succeed.
+        assertTrue(client.checkConnection(endpoint, true));
+    }
+
+    @Test
+    public void testSyncProcessing() throws RemotingException, InterruptedException {
+        final RpcClient rpcClient = new LocalRpcClient();
+        // Each request type is expected to come back as its matching response type.
+        final Response1 first = (Response1) rpcClient.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
+        assertNotNull(first);
+        final Response2 second = (Response2) rpcClient.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
+        assertNotNull(second);
+    }
+
+    @Test
+    public void testAsyncProcessing() throws RemotingException, InterruptedException {
+        RpcClient client = new LocalRpcClient();
+        // Assert on the latch result and on the delivered value: the holder itself is never null.
+        CountDownLatch l1 = new CountDownLatch(1);
+        AtomicReference<Response1> resp1 = new AtomicReference<>();
+        client.invokeAsync(endpoint, new Request1(), new InvokeContext(), (result, err) -> {
+            resp1.set((Response1) result);
+            l1.countDown();
+        }, 5000);
+        assertTrue(l1.await(5_000, TimeUnit.MILLISECONDS));
+        assertNotNull(resp1.get());
+
+        CountDownLatch l2 = new CountDownLatch(1);
+        AtomicReference<Response2> resp2 = new AtomicReference<>();
+        client.invokeAsync(endpoint, new Request2(), new InvokeContext(), (result, err) -> {
+            resp2.set((Response2) result);
+            l2.countDown();
+        }, 5000);
+        assertTrue(l2.await(5_000, TimeUnit.MILLISECONDS));
+        assertNotNull(resp2.get());
+    }
+
+    @Test
+    public void testDisconnect1() {
+        final RpcClient first = new LocalRpcClient();
+        final RpcClient second = new LocalRpcClient();
+        // Both clients connect to the same server.
+        assertTrue(first.checkConnection(endpoint, true));
+        assertTrue(second.checkConnection(endpoint, true));
+
+        // Shutting down one client must not affect the connection of the other.
+        first.shutdown();
+        assertFalse(first.checkConnection(endpoint));
+        assertTrue(second.checkConnection(endpoint));
+
+        // Once the second client is shut down as well, no connection remains.
+        second.shutdown();
+        assertFalse(first.checkConnection(endpoint));
+        assertFalse(second.checkConnection(endpoint));
+    }
+
+    @Test
+    public void testDisconnect2() {
+        final RpcClient first = new LocalRpcClient();
+        final RpcClient second = new LocalRpcClient();
+
+        assertTrue(first.checkConnection(endpoint, true));
+        assertTrue(second.checkConnection(endpoint, true));
+
+        // Stopping the server side is expected to drop the connections of every client.
+        server.shutdown();
+        assertFalse(first.checkConnection(endpoint));
+        assertFalse(second.checkConnection(endpoint));
+    }
+
+    @Test
+    public void testRecordedSync() throws RemotingException, InterruptedException {
+        final AtomicReference<LocalConnection> recordingConn = new AtomicReference<>();
+
+        final RpcClient client = new LocalRpcClient();
+        client.registerConnectEventListener(new ConnectionOpenedEventListener() {
+            @Override public void onOpened(String remoteAddress, Connection conn) {
+                final LocalConnection opened = (LocalConnection) conn;
+                recordingConn.set(opened);
+                // Record only the Request1/Response1 traffic on the new connection.
+                opened.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+            }
+        });
+        assertTrue(client.checkConnection(endpoint, true));
+
+        client.invokeSync(endpoint, new Request1(), 500);
+        client.invokeSync(endpoint, new Request2(), 500);
+
+        // The Request2 exchange does not match the filter, so one request/response pair is expected.
+        final Queue<Object[]> recorded = recordingConn.get().recordedMessages();
+        assertEquals(2, recorded.size());
+        assertTrue(recorded.poll()[1] instanceof Request1);
+        assertTrue(recorded.poll()[1] instanceof Response1);
+    }
+
+    @Test
+    public void testRecordedSyncTimeout() throws RemotingException, InterruptedException {
+        final AtomicReference<LocalConnection> recordingConn = new AtomicReference<>();
+
+        final RpcClient client = new LocalRpcClient();
+        client.registerConnectEventListener(new ConnectionOpenedEventListener() {
+            @Override public void onOpened(String remoteAddress, Connection conn) {
+                final LocalConnection opened = (LocalConnection) conn;
+                recordingConn.set(opened);
+                opened.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+            }
+        });
+        assertTrue(client.checkConnection(endpoint, true));
+
+        // The value 10_000 makes Request1RpcProcessor sleep for 1000 ms, longer than the 500 ms timeout.
+        final Request1 slowRequest = new Request1();
+        slowRequest.val = 10_000;
+
+        try {
+            client.invokeSync(endpoint, slowRequest, 500);
+
+            fail();
+        } catch (InvokeTimeoutException expected) {
+            // The invocation is supposed to time out.
+        }
+
+        // Only the request is expected to be recorded at this point.
+        final Queue<Object[]> recorded = recordingConn.get().recordedMessages();
+        assertEquals(1, recorded.size());
+        assertTrue(recorded.poll()[1] instanceof Request1);
+    }
+
+    @Test
+    public void testRecordedAsync() throws RemotingException, InterruptedException {
+        AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+        RpcClient client1 = new LocalRpcClient();
+
+        client1.registerConnectEventListener(new ConnectionOpenedEventListener() {
+            @Override public void onOpened(String remoteAddress, Connection conn) {
+                LocalConnection locConn = (LocalConnection) conn;
+                connRef.set(locConn);
+                locConn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+            }
+        });
+
+        assertTrue(client1.checkConnection(endpoint, true));
+
+        CountDownLatch l = new CountDownLatch(2);
+
+        client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> l.countDown(), 500);
+        client1.invokeAsync(endpoint, new Request2(), null, (result, err) -> l.countDown(), 500);
+        // Bound the wait so that a lost callback fails the test instead of hanging the run.
+        assertTrue(l.await(5_000, TimeUnit.MILLISECONDS));
+
+        Queue<Object[]> recorded = connRef.get().recordedMessages();
+        // Only the Request1 exchange matches the recording filter.
+        assertEquals(2, recorded.size());
+        assertTrue(recorded.poll()[1] instanceof Request1);
+        assertTrue(recorded.poll()[1] instanceof Response1);
+    }
+
+    @Test
+    public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException {
+        AtomicReference<LocalConnection> connRef = new AtomicReference<>();
+
+        RpcClient client1 = new LocalRpcClient();
+
+        client1.registerConnectEventListener(new ConnectionOpenedEventListener() {
+            @Override public void onOpened(String remoteAddress, Connection conn) {
+                LocalConnection locConn = (LocalConnection) conn;
+                connRef.set(locConn);
+                locConn.recordMessages(msg -> msg instanceof Request1 || msg instanceof Response1);
+            }
+        });
+
+        assertTrue(client1.checkConnection(endpoint, true));
+
+        // The value 10_000 makes Request1RpcProcessor sleep for 1000 ms, twice the invocation timeout.
+        Request1 request = new Request1();
+        request.val = 10_000;
+        CountDownLatch l = new CountDownLatch(1);
+
+        AtomicReference<Throwable> holder = new AtomicReference<>();
+
+        client1.invokeAsync(endpoint, request, null, (result, err) -> {
+            holder.set(err);
+
+            l.countDown();
+        }, 500);
+
+        // Bound the wait so that a lost callback fails the test instead of hanging the run.
+        assertTrue(l.await(5_000, TimeUnit.MILLISECONDS));
+
+        assertTrue(holder.get() instanceof InvokeTimeoutException);
+
+        Queue<Object[]> recorded = connRef.get().recordedMessages();
+        // Only the request is expected to be recorded at this point.
+        assertEquals(1, recorded.size());
+        assertTrue(recorded.poll()[1] instanceof Request1);
+    }
+
+    @Test
+    public void testBlockedSync() throws RemotingException, InterruptedException { // TODO: body is commented out, nothing is verified
+//        RpcClient client1 = new LocalRpcClient();
+//
+//        assertTrue(client1.checkConnection(endpoint, true));
+//
+//        LocalConnection conn = LocalRpcServer.servers.get(endpoint).conns.get(client1);
+//
+//        assertNotNull(conn);
+//
+//        conn.recordMessages(msg -> msg instanceof Request1);
+//
+//        Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
+//
+//        assertEquals(1, resp2.val);
+//
+//        Future<Response1> resp = Utils.runInThread(() -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000));
+//
+//        Thread.sleep(3_000);
+//
+//        assertFalse(resp.isDone());
+    }
+
+    private static class Request1RpcProcessor implements RpcProcessor<Request1> {
+        @Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
+            if (request.val == 10_000) { // marker value set by the timeout tests to delay the reply
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the interruption visible to this thread
+                }
+            }
+            Response1 resp1 = new Response1();
+            resp1.val = request.val + 1;
+            rpcCtx.sendResponse(resp1);
+        }
+
+        @Override public String interest() {
+            return Request1.class.getName();
+        }
+    }
+
+    private static class Request2RpcProcessor implements RpcProcessor<Request2> {
+        @Override public String interest() {
+            return Request2.class.getName();
+        }
+
+        @Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
+            final Response2 reply = new Response2();
+            reply.val = request.val + 1; // the reply carries the request value plus one
+            rpcCtx.sendResponse(reply);
+        }
+    }
+
+    private static class Request1 implements Message {
+        int val; // 10_000 makes Request1RpcProcessor sleep for a second before replying
+    }
+
+    private static class Request2 implements Message {
+        int val; // Request2RpcProcessor replies with this value plus one
+    }
+
+    private static class Response1 implements Message {
+        int val; // set by Request1RpcProcessor to the request value plus one
+    }
+
+    private static class Response2 implements Message {
+        int val; // set by Request2RpcProcessor to the request value plus one
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
index db63614..6243a1c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -41,7 +41,7 @@ public class LocalConnection implements Connection {
 
     private void send(Message request, Future fut) {
         Object[] tuple = {client, request, fut};
-        assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+        srv.incoming.offer(tuple);
     }
 
     public void onBeforeRequestSend(Message request, Future fut) {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index db9648a..486971e 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -191,13 +191,13 @@ public class LocalRpcServer implements RpcServer {
 
         defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("LocalRPCServer-Default-Executor-Thread: " + local.toString()));
 
+        started = true;
+
         worker.setName("LocalRPCServer-Dispatch-Thread: "  + local.toString());
         worker.start();
 
         servers.put(local, this);
 
-        started = true;
-
         return true;
     }