You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:16:55 UTC

[10/54] [abbrv] incubator-ratis git commit: Move RequestDispatcher code to RaftServerImpl.

Move RequestDispatcher code to RaftServerImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a38e2f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a38e2f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a38e2f71

Branch: refs/heads/master
Commit: a38e2f71ef0e84cb0af9a394544274e0dd56bcd9
Parents: 673a282
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Jan 3 02:05:43 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Jan 3 02:05:43 2017 +0800

----------------------------------------------------------------------
 .../RaftClientAsynchronousProtocol.java         |  30 ++++
 .../org/apache/raft/grpc/RaftGRpcService.java   |   6 +-
 .../grpc/client/RaftClientProtocolService.java  |  14 +-
 .../grpc/server/RaftServerProtocolService.java  |  15 +-
 .../raft/hadooprpc/server/HadoopRpcService.java |  18 +--
 .../raft/netty/server/NettyRpcService.java      |  15 +-
 .../java/org/apache/raft/server/RaftServer.java |   5 +-
 .../apache/raft/server/impl/RaftServerImpl.java |  72 +++++++++-
 .../raft/server/impl/RequestDispatcher.java     | 140 -------------------
 .../server/simulation/SimulatedServerRpc.java   |  23 ++-
 10 files changed, 142 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java b/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
new file mode 100644
index 0000000..3572b7e
--- /dev/null
+++ b/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.protocol;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Asynchronous version of {@link RaftClientProtocol}. */
+public interface RaftClientAsynchronousProtocol {
+  CompletableFuture<RaftClientReply> submitClientRequestAsync(
+      RaftClientRequest request) throws IOException;
+
+  CompletableFuture<RaftClientReply> setConfigurationAsync(
+      SetConfigurationRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
index d0c98c3..1184e2e 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
@@ -25,7 +25,6 @@ import org.apache.raft.grpc.server.RaftServerProtocolService;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.RaftServer;
 import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.grpc.Server;
 import org.apache.raft.shaded.io.grpc.ServerBuilder;
 import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder;
@@ -61,11 +60,10 @@ public class RaftGRpcService implements RaftServerRpc {
         RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
         RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT);
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);
-    final RequestDispatcher dispatcher = new RequestDispatcher(raftServer);
     selfId = raftServer.getId();
     server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
-        .addService(new RaftServerProtocolService(selfId, dispatcher))
-        .addService(new RaftClientProtocolService(selfId, dispatcher))
+        .addService(new RaftServerProtocolService(selfId, raftServer))
+        .addService(new RaftClientProtocolService(selfId, raftServer))
         .build();
 
     // start service to determine the port (in case port is configured as 0)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
index 32dbac7..8f41bdc 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
@@ -20,8 +20,8 @@ package org.apache.raft.grpc.client;
 import com.google.common.base.Preconditions;
 import org.apache.raft.client.impl.ClientProtoUtils;
 import org.apache.raft.grpc.RaftGrpcUtil;
+import org.apache.raft.protocol.RaftClientAsynchronousProtocol;
 import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
 import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
@@ -65,18 +65,18 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
   private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE);
 
   private final String id;
-  private final RequestDispatcher dispatcher;
+  private final RaftClientAsynchronousProtocol client;
 
-  public RaftClientProtocolService(String id, RequestDispatcher dispatcher) {
+  public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol client) {
     this.id = id;
-    this.dispatcher = dispatcher;
+    this.client = client;
   }
 
   @Override
   public void setConfiguration(SetConfigurationRequestProto request,
       StreamObserver<RaftClientReplyProto> responseObserver) {
     try {
-      CompletableFuture<RaftClientReply> future = dispatcher.setConfigurationAsync(
+      CompletableFuture<RaftClientReply> future = client.setConfigurationAsync(
           ClientProtoUtils.toSetConfigurationRequest(request));
       future.whenCompleteAsync((reply, exception) -> {
         if (exception != null) {
@@ -114,8 +114,8 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
           pendingList.add(p);
         }
 
-        CompletableFuture<RaftClientReply> future = dispatcher
-            .handleClientRequest(ClientProtoUtils.toRaftClientRequest(request));
+        CompletableFuture<RaftClientReply> future = client.submitClientRequestAsync(
+            ClientProtoUtils.toRaftClientRequest(request));
         future.whenCompleteAsync((reply, exception) -> {
           if (exception != null) {
             // TODO: the exception may be from either raft or state machine.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
index 2f06c59..53dbb6a 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
@@ -18,7 +18,7 @@
 package org.apache.raft.grpc.server;
 
 import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.server.impl.RequestDispatcher;
+import org.apache.raft.server.protocol.RaftServerProtocol;
 import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
 import org.apache.raft.shaded.proto.RaftProtos.*;
 import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
@@ -29,18 +29,18 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
   public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class);
 
   private final String id;
-  private final RequestDispatcher dispatcher;
+  private final RaftServerProtocol server;
 
-  public RaftServerProtocolService(String id, RequestDispatcher dispatcher) {
+  public RaftServerProtocolService(String id, RaftServerProtocol server) {
     this.id = id;
-    this.dispatcher = dispatcher;
+    this.server = server;
   }
 
   @Override
   public void requestVote(RequestVoteRequestProto request,
       StreamObserver<RequestVoteReplyProto> responseObserver) {
     try {
-      final RequestVoteReplyProto reply = dispatcher.requestVote(request);
+      final RequestVoteReplyProto reply = server.requestVote(request);
       responseObserver.onNext(reply);
       responseObserver.onCompleted();
     } catch (Throwable e) {
@@ -57,7 +57,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
       @Override
       public void onNext(AppendEntriesRequestProto request) {
         try {
-          final AppendEntriesReplyProto reply = dispatcher.appendEntries(request);
+          final AppendEntriesReplyProto reply = server.appendEntries(request);
           responseObserver.onNext(reply);
         } catch (Throwable e) {
           LOG.info("{} got exception when handling appendEntries {}: {}",
@@ -87,8 +87,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
       @Override
       public void onNext(InstallSnapshotRequestProto request) {
         try {
-          final InstallSnapshotReplyProto reply =
-              dispatcher.installSnapshot(request);
+          final InstallSnapshotReplyProto reply = server.installSnapshot(request);
           responseObserver.onNext(reply);
         } catch (Throwable e) {
           LOG.info("{} got exception when handling installSnapshot {}: {}",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
index b73deca..24e1d2c 100644
--- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
+++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.raft.hadooprpc.Proxy;
 import org.apache.raft.hadooprpc.client.RaftClientProtocolPB;
 import org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
+import org.apache.raft.protocol.RaftClientProtocol;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.RaftServer;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
+import org.apache.raft.server.protocol.RaftServerProtocol;
 import org.apache.raft.shaded.com.google.protobuf.BlockingService;
 import org.apache.raft.shaded.com.google.protobuf.ServiceException;
 import org.apache.raft.shaded.proto.RaftProtos.*;
@@ -49,7 +50,6 @@ public class HadoopRpcService implements RaftServerRpc {
   static final String CLASS_NAME = HadoopRpcService.class.getSimpleName();
   public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest";
 
-  private final RequestDispatcher raftService;
   private final String id;
   private final RPC.Server ipcServer;
   private final InetSocketAddress ipcServerAddress;
@@ -60,12 +60,11 @@ public class HadoopRpcService implements RaftServerRpc {
       throws IOException {
     this.proxies = new PeerProxyMap<>(
         p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf));
-    this.raftService = new RequestDispatcher(server);
     this.id = server.getId();
-    this.ipcServer = newRpcServer(conf);
+    this.ipcServer = newRpcServer(server, conf);
     this.ipcServerAddress = ipcServer.getListenerAddress();
 
-    addRaftClientProtocol(conf);
+    addRaftClientProtocol(server, conf);
 
     LOG.info(getClass().getSimpleName() + " created RPC.Server at "
         + ipcServerAddress);
@@ -76,7 +75,8 @@ public class HadoopRpcService implements RaftServerRpc {
     return ipcServerAddress;
   }
 
-  private RPC.Server newRpcServer(final Configuration conf) throws IOException {
+  private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final Configuration conf)
+      throws IOException {
     final RaftServerConfigKeys.Get get = new RaftServerConfigKeys.Get() {
       @Override
       protected int getInt(String key, int defaultValue) {
@@ -94,7 +94,7 @@ public class HadoopRpcService implements RaftServerRpc {
 
     final BlockingService service
         = RaftServerProtocolService.newReflectiveBlockingService(
-            new RaftServerProtocolServerSideTranslatorPB(raftService));
+            new RaftServerProtocolServerSideTranslatorPB(serverProtocol));
     RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, ProtobufRpcEngineShaded.class);
     return new RPC.Builder(conf)
         .setProtocol(RaftServerProtocolPB.class)
@@ -106,13 +106,13 @@ public class HadoopRpcService implements RaftServerRpc {
         .build();
   }
 
-  private void addRaftClientProtocol(Configuration conf) {
+  private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) {
     final Class<?> protocol = RaftClientProtocolPB.class;
     RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class);
 
     final BlockingService service
         = RaftClientProtocolService.newReflectiveBlockingService(
-        new RaftClientProtocolServerSideTranslatorPB(raftService));
+        new RaftClientProtocolServerSideTranslatorPB(clientProtocol));
     ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
index 08e379a..50833fb 100644
--- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
+++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
@@ -31,7 +31,6 @@ import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.RaftServer;
 import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
 import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
 import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
@@ -56,7 +55,7 @@ public final class NettyRpcService implements RaftServerRpc {
   public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest";
 
   private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
-  private final RequestDispatcher raftService;
+  private final RaftServer server;
   private final String id;
 
   private final EventLoopGroup bossGroup = new NioEventLoopGroup();
@@ -76,7 +75,7 @@ public final class NettyRpcService implements RaftServerRpc {
 
   /** Constructs a netty server with the given port. */
   public NettyRpcService(int port, RaftServer server) {
-    this.raftService = new RequestDispatcher(server);
+    this.server = server;
     this.id = server.getId();
 
     final ChannelInitializer<SocketChannel> initializer
@@ -134,7 +133,7 @@ public final class NettyRpcService implements RaftServerRpc {
         case REQUESTVOTEREQUEST: {
           final RequestVoteRequestProto request = proto.getRequestVoteRequest();
           rpcRequest = request.getServerRequest();
-          final RequestVoteReplyProto reply = raftService.requestVote(request);
+          final RequestVoteReplyProto reply = server.requestVote(request);
           return RaftNettyServerReplyProto.newBuilder()
               .setRequestVoteReply(reply)
               .build();
@@ -142,7 +141,7 @@ public final class NettyRpcService implements RaftServerRpc {
         case APPENDENTRIESREQUEST: {
           final AppendEntriesRequestProto request = proto.getAppendEntriesRequest();
           rpcRequest = request.getServerRequest();
-          final AppendEntriesReplyProto reply = raftService.appendEntries(request);
+          final AppendEntriesReplyProto reply = server.appendEntries(request);
           return RaftNettyServerReplyProto.newBuilder()
               .setAppendEntriesReply(reply)
               .build();
@@ -150,7 +149,7 @@ public final class NettyRpcService implements RaftServerRpc {
         case INSTALLSNAPSHOTREQUEST: {
           final InstallSnapshotRequestProto request = proto.getInstallSnapshotRequest();
           rpcRequest = request.getServerRequest();
-          final InstallSnapshotReplyProto reply = raftService.installSnapshot(request);
+          final InstallSnapshotReplyProto reply = server.installSnapshot(request);
           return RaftNettyServerReplyProto.newBuilder()
               .setInstallSnapshotReply(reply)
               .build();
@@ -158,7 +157,7 @@ public final class NettyRpcService implements RaftServerRpc {
         case RAFTCLIENTREQUEST: {
           final RaftClientRequestProto request = proto.getRaftClientRequest();
           rpcRequest = request.getRpcRequest();
-          final RaftClientReply reply = raftService.submitClientRequest(
+          final RaftClientReply reply = server.submitClientRequest(
               ClientProtoUtils.toRaftClientRequest(request));
           return RaftNettyServerReplyProto.newBuilder()
               .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
@@ -167,7 +166,7 @@ public final class NettyRpcService implements RaftServerRpc {
         case SETCONFIGURATIONREQUEST: {
           final SetConfigurationRequestProto request = proto.getSetConfigurationRequest();
           rpcRequest = request.getRpcRequest();
-          final RaftClientReply reply = raftService.setConfiguration(
+          final RaftClientReply reply = server.setConfiguration(
               ClientProtoUtils.toSetConfigurationRequest(request));
           return RaftNettyServerReplyProto.newBuilder()
               .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
index 7141eca..aa4dfbf 100644
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
+++ b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
@@ -17,12 +17,15 @@
  */
 package org.apache.raft.server;
 
+import org.apache.raft.protocol.RaftClientAsynchronousProtocol;
+import org.apache.raft.protocol.RaftClientProtocol;
 import org.apache.raft.server.protocol.RaftServerProtocol;
 
 import java.io.Closeable;
 
 /** Raft server interface */
-public interface RaftServer extends RaftServerProtocol, Closeable {
+public interface RaftServer extends Closeable, RaftServerProtocol,
+    RaftClientProtocol, RaftClientAsynchronousProtocol {
   /** @return the server ID. */
   String getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
index 3026afa..1ea40f6 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
@@ -44,6 +44,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
 import static org.apache.raft.util.LifeCycle.State.*;
@@ -146,6 +147,7 @@ public class RaftServerImpl implements RaftServer {
     return serverRpc;
   }
 
+  @Override
   public void start() {
     lifeCycle.transition(STARTING);
     state.start();
@@ -186,11 +188,12 @@ public class RaftServerImpl implements RaftServer {
     return this.state;
   }
 
+  @Override
   public String getId() {
     return getState().getSelfId();
   }
 
-  public RaftConfiguration getRaftConf() {
+  RaftConfiguration getRaftConf() {
     return getState().getRaftConf();
   }
 
@@ -323,7 +326,7 @@ public class RaftServerImpl implements RaftServer {
   /**
    * @return null if the server is in leader state.
    */
-  CompletableFuture<RaftClientReply> checkLeaderState(
+  private CompletableFuture<RaftClientReply> checkLeaderState(
       RaftClientRequest request) {
     if (!isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
@@ -355,7 +358,7 @@ public class RaftServerImpl implements RaftServer {
   /**
    * Handle a normal update request from client.
    */
-  public CompletableFuture<RaftClientReply> appendTransaction(
+  private CompletableFuture<RaftClientReply> appendTransaction(
       RaftClientRequest request, TransactionContext entry)
       throws RaftException {
     LOG.debug("{}: receive client request({})", getId(), request);
@@ -384,10 +387,71 @@ public class RaftServerImpl implements RaftServer {
     return pending.getFuture();
   }
 
+  @Override
+  public CompletableFuture<RaftClientReply> submitClientRequestAsync(
+      RaftClientRequest request) throws IOException {
+    // first check the server's leader state
+    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    if (reply != null) {
+      return reply;
+    }
+
+    // let the state machine handle read-only request from client
+    if (request.isReadOnly()) {
+      // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
+      // section 8 (last part)
+      return stateMachine.query(request);
+    }
+
+    // TODO: this client request will not be added to pending requests
+    // until later which means that any failure in between will leave partial state in the
+    // state machine. We should call cancelTransaction() for failed requests
+    TransactionContext entry = stateMachine.startTransaction(request);
+    if (entry.getException().isPresent()) {
+      throw RaftUtils.asIOException(entry.getException().get());
+    }
+
+    return appendTransaction(request, entry);
+  }
+
+  @Override
+  public RaftClientReply submitClientRequest(RaftClientRequest request)
+      throws IOException {
+    return waitForReply(getId(), request, submitClientRequestAsync(request));
+  }
+
+  private static RaftClientReply waitForReply(String id, RaftClientRequest request,
+      CompletableFuture<RaftClientReply> future) throws IOException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      final String s = id + ": Interrupted when waiting for reply, request=" + request;
+      LOG.info(s, e);
+      throw RaftUtils.toInterruptedIOException(s, e);
+    } catch (ExecutionException e) {
+      final Throwable cause = e.getCause();
+      if (cause == null) {
+        throw new IOException(e);
+      }
+      if (cause instanceof NotLeaderException) {
+        return new RaftClientReply(request, (NotLeaderException)cause);
+      } else {
+        throw RaftUtils.asIOException(cause);
+      }
+    }
+  }
+
+  @Override
+  public RaftClientReply setConfiguration(SetConfigurationRequest request)
+      throws IOException {
+    return waitForReply(getId(), request, setConfigurationAsync(request));
+  }
+
   /**
    * Handle a raft configuration change request from client.
    */
-  public CompletableFuture<RaftClientReply> setConfiguration(
+  @Override
+  public CompletableFuture<RaftClientReply> setConfigurationAsync(
       SetConfigurationRequest request) throws IOException {
     LOG.debug("{}: receive setConfiguration({})", getId(), request);
     lifeCycle.assertCurrentState(RUNNING);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
deleted file mode 100644
index 39a4ac8..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Each RPC request is first handled by the RequestDispatcher:
- * 1. A request from another RaftPeer is to be handled by RaftServer.
- *
- * If the raft peer is the leader, then:
- *
- * 2. A read-only request from client is to be handled by the state machine.
- * 3. A write request from client is first validated by the state machine. The
- * state machine returns the content of the raft log entry, which is then passed
- * to the RaftServer for replication.
- */
-public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol {
-  static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class);
-
-  private final RaftServerImpl server;
-  private final StateMachine stateMachine;
-
-  public RequestDispatcher(RaftServer server) {
-    Preconditions.checkArgument(server instanceof RaftServerImpl);
-    this.server = (RaftServerImpl)server;
-    this.stateMachine = this.server.getStateMachine();
-  }
-
-  public CompletableFuture<RaftClientReply> handleClientRequest(
-      RaftClientRequest request) throws IOException {
-    // first check the server's leader state
-    CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request);
-    if (reply != null) {
-      return reply;
-    }
-
-    // let the state machine handle read-only request from client
-    if (request.isReadOnly()) {
-      // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
-      // section 8 (last part)
-      return stateMachine.query(request);
-    }
-
-    // TODO: this client request will not be added to pending requests
-    // until later which means that any failure in between will leave partial state in the
-    // state machine. We should call cancelTransaction() for failed requests
-    TransactionContext entry = stateMachine.startTransaction(request);
-    if (entry.getException().isPresent()) {
-      throw RaftUtils.asIOException(entry.getException().get());
-    }
-
-    return server.appendTransaction(request, entry);
-  }
-
-  @Override
-  public RaftClientReply submitClientRequest(RaftClientRequest request)
-      throws IOException {
-    return waitForReply(server.getId(), request, handleClientRequest(request));
-  }
-
-  public CompletableFuture<RaftClientReply> setConfigurationAsync(
-      SetConfigurationRequest request) throws IOException {
-    return server.setConfiguration(request);
-  }
-
-  @Override
-  public RaftClientReply setConfiguration(SetConfigurationRequest request)
-      throws IOException {
-    return waitForReply(server.getId(), request, setConfigurationAsync(request));
-  }
-
-  private static RaftClientReply waitForReply(String serverId,
-      RaftClientRequest request, CompletableFuture<RaftClientReply> future)
-      throws IOException {
-    try {
-      return future.get();
-    } catch (InterruptedException e) {
-      final String s = serverId + ": Interrupted when waiting for reply, request=" + request;
-      LOG.info(s, e);
-      throw RaftUtils.toInterruptedIOException(s, e);
-    } catch (ExecutionException e) {
-      final Throwable cause = e.getCause();
-      if (cause == null) {
-        throw new IOException(e);
-      }
-      if (cause instanceof NotLeaderException) {
-        return new RaftClientReply(request, (NotLeaderException)cause);
-      } else {
-        throw RaftUtils.asIOException(cause);
-      }
-    }
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
-      throws IOException {
-    return server.requestVote(request);
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
-      throws IOException {
-    return server.appendEntries(request);
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    return server.installSnapshot(request);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
index 8a7e752..799ee65 100644
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
+++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
@@ -18,13 +18,10 @@
 package org.apache.raft.server.simulation;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.server.impl.RaftServerImpl;
+import org.apache.raft.protocol.*;
 import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
+import org.apache.raft.server.impl.RaftServerImpl;
+import org.apache.raft.server.protocol.RaftServerProtocol;
 import org.apache.raft.shaded.proto.RaftProtos.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +37,6 @@ class SimulatedServerRpc implements RaftServerRpc {
   static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
 
   private final RaftServerImpl server;
-  private final RequestDispatcher dispatcher;
   private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler;
   private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler;
   private final ExecutorService executor = Executors.newFixedThreadPool(3,
@@ -50,7 +46,6 @@ class SimulatedServerRpc implements RaftServerRpc {
       SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
       SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) {
     this.server = server;
-    this.dispatcher = new RequestDispatcher(server);
     this.serverHandler = new RequestHandler<>(server.getId(),
         "serverHandler", serverRequestReply, serverHandlerImpl, 3);
     this.clientHandler = new RequestHandler<>(server.getId(),
@@ -125,13 +120,11 @@ class SimulatedServerRpc implements RaftServerRpc {
     public RaftServerReply handleRequest(RaftServerRequest r)
         throws IOException {
       if (r.isAppendEntries()) {
-        return new RaftServerReply(
-            dispatcher.appendEntries(r.getAppendEntries()));
+        return new RaftServerReply(server.appendEntries(r.getAppendEntries()));
       } else if (r.isRequestVote()) {
-        return new RaftServerReply(dispatcher.requestVote(r.getRequestVote()));
+        return new RaftServerReply(server.requestVote(r.getRequestVote()));
       } else if (r.isInstallSnapshot()) {
-        return new RaftServerReply(
-            dispatcher.installSnapshot(r.getInstallSnapshot()));
+        return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot()));
       } else {
         throw new IllegalStateException("unexpected state");
       }
@@ -150,9 +143,9 @@ class SimulatedServerRpc implements RaftServerRpc {
         throws IOException {
       final CompletableFuture<RaftClientReply> future;
       if (request instanceof SetConfigurationRequest) {
-        future = dispatcher.setConfigurationAsync((SetConfigurationRequest) request);
+        future = server.setConfigurationAsync((SetConfigurationRequest) request);
       } else {
-        future = dispatcher.handleClientRequest(request);
+        future = server.submitClientRequestAsync(request);
       }
 
       future.whenCompleteAsync((reply, exception) -> {