You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/12/31 08:40:42 UTC

[ignite-3] 03/03: IGNITE-13885 partially working.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a529ad1eaebdbd0d999b3102d20f10d931afc8af
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu Dec 31 11:40:20 2020 +0300

    IGNITE-13885 partially working.
---
 .../sofa/jraft/entity/LocalStorageOutter.java      |   3 +-
 .../com/alipay/sofa/jraft/entity/RaftOutter.java   |   3 +-
 .../com/alipay/sofa/jraft/rpc/CliRequests.java     |   2 +-
 .../java/com/alipay/sofa/jraft/rpc/Connection.java |   1 -
 .../alipay/sofa/jraft/rpc/HasErrorResponse.java    |   2 +-
 .../sofa/jraft/rpc/MessageBuilderFactory.java      |  28 +++-
 .../com/alipay/sofa/jraft/rpc/RaftRpcFactory.java  |   9 --
 .../com/alipay/sofa/jraft/rpc/RpcRequests.java     |  26 ++--
 .../sofa/jraft/rpc/impl/AbstractClientService.java |  11 +-
 .../sofa/jraft/rpc/impl/LocalConnection.java       |  35 +++++
 .../sofa/jraft/rpc/impl/LocalRaftRpcFactory.java   |  35 ++++-
 .../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java |  72 ++++++++-
 .../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 165 ++++++++++++++++++++-
 .../rpc/message/AppendEntriesRequestImpl.java      | 130 ++++++++++++++++
 .../rpc/message/AppendEntriesResponseImpl.java     |  47 ++++++
 .../rpc/message/DefaultMessageBuilderFactory.java  |  53 ++++++-
 .../sofa/jraft/rpc/message/EntryMetaImpl.java      | 133 +++++++++++++++++
 .../sofa/jraft/rpc/message/ErrorResponseImpl.java  |  32 ++++
 .../sofa/jraft/rpc/message/PingRequestImpl.java    |  21 +++
 .../sofa/jraft/rpc/message/PreVoteRequestImpl.java |  87 +++++++++++
 .../jraft/rpc/message/ReadIndexRequestImpl.java    |  76 ++++++++++
 .../jraft/rpc/message/ReadIndexResponseImpl.java   |  36 +++++
 .../jraft/rpc/message/RequestVoteResponseImpl.java |  36 +++++
 .../alipay/sofa/jraft/rpc/message/StableMeta.java  |  32 ++++
 .../jraft/rpc/message/TimeoutNowRequestImpl.java   |  54 +++++++
 .../jraft/rpc/message/TimeoutNowResponseImpl.java  |  36 +++++
 .../com/alipay/sofa/jraft/util/JDKMarshaller.java  |  40 +++--
 .../com/alipay/sofa/jraft/util/Marshaller.java     |   4 +-
 .../java/com/alipay/sofa/jraft/util/Utils.java     |   2 +-
 .../com.alipay.sofa.jraft.rpc.RaftRpcFactory       |   2 +-
 .../com/alipay/sofa/jraft/rpc/LocalRpcTest.java    | 154 +++++++++++++++++++
 .../jraft/storage/impl/LocalLogStorageTest.java    |   5 -
 .../sofa/jraft/storage/io/LocalFileReaderTest.java |   1 -
 modules/raft/src/test/resources/log4j2.xml         |  25 ++++
 34 files changed, 1317 insertions(+), 81 deletions(-)

diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
index 81e54b4..d483818 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
@@ -20,6 +20,7 @@
 package com.alipay.sofa.jraft.entity;
 
 import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
 import com.alipay.sofa.jraft.storage.RaftMetaStorage;
 import com.alipay.sofa.jraft.util.DisruptorBuilder;
 import java.nio.ByteBuffer;
@@ -45,7 +46,7 @@ public final class LocalStorageOutter {
 
     public interface StablePBMeta extends Message {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createStableMeta();
         }
 
         long getTerm();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
index 83d40ea..b8b78a3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
@@ -19,12 +19,13 @@
 
 package com.alipay.sofa.jraft.entity;
 
+import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
 import com.alipay.sofa.jraft.rpc.RpcRequests;
 
 public final class RaftOutter {
     public interface EntryMeta {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createEntryMeta();
         }
 
         long getTerm();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
index 4c8776d..38732c0 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
@@ -38,7 +38,7 @@ public final class CliRequests {
         }
 
         public static Builder newBuilder() {
-            return MessageBuilderFactory.DEFAULT.createAddPeer();
+            return MessageBuilderFactory.DEFAULT.createAddPeerRequest();
         }
     }
 
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
index 9bae02a..256bf6f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
@@ -22,7 +22,6 @@ package com.alipay.sofa.jraft.rpc;
  * @author jiachun.fjc
  */
 public interface Connection {
-
     /**
      * Get the attribute that bound to the connection.
      *
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java
index 917b05f..59c8d5c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/HasErrorResponse.java
@@ -1,5 +1,5 @@
 package com.alipay.sofa.jraft.rpc;
 
 public interface HasErrorResponse extends Message {
-    RpcRequests.ErrorResponse getErrorResponse();
+    RpcRequests.ErrorResponse getErrorResponse(); // TODO asch can be removed.
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
index 49aed96..a286c6c 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
@@ -1,13 +1,39 @@
 package com.alipay.sofa.jraft.rpc;
 
 import com.alipay.sofa.jraft.entity.LocalFileMetaOutter;
+import com.alipay.sofa.jraft.entity.LocalStorageOutter;
+import com.alipay.sofa.jraft.entity.RaftOutter;
 import com.alipay.sofa.jraft.rpc.message.DefaultMessageBuilderFactory;
 
 // TODO asch use JRaftServiceLoader ?
 public interface MessageBuilderFactory {
     public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory();
 
-    CliRequests.AddPeerRequest.Builder createAddPeer();
+    CliRequests.AddPeerRequest.Builder createAddPeerRequest();
 
     LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta();
+
+    RpcRequests.PingRequest.Builder createPingRequest();
+
+    RpcRequests.RequestVoteRequest.Builder createVoteRequest();
+
+    RpcRequests.RequestVoteResponse.Builder createVoteResponse();
+
+    RpcRequests.ErrorResponse.Builder createErrorResponse();
+
+    LocalStorageOutter.StablePBMeta.Builder createStableMeta();
+
+    RpcRequests.AppendEntriesRequest.Builder createAppendEntriesRequest();
+
+    RpcRequests.AppendEntriesResponse.Builder createAppendEntriesResponse();
+
+    RaftOutter.EntryMeta.Builder createEntryMeta();
+
+    RpcRequests.TimeoutNowRequest.Builder createTimeoutNowRequest();
+
+    RpcRequests.TimeoutNowResponse.Builder createTimeoutNowResponse();
+
+    RpcRequests.ReadIndexRequest.Builder createReadIndexRequest();
+
+    RpcRequests.ReadIndexResponse.Builder createReadIndexResponse();
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java
index acc19c5..2989f25 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcFactory.java
@@ -29,15 +29,6 @@ public interface RaftRpcFactory {
     RpcResponseFactory DEFAULT = new RpcResponseFactory() {};
 
     /**
-     * Register serializer with class name.
-     *
-     * @param className class name
-     * @param args      extended parameters, different implementers may need different parameters,
-     *                  the order of parameters need a convention
-     */
-    void registerProtobufSerializer(final String className, final Object... args);
-
-    /**
      * Creates a raft RPC client.
      *
      * @return a new rpc client instance
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
index 74568b7..eb6ac4f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
@@ -19,14 +19,8 @@
 
 package com.alipay.sofa.jraft.rpc;
 
-import com.alipay.sofa.jraft.RaftGroupService;
-import com.alipay.sofa.jraft.entity.LeaderChangeContext;
 import com.alipay.sofa.jraft.entity.RaftOutter;
-import com.alipay.sofa.jraft.option.BootstrapOptions;
-import com.alipay.sofa.jraft.option.ReplicatorOptions;
 import com.alipay.sofa.jraft.util.ByteString;
-import com.alipay.sofa.jraft.util.DisruptorBuilder;
-import java.io.ByteArrayOutputStream;
 
 public final class RpcRequests {
     private RpcRequests() {
@@ -45,7 +39,7 @@ public final class RpcRequests {
         }
 
         public static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createPingRequest();
         }
     }
 
@@ -73,7 +67,7 @@ public final class RpcRequests {
         }
 
         public static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createErrorResponse();
         }
     }
 
@@ -140,7 +134,7 @@ public final class RpcRequests {
 
     public interface TimeoutNowRequest extends Message {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createTimeoutNowRequest();
         }
 
         java.lang.String getGroupId();
@@ -166,7 +160,7 @@ public final class RpcRequests {
 
     public interface TimeoutNowResponse extends HasErrorResponse {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createTimeoutNowResponse();
         }
 
         static Message getDefaultInstance() {
@@ -231,7 +225,7 @@ public final class RpcRequests {
         }
 
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createVoteRequest();
         }
     }
 
@@ -241,7 +235,7 @@ public final class RpcRequests {
         }
 
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createVoteResponse();
         }
 
         /**
@@ -292,7 +286,7 @@ public final class RpcRequests {
 
     public interface AppendEntriesRequest extends Message {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createAppendEntriesRequest();
         }
 
         /**
@@ -355,7 +349,7 @@ public final class RpcRequests {
         }
 
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createAppendEntriesResponse();
         }
 
         long getTerm();
@@ -445,7 +439,7 @@ public final class RpcRequests {
 
     public interface ReadIndexRequest extends Message {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createReadIndexRequest();
         }
 
         java.lang.String getGroupId();
@@ -477,7 +471,7 @@ public final class RpcRequests {
 
     public interface ReadIndexResponse extends HasErrorResponse {
         static Builder newBuilder() {
-            return null;
+            return MessageBuilderFactory.DEFAULT.createReadIndexResponse();
         }
 
         static Message getDefaultInstance() {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
index 80e7d1e..5701370 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
@@ -219,10 +219,15 @@ public abstract class AbstractClientService implements ClientService {
                         if (result instanceof ErrorResponse) {
                             status = handleErrorResponse((ErrorResponse) result);
                             msg = (Message) result;
-                        } else if (result instanceof HasErrorResponse) {
+                        } else if (result instanceof HasErrorResponse) { // TODO asch we don't need this.
                             final ErrorResponse eResp = ((HasErrorResponse) result).getErrorResponse();
-                            status = handleErrorResponse(eResp);
-                            msg = (Message) eResp;
+                            if (eResp != null) {
+                                status = handleErrorResponse(eResp);
+                                msg = eResp;
+                            }
+                            else {
+                                msg = (T) result;
+                            }
                         } else {
                             msg = (T) result;
                         }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
new file mode 100644
index 0000000..0607196
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalConnection.java
@@ -0,0 +1,35 @@
+package com.alipay.sofa.jraft.rpc.impl;
+
+import com.alipay.sofa.jraft.rpc.Connection;
+import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LocalConnection implements Connection {
+    private Map<String, Object> attrs = new ConcurrentHashMap<>();
+
+    final LocalRpcClient client;
+    final Endpoint srv;
+
+    public LocalConnection(LocalRpcClient client, Endpoint srv) {
+        this.client = client;
+        this.srv = srv;
+    }
+
+    @Override public Object getAttribute(String key) {
+        return attrs.get(key);
+    }
+
+    @Override public void setAttribute(String key, Object value) {
+        attrs.put(key, value);
+    }
+
+    @Override public Object setAttributeIfAbsent(String key, Object value) {
+        return attrs.putIfAbsent(key, value);
+    }
+
+    @Override public void close() {
+        LocalRpcServer.closeConnection(client, srv);
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
index acaa136..ac53089 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
@@ -16,6 +16,7 @@
  */
 package com.alipay.sofa.jraft.rpc.impl;
 
+import com.alipay.sofa.jraft.option.RpcOptions;
 import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
 import com.alipay.sofa.jraft.rpc.RpcClient;
 import com.alipay.sofa.jraft.rpc.RpcServer;
@@ -31,16 +32,40 @@ import org.slf4j.LoggerFactory;
 @SPI
 public class LocalRaftRpcFactory implements RaftRpcFactory {
     private static final Logger LOG                               = LoggerFactory.getLogger(LocalRaftRpcFactory.class);
+    @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) {
+        LocalRpcClient rpcClient = new LocalRpcClient();
 
-    @Override public void registerProtobufSerializer(String className, Object... args) {
+        if (helper != null)
+            helper.config(rpcClient);
 
+        return rpcClient;
     }
 
-    @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) {
-        return null;
+    @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) {
+        LocalRpcServer srv = new LocalRpcServer(endpoint);
+
+        if (helper != null)
+            helper.config(srv);
+
+        return srv;
     }
 
-    @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) {
-        return null;
+    @Override public ConfigHelper<RpcServer> defaultJRaftServerConfigHelper(RpcOptions opts) {
+        return new ConfigHelper<RpcServer>() {
+            @Override public void config(RpcServer instance) {
+                LocalRpcServer srv = (LocalRpcServer) instance;
+                // TODO asch.
+            }
+        };
+    }
+
+    @Override
+    public ConfigHelper<RpcClient> defaultJRaftClientConfigHelper(final RpcOptions opts) {
+        return new ConfigHelper<RpcClient>() {
+            @Override public void config(RpcClient instance) {
+                LocalRpcClient rpcClient = (LocalRpcClient) instance;
+                // TODO asch.
+            }
+        };
     }
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
index a6c063b..cbff3d6 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
@@ -17,41 +17,95 @@
 package com.alipay.sofa.jraft.rpc.impl;
 
 import com.alipay.sofa.jraft.ReplicatorGroup;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.InvokeTimeoutException;
 import com.alipay.sofa.jraft.error.RemotingException;
 import com.alipay.sofa.jraft.option.RpcOptions;
+import com.alipay.sofa.jraft.rpc.Connection;
 import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.alipay.sofa.jraft.rpc.InvokeContext;
 import com.alipay.sofa.jraft.rpc.RpcClient;
 import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
 
 /**
- * Bolt rpc client impl.
+ * Local rpc client impl.
  *
- * @author jiachun.fjc
+ * @author ascherbakov.
  */
 public class LocalRpcClient implements RpcClient {
+    private volatile ReplicatorGroup replicatorGroup = null;
+
     @Override public boolean checkConnection(Endpoint endpoint) {
-        return false;
+        return LocalRpcServer.connect(this, endpoint, false, null);
     }
 
     @Override public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) {
-        return false;
+        return LocalRpcServer.connect(this, endpoint, createIfAbsent, this::onCreated);
     }
 
     @Override public void closeConnection(Endpoint endpoint) {
-
+        LocalRpcServer.closeConnection(this, endpoint);
     }
 
     @Override public void registerConnectEventListener(ReplicatorGroup replicatorGroup) {
+        this.replicatorGroup = replicatorGroup;
+    }
 
+    private void onCreated(LocalConnection conn) {
+        if (replicatorGroup != null) {
+            final PeerId peer = new PeerId();
+            if (peer.parse(conn.srv.toString())) {
+                replicatorGroup.checkReplicator(peer, true);
+            }
+            else
+                System.out.println("Fail to parse peer: {}" + peer); // TODO asch
+        }
     }
 
     @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
-        return null;
+        if (!checkConnection(endpoint, true))
+            throw new RemotingException("Server is dead " + endpoint);
+
+        LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+        if (srv == null)
+            throw new RemotingException("Server is dead " + endpoint);
+
+        CompletableFuture fut = new CompletableFuture();
+
+        Object[] tuple = {this, request, fut};
+        assert srv.incoming.offer(tuple); // Should never fail because server uses unbounded queue.
+
+        try {
+            return fut.get(timeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw new RemotingException(e);
+        } catch (TimeoutException e) {
+            throw new InvokeTimeoutException(e);
+        }
     }
 
     @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback, long timeoutMs) throws InterruptedException, RemotingException {
+        if (!checkConnection(endpoint, true))
+            throw new RemotingException("Server is dead " + endpoint);
+
+        LocalRpcServer srv = LocalRpcServer.servers.get(endpoint);
+        if (srv == null)
+            throw new RemotingException("Server is dead " + endpoint);
+
+        CompletableFuture fut = new CompletableFuture();
+
+        Object[] tuple = {this, request, fut};
+        assert srv.incoming.offer(tuple);
 
+        fut.whenComplete((BiConsumer<Object, Throwable>) (res, err) -> {
+            callback.complete(res, err);
+        }).orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
     }
 
     @Override public boolean init(RpcOptions opts) {
@@ -59,6 +113,10 @@ public class LocalRpcClient implements RpcClient {
     }
 
     @Override public void shutdown() {
-
+        // Close all connection from this peer.
+        for (LocalRpcServer value : LocalRpcServer.servers.values())
+            LocalRpcServer.closeConnection(this, value.local);
     }
+
+
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index 4423a42..71ae19b 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -16,32 +16,181 @@
  */
 package com.alipay.sofa.jraft.rpc.impl;
 
+import com.alipay.sofa.jraft.rpc.Connection;
+import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
 import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
 
 /**
- * Bolt RPC server impl.
+ * Local RPC server impl.
  *
- * @author jiachun.fjc
+ * @author ascherbakov.
  */
 public class LocalRpcServer implements RpcServer {
-    @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
+    /** Running servers. */
+    public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>();
+
+    Endpoint local;
+
+    /** Remote connections to this server. */
+    public ConcurrentMap<LocalRpcClient, LocalConnection> conns = new ConcurrentHashMap<>();
+
+    private Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
+
+    private volatile boolean started = false;
+
+    private Thread worker;
 
+    private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
+
+    BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch use some kind of MPSC queue.
+
+    public LocalRpcServer(Endpoint local) {
+        this.local = local;
     }
 
-    @Override public void registerProcessor(RpcProcessor<?> processor) {
+    static synchronized boolean connect(LocalRpcClient client, Endpoint srv, boolean createIfAbsent, Consumer<LocalConnection> onCreated) {
+        LocalRpcServer locSrv = servers.get(srv);
+
+        if (locSrv == null)
+            return false; // Server is dead.
+
+        LocalConnection conn = locSrv.conns.get(client);
+
+        if (conn == null) {
+            if (!createIfAbsent)
+                return false;
+
+            conn = new LocalConnection(client, srv);
+
+            locSrv.conns.put(client, conn);
+
+            onCreated.accept(conn);
+        }
+
+        return true;
+    }
+
+    static synchronized void closeConnection(LocalRpcClient client, Endpoint srv) {
+        LocalRpcServer locSrv = servers.get(srv);
+
+        if (locSrv == null)
+            return;
+
+        LocalConnection conn = locSrv.conns.remove(client);
+
+        if (conn == null)
+            return;
+
+        locSrv.listeners.forEach(l -> l.onClosed(client.toString(), conn));
+    }
+
+    @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
+        if (!listeners.contains(listener))
+            listeners.add(listener);
+    }
 
+    @Override public void registerProcessor(RpcProcessor<?> processor) {
+        processors.put(processor.interest(), processor);
     }
 
     @Override public int boundPort() {
-        return 0;
+        return local.getPort();
     }
 
-    @Override public boolean init(Void opts) {
-        return false;
+    @Override public synchronized boolean init(Void opts) {
+        if (started)
+            return false;
+
+        worker = new Thread(new Runnable() {
+            @Override public void run() {
+                while(started) {
+                    try {
+                        Object[] tuple = incoming.take();
+                        LocalRpcClient sender = (LocalRpcClient) tuple[0];
+
+                        // Connection is not established, ignore message.
+                        LocalConnection conn = conns.get(sender);
+                        if (conn == null)
+                            continue;
+
+                        Message msg = (Message) tuple[1];
+                        CompletableFuture<Object> fut = (CompletableFuture) tuple[2];
+
+                        Class<? extends Message> cls = msg.getClass();
+                        RpcProcessor prc = processors.get(cls.getName());
+
+                        // TODO asch cache it.
+                        if (prc == null) {
+                            for (Class<?> iface : cls.getInterfaces()) {
+                                prc = processors.get(iface.getName());
+
+                                if (prc != null)
+                                    break;
+                            }
+                        }
+
+                        if (prc == null)
+                            System.out.println();
+
+                        prc.handleRequest(new RpcContext() {
+                            @Override public void sendResponse(Object responseObj) {
+                                fut.complete(responseObj);
+                            }
+
+                            @Override public Connection getConnection() {
+                                return conn;
+                            }
+
+                            @Override public String getRemoteAddress() {
+                                return sender.toString();
+                            }
+                        }, msg);
+
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                }
+            }
+        });
+
+        worker.setName("LocalRPCServer-Thread: "  + local.toString());
+        worker.start();
+
+        servers.put(local, this);
+
+        started = true;
+
+        return true;
     }
 
-    @Override public void shutdown() {
+    @Override public synchronized void shutdown() {
+        if (!started)
+            return;
+
+        started = false;
+        worker.interrupt();
+        try {
+            worker.join();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
+        }
+
+        // Close all connections to this server.
+        for (LocalRpcClient client : conns.keySet())
+            closeConnection(client, local);
 
+        servers.remove(local);
     }
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java
new file mode 100644
index 0000000..416cc30
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesRequestImpl.java
@@ -0,0 +1,130 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.RaftOutter;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+import com.alipay.sofa.jraft.util.ByteString;
+import com.alipay.sofa.jraft.util.Marshaller;
+import java.util.ArrayList;
+import java.util.List;
+
+class AppendEntriesRequestImpl implements RpcRequests.AppendEntriesRequest, RpcRequests.AppendEntriesRequest.Builder {
+    private String groupId;
+    private String serverId;
+    private String peerId;
+    private long term;
+    private long prevLogTerm;
+    private long prevLogIndex;
+    private List<RaftOutter.EntryMeta> entiesList = new ArrayList<>();
+    private long committedIndex;
+    private ByteString data = ByteString.EMPTY;
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public long getPrevLogTerm() {
+        return prevLogTerm;
+    }
+
+    @Override public long getPrevLogIndex() {
+        return prevLogIndex;
+    }
+
+    @Override public List<RaftOutter.EntryMeta> getEntriesList() {
+        return entiesList;
+    }
+
+    @Override public RaftOutter.EntryMeta getEntries(int index) {
+        return entiesList.get(index);
+    }
+
+    @Override public int getEntriesCount() {
+        return entiesList.size();
+    }
+
+    @Override public long getCommittedIndex() {
+        return committedIndex;
+    }
+
+    @Override public ByteString getData() {
+        return data;
+    }
+
+    @Override public boolean hasData() {
+        return data != ByteString.EMPTY;
+    }
+
+    @Override public byte[] toByteArray() {
+        return Marshaller.DEFAULT.marshall(this);
+    }
+
+    @Override public RpcRequests.AppendEntriesRequest build() {
+        return this;
+    }
+
+    @Override public Builder setData(ByteString data) {
+        this.data = data;
+
+        return this;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+
+        return this;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+
+        return this;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+
+        return this;
+    }
+
+    @Override public Builder setPrevLogIndex(long prevLogIndex) {
+        this.prevLogIndex = prevLogIndex;
+
+        return this;
+    }
+
+    @Override public Builder setPrevLogTerm(long prevLogTerm) {
+        this.prevLogTerm = prevLogTerm;
+
+        return this;
+    }
+
+    @Override public Builder setCommittedIndex(long lastCommittedIndex) {
+        this.committedIndex = lastCommittedIndex;
+
+        return this;
+    }
+
+    @Override public Builder addEntries(RaftOutter.EntryMeta entryMeta) {
+        entiesList.add(entryMeta);
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java
new file mode 100644
index 0000000..9187ab6
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/AppendEntriesResponseImpl.java
@@ -0,0 +1,47 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class AppendEntriesResponseImpl implements RpcRequests.AppendEntriesResponse, RpcRequests.AppendEntriesResponse.Builder {
+    private long term;
+    private boolean success;
+    private long lastLogIndex;
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public boolean getSuccess() {
+        return success;
+    }
+
+    @Override public long getLastLogIndex() {
+        return lastLogIndex;
+    }
+
+    @Override public RpcRequests.ErrorResponse getErrorResponse() {
+        return null;
+    }
+
+    @Override public RpcRequests.AppendEntriesResponse build() {
+        return this;
+    }
+
+    @Override public Builder setSuccess(boolean success) {
+        this.success = success;
+
+        return this;
+    }
+
+    @Override public Builder setTerm(long currTerm) {
+        this.term = currTerm;
+
+        return this;
+    }
+
+    @Override public Builder setLastLogIndex(long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
index 14a3fb4..9b8b54d 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
@@ -1,15 +1,66 @@
 package com.alipay.sofa.jraft.rpc.message;
 
 import com.alipay.sofa.jraft.entity.LocalFileMetaOutter;
+import com.alipay.sofa.jraft.entity.LocalStorageOutter;
+import com.alipay.sofa.jraft.entity.RaftOutter;
 import com.alipay.sofa.jraft.rpc.CliRequests;
 import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
+import com.alipay.sofa.jraft.rpc.RpcRequests;
 
 public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
-    @Override public CliRequests.AddPeerRequest.Builder createAddPeer() {
+    @Override public CliRequests.AddPeerRequest.Builder createAddPeerRequest() {
         return new AddPeerRequestImpl();
     }
 
     @Override public LocalFileMetaOutter.LocalFileMeta.Builder createLocalFileMeta() {
         return new LocalFileMetaImpl();
     }
+
+    @Override public RpcRequests.PingRequest.Builder createPingRequest() {
+        return new PingRequestImpl();
+    }
+
+    @Override public RpcRequests.RequestVoteRequest.Builder createVoteRequest() {
+        return new PreVoteRequestImpl();
+    }
+
+    @Override public RpcRequests.RequestVoteResponse.Builder createVoteResponse() {
+        return new RequestVoteResponseImpl();
+    }
+
+    @Override public RpcRequests.ErrorResponse.Builder createErrorResponse() {
+        return new ErrorResponseImpl();
+    }
+
+    @Override public LocalStorageOutter.StablePBMeta.Builder createStableMeta() {
+        return new StableMeta();
+    }
+
+    @Override public RpcRequests.AppendEntriesRequest.Builder createAppendEntriesRequest() {
+        return new AppendEntriesRequestImpl();
+    }
+
+    @Override public RpcRequests.AppendEntriesResponse.Builder createAppendEntriesResponse() {
+        return new AppendEntriesResponseImpl();
+    }
+
+    @Override public RaftOutter.EntryMeta.Builder createEntryMeta() {
+        return new EntryMetaImpl();
+    }
+
+    @Override public RpcRequests.TimeoutNowRequest.Builder createTimeoutNowRequest() {
+        return new TimeoutNowRequestImpl();
+    }
+
+    @Override public RpcRequests.TimeoutNowResponse.Builder createTimeoutNowResponse() {
+        return new TimeoutNowResponseImpl();
+    }
+
+    @Override public RpcRequests.ReadIndexRequest.Builder createReadIndexRequest() {
+        return new ReadIndexRequestImpl();
+    }
+
+    @Override public RpcRequests.ReadIndexResponse.Builder createReadIndexResponse() {
+        return new ReadIndexResponseImpl();
+    }
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java
new file mode 100644
index 0000000..5356d76
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/EntryMetaImpl.java
@@ -0,0 +1,133 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.EnumOutter;
+import com.alipay.sofa.jraft.entity.RaftOutter;
+import java.util.ArrayList;
+import java.util.List;
+
+class EntryMetaImpl implements RaftOutter.EntryMeta, RaftOutter.EntryMeta.Builder {
+    private long term;
+    private EnumOutter.EntryType type;
+    private List<String> peersList = new ArrayList<>();
+    private long dataLen;
+    private List<String> oldPeersList = new ArrayList<>();
+    private long checksum;
+    private List<String> learnersList = new ArrayList<>();
+    private List<String> oldLearnersList = new ArrayList<>();
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public EnumOutter.EntryType getType() {
+        return type;
+    }
+
+    @Override public List<String> getPeersList() {
+        return peersList;
+    }
+
+    @Override public int getPeersCount() {
+        return peersList.size();
+    }
+
+    @Override public String getPeers(int index) {
+        return peersList.get(index);
+    }
+
+    @Override public long getDataLen() {
+        return dataLen;
+    }
+
+    @Override public List<String> getOldPeersList() {
+        return oldPeersList;
+    }
+
+    @Override public int getOldPeersCount() {
+        return oldPeersList.size();
+    }
+
+    @Override public String getOldPeers(int index) {
+        return oldPeersList.get(index);
+    }
+
+    @Override public long getChecksum() {
+        return checksum;
+    }
+
+    @Override public List<String> getLearnersList() {
+        return learnersList;
+    }
+
+    @Override public int getLearnersCount() {
+        return learnersList.size();
+    }
+
+    @Override public String getLearners(int index) {
+        return learnersList.get(index);
+    }
+
+    @Override public List<String> getOldLearnersList() {
+        return oldLearnersList;
+    }
+
+    @Override public int getOldLearnersCount() {
+        return oldLearnersList.size();
+    }
+
+    @Override public String getOldLearners(int index) {
+        return oldLearnersList.get(index);
+    }
+
+    @Override public RaftOutter.EntryMeta build() {
+        return this;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+
+        return this;
+    }
+
+    @Override public Builder setChecksum(long checksum) {
+        this.checksum = checksum;
+
+        return this;
+    }
+
+    @Override public Builder setType(EnumOutter.EntryType type) {
+        this.type = type;
+
+        return this;
+    }
+
+    @Override public Builder setDataLen(int remaining) {
+        this.dataLen = remaining;
+
+        return this;
+    }
+
+    @Override public Builder addPeers(String peerId) {
+        peersList.add(peerId);
+
+        return this;
+    }
+
+    @Override public Builder addOldPeers(String oldPeerId) {
+        oldPeersList.add(oldPeerId);
+
+        return this;
+    }
+
+    @Override public Builder addLearners(String learnerId) {
+        learnersList.add(learnerId);
+
+        return this;
+    }
+
+    @Override public Builder addOldLearners(String oldLearnerId) {
+        oldLearnersList.add(oldLearnerId);
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java
new file mode 100644
index 0000000..58447d0
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ErrorResponseImpl.java
@@ -0,0 +1,32 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder {
+    private int errorCode;
+    private String errorMsg;
+
+    @Override public int getErrorCode() {
+        return errorCode;
+    }
+
+    @Override public Builder setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+
+        return this;
+    }
+
+    @Override public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    @Override public Builder setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+
+        return this;
+    }
+
+    @Override public RpcRequests.ErrorResponse build() {
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java
new file mode 100644
index 0000000..480f9a4
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PingRequestImpl.java
@@ -0,0 +1,21 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class PingRequestImpl implements RpcRequests.PingRequest , RpcRequests.PingRequest .Builder {
+    private long sendTimestamp;
+
+    @Override public long getSendTimestamp() {
+        return sendTimestamp;
+    }
+
+    @Override public Builder setSendTimestamp(long timestamp) {
+        this.sendTimestamp = timestamp;
+
+        return this;
+    }
+
+    @Override public RpcRequests.PingRequest build() {
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java
new file mode 100644
index 0000000..d5303c3
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/PreVoteRequestImpl.java
@@ -0,0 +1,87 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class PreVoteRequestImpl implements RpcRequests.RequestVoteRequest, RpcRequests.RequestVoteRequest.Builder {
+    private String groupId;
+    private String serverId;
+    private String peerId;
+    private long term;
+    private long lastLogTerm;
+    private long lastLogIndex;
+    private boolean preVote;
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+
+        return this;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+
+        return this;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+
+        return this;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+
+        return this;
+    }
+
+    @Override public long getLastLogTerm() {
+        return lastLogTerm;
+    }
+
+    @Override public Builder setLastLogTerm(long lastLogTerm) {
+        this.lastLogTerm = lastLogTerm;
+
+        return this;
+    }
+
+    @Override public long getLastLogIndex() {
+        return lastLogIndex;
+    }
+
+    @Override public Builder setLastLogIndex(long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+
+        return this;
+    }
+
+    public boolean getPreVote() {
+        return preVote;
+    }
+
+    @Override public Builder setPreVote(boolean preVote) {
+        this.preVote = preVote;
+
+        return this;
+    }
+
+    @Override public RpcRequests.RequestVoteRequest build() {
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java
new file mode 100644
index 0000000..dd1868e
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexRequestImpl.java
@@ -0,0 +1,76 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+import com.alipay.sofa.jraft.util.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+
+class ReadIndexRequestImpl implements RpcRequests.ReadIndexRequest, RpcRequests.ReadIndexRequest.Builder {
+    private String groupId;
+    private String serverId;
+    private List<ByteString> entriesList = new ArrayList<>();
+    private String peerId;
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public List<ByteString> getEntriesList() {
+        return entriesList;
+    }
+
+    @Override public int getEntriesCount() {
+        return entriesList.size();
+    }
+
+    @Override public ByteString getEntries(int index) {
+        return entriesList.get(index);
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public RpcRequests.ReadIndexRequest build() {
+        return this;
+    }
+
+    @Override public Builder mergeFrom(RpcRequests.ReadIndexRequest request) {
+        setGroupId(request.getGroupId());
+        setServerId(request.getServerId());
+        setPeerId(request.getPeerId());
+        for (ByteString data : request.getEntriesList()) {
+            addEntries(data);
+        }
+
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+
+        return this;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+
+        return this;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+
+        return this;
+    }
+
+    @Override public Builder addEntries(ByteString data) {
+        entriesList.add(data);
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java
new file mode 100644
index 0000000..6cc658f
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/ReadIndexResponseImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class ReadIndexResponseImpl implements RpcRequests.ReadIndexResponse, RpcRequests.ReadIndexResponse.Builder {
+    private long index;
+    private boolean success;
+
+    @Override public long getIndex() {
+        return index;
+    }
+
+    @Override public boolean getSuccess() {
+        return success;
+    }
+
+    @Override public RpcRequests.ErrorResponse getErrorResponse() {
+        return null;
+    }
+
+    @Override public RpcRequests.ReadIndexResponse build() {
+        return this;
+    }
+
+    @Override public Builder setSuccess(boolean success) {
+        this.success = success;
+
+        return this;
+    }
+
+    @Override public Builder setIndex(long lastCommittedIndex) {
+        this.index = index;
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java
new file mode 100644
index 0000000..e628940
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/RequestVoteResponseImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+public class RequestVoteResponseImpl implements RpcRequests.RequestVoteResponse, RpcRequests.RequestVoteResponse.Builder {
+    private long term;
+    private boolean granted;
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public boolean getGranted() {
+        return granted;
+    }
+
+    @Override public RpcRequests.ErrorResponse getErrorResponse() {
+        return null;
+    }
+
+    @Override public RpcRequests.RequestVoteResponse build() {
+        return this;
+    }
+
+    @Override public Builder setTerm(long currTerm) {
+        this.term = currTerm;
+
+        return this;
+    }
+
+    @Override public Builder setGranted(boolean granted) {
+        this.granted = granted;
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java
new file mode 100644
index 0000000..e7790bb
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/StableMeta.java
@@ -0,0 +1,32 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.entity.LocalStorageOutter;
+
+class StableMeta implements LocalStorageOutter.StablePBMeta, LocalStorageOutter.StablePBMeta.Builder {
+    private long term;
+    private String votedFor;
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public String getVotedfor() {
+        return votedFor;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+
+        return this;
+    }
+
+    @Override public Builder setVotedfor(String votedFor) {
+        this.votedFor = votedFor;
+
+        return this;
+    }
+
+    @Override public LocalStorageOutter.StablePBMeta build() {
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
new file mode 100644
index 0000000..af0be57
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowRequestImpl.java
@@ -0,0 +1,54 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class TimeoutNowRequestImpl implements RpcRequests.TimeoutNowRequest, RpcRequests.TimeoutNowRequest.Builder {
+    private String groupId;
+    private String serverId;
+    private String peerId;
+    private long term;
+
+    @Override public String getGroupId() {
+        return groupId;
+    }
+
+    @Override public String getServerId() {
+        return serverId;
+    }
+
+    @Override public String getPeerId() {
+        return peerId;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public RpcRequests.TimeoutNowRequest build() {
+        return this;
+    }
+
+    @Override public Builder setTerm(long term) {
+        this.term = term;
+
+        return this;
+    }
+
+    @Override public Builder setGroupId(String groupId) {
+        this.groupId = groupId;
+
+        return this;
+    }
+
+    @Override public Builder setServerId(String serverId) {
+        this.serverId = serverId;
+
+        return this;
+    }
+
+    @Override public Builder setPeerId(String peerId) {
+        this.peerId = peerId;
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
new file mode 100644
index 0000000..da441ff
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/TimeoutNowResponseImpl.java
@@ -0,0 +1,36 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.RpcRequests;
+
+class TimeoutNowResponseImpl implements RpcRequests.TimeoutNowResponse, RpcRequests.TimeoutNowResponse.Builder {
+    private long term;
+    private boolean success;
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public boolean getSuccess() {
+        return success;
+    }
+
+    @Override public RpcRequests.ErrorResponse getErrorResponse() {
+        return null;
+    }
+
+    @Override public RpcRequests.TimeoutNowResponse build() {
+        return this;
+    }
+
+    @Override public Builder setTerm(long currTerm) {
+        this.term = term;
+
+        return this;
+    }
+
+    @Override public Builder setSuccess(boolean success) {
+        this.success = success;
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
index cf9bdce..25c06d7 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
@@ -6,26 +6,34 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-/** */
+/**
+ *
+ */
 public class JDKMarshaller implements Marshaller {
-    /** {@inheritDoc} */
-    @Override public byte[] marshall(Object o) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(o);
-        oos.close();
-
-        return baos.toByteArray();
+    /**
+     * {@inheritDoc}
+     */
+    @Override public byte[] marshall(Object o) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(o);
+            oos.close();
+            return baos.toByteArray();
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public Object unmarshall(byte[] raw) throws IOException{
-        ByteArrayInputStream bais = new ByteArrayInputStream(raw);
-        ObjectInputStream oos = new ObjectInputStream(bais);
-
+    /**
+     * {@inheritDoc}
+     */
+    @Override public <T> T unmarshall(byte[] raw) {
         try {
-            return oos.readObject();
-        } catch (ClassNotFoundException e) {
+            ByteArrayInputStream bais = new ByteArrayInputStream(raw);
+            ObjectInputStream oos = new ObjectInputStream(bais);
+            return (T) oos.readObject();
+        } catch (Exception e) {
             throw new Error(e);
         }
     }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
index 6f28493..54b07ed 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
@@ -5,7 +5,7 @@ import java.io.IOException;
 public interface Marshaller {
     public static Marshaller DEFAULT = new JDKMarshaller();
 
-    byte[] marshall(Object o) throws IOException;
+    byte[] marshall(Object o);
 
-    <T> T unmarshall(byte[] raw) throws IOException;
+    <T> T unmarshall(byte[] raw);
 }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
index 54c18e9..d650a10 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Utils.java
@@ -381,7 +381,7 @@ public final class Utils {
         final boolean isDir = file.isDirectory();
         // can't fsync on windowns.
         if (isDir && Platform.isWindows()) {
-            LOG.warn("Unable to fsync directory {} on windows.", file);
+            // LOG.warn("Unable to fsync directory {} on windows.", file);
             return;
         }
         try (final FileChannel fc = FileChannel.open(file.toPath(), isDir ? StandardOpenOption.READ
diff --git a/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory b/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory
index 8416bc1..54429b7 100644
--- a/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory
+++ b/modules/raft/src/main/resources/META-INF/services/com.alipay.sofa.jraft.rpc.RaftRpcFactory
@@ -1 +1 @@
-com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory
\ No newline at end of file
+com.alipay.sofa.jraft.rpc.impl.LocalRaftRpcFactory
\ No newline at end of file
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
new file mode 100644
index 0000000..f34ebed
--- /dev/null
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/rpc/LocalRpcTest.java
@@ -0,0 +1,154 @@
+package com.alipay.sofa.jraft.rpc;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.rpc.impl.LocalRpcClient;
+import com.alipay.sofa.jraft.rpc.impl.LocalRpcServer;
+import com.alipay.sofa.jraft.util.Endpoint;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO add test for localconn.close, timeouts.
+ */
+public class LocalRpcTest {
+    private Endpoint endpoint;
+    private LocalRpcServer server;
+
+    @Before
+    public void setup() {
+        endpoint = PeerId.parsePeer("localhost:1000").getEndpoint();
+        server = new LocalRpcServer(endpoint);
+        server.registerProcessor(new Request1RpcProcessor());
+        server.registerProcessor(new Request2RpcProcessor());
+        server.init(null);
+    }
+
+    @After
+    public void teardown() {
+        server.shutdown();
+
+        assertNull(LocalRpcServer.servers.get(endpoint));
+    }
+
+    @Test
+    public void testStartStopServer() {
+        assertNotNull(LocalRpcServer.servers.get(endpoint));
+    }
+
+    @Test
+    public void testConnection() {
+        LocalRpcClient client = new LocalRpcClient();
+
+        assertFalse(client.checkConnection(endpoint));
+
+        assertTrue(client.checkConnection(endpoint, true));
+    }
+
+    @Test
+    public void testSyncProcessing() throws RemotingException, InterruptedException {
+        RpcClient client = new LocalRpcClient();
+        Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
+        assertNotNull(resp1);
+
+        Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
+        assertNotNull(resp2);
+    }
+
+    @Test
+    public void testAsyncProcessing() throws RemotingException, InterruptedException {
+        RpcClient client = new LocalRpcClient();
+
+        CountDownLatch l1 = new CountDownLatch(1);
+        AtomicReference<Response1> resp1 = new AtomicReference<>();
+        client.invokeAsync(endpoint, new Request1(), new InvokeContext(), (result, err) -> {
+            resp1.set((Response1) result);
+            l1.countDown();
+        }, 5000);
+        l1.await(5_000, TimeUnit.MILLISECONDS);
+        assertNotNull(resp1);
+
+        CountDownLatch l2 = new CountDownLatch(1);
+        AtomicReference<Response2> resp2 = new AtomicReference<>();
+        client.invokeAsync(endpoint, new Request2(), new InvokeContext(), (result, err) -> {
+            resp2.set((Response2) result);
+            l2.countDown();
+        }, 5000);
+        l2.await(5_000, TimeUnit.MILLISECONDS);
+        assertNotNull(resp2);
+    }
+
+    @Test
+    public void testDisconnect1() {
+        RpcClient client1 = new LocalRpcClient();
+        RpcClient client2 = new LocalRpcClient();
+
+        assertTrue(client1.checkConnection(endpoint, true));
+        assertTrue(client2.checkConnection(endpoint, true));
+
+        client1.shutdown();
+
+        assertFalse(client1.checkConnection(endpoint));
+        assertTrue(client2.checkConnection(endpoint));
+
+        client2.shutdown();
+
+        assertFalse(client1.checkConnection(endpoint));
+        assertFalse(client2.checkConnection(endpoint));
+    }
+
+    @Test
+    public void testDisconnect2() {
+        RpcClient client1 = new LocalRpcClient();
+        RpcClient client2 = new LocalRpcClient();
+
+        assertTrue(client1.checkConnection(endpoint, true));
+        assertTrue(client2.checkConnection(endpoint, true));
+
+        server.shutdown();
+
+        assertFalse(client1.checkConnection(endpoint));
+        assertFalse(client2.checkConnection(endpoint));
+    }
+
+    private static class Request1RpcProcessor implements RpcProcessor<Request1> {
+        @Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
+            rpcCtx.sendResponse(new Response1());
+        }
+
+        @Override public String interest() {
+            return Request1.class.getName();
+        }
+    }
+
+    private static class Request2RpcProcessor implements RpcProcessor<Request2> {
+        @Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
+            rpcCtx.sendResponse(new Response2());
+        }
+
+        @Override public String interest() {
+            return Request2.class.getName();
+        }
+    }
+
+    private static class Request1 implements Message {
+    }
+
+    private static class Request2 implements Message {
+    }
+
+    private static class Response1 implements Message {
+    }
+
+    private static class Response2 implements Message {
+    }
+}
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java
index 72b2638..e890721 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorageTest.java
@@ -26,9 +26,4 @@ public class LocalLogStorageTest extends BaseLogStorageTest {
     protected LogStorage newLogStorage() {
         return new LocalLogStorage(this.path, new RaftOptions());
     }
-
-    @Test
-    @Override public void testEmptyState() {
-        super.testEmptyState();
-    }
 }
diff --git a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java
index d4bcc66..2639ea8 100644
--- a/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java
+++ b/modules/raft/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java
@@ -19,7 +19,6 @@ package com.alipay.sofa.jraft.storage.io;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import com.alipay.sofa.jraft.util.Utils;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.nio.ByteBuffer;
diff --git a/modules/raft/src/test/resources/log4j2.xml b/modules/raft/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..90d27d7
--- /dev/null
+++ b/modules/raft/src/test/resources/log4j2.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
+        </Console>
+
+        <!--<RollingFile name="RollingFile" filename="log/jraft-example.log"-->
+                     <!--filepattern="log/%d{YYYYMMddHHmmss}-jraft-example.log">-->
+            <!--<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>-->
+            <!--<Policies>-->
+                <!--<SizeBasedTriggeringPolicy size="100 MB"/>-->
+            <!--</Policies>-->
+            <!--<DefaultRolloverStrategy max="20"/>-->
+        <!--</RollingFile>-->
+
+    </Appenders>
+    <Loggers>
+        <Root level="info">
+            <AppenderRef ref="Console"/>
+            <AppenderRef ref="RollingFile"/>
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file