You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:16:55 UTC
[10/54] [abbrv] incubator-ratis git commit: Move RequestDispatcher code to RaftServerImpl.
Move RequestDispatcher code to RaftServerImpl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a38e2f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a38e2f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a38e2f71
Branch: refs/heads/master
Commit: a38e2f71ef0e84cb0af9a394544274e0dd56bcd9
Parents: 673a282
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Jan 3 02:05:43 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Jan 3 02:05:43 2017 +0800
----------------------------------------------------------------------
.../RaftClientAsynchronousProtocol.java | 30 ++++
.../org/apache/raft/grpc/RaftGRpcService.java | 6 +-
.../grpc/client/RaftClientProtocolService.java | 14 +-
.../grpc/server/RaftServerProtocolService.java | 15 +-
.../raft/hadooprpc/server/HadoopRpcService.java | 18 +--
.../raft/netty/server/NettyRpcService.java | 15 +-
.../java/org/apache/raft/server/RaftServer.java | 5 +-
.../apache/raft/server/impl/RaftServerImpl.java | 72 +++++++++-
.../raft/server/impl/RequestDispatcher.java | 140 -------------------
.../server/simulation/SimulatedServerRpc.java | 23 ++-
10 files changed, 142 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java b/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
new file mode 100644
index 0000000..3572b7e
--- /dev/null
+++ b/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.protocol;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Asynchronous version of {@link RaftClientProtocol}. */
+public interface RaftClientAsynchronousProtocol {
+ CompletableFuture<RaftClientReply> submitClientRequestAsync(
+ RaftClientRequest request) throws IOException;
+
+ CompletableFuture<RaftClientReply> setConfigurationAsync(
+ SetConfigurationRequest request) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
index d0c98c3..1184e2e 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
@@ -25,7 +25,6 @@ import org.apache.raft.grpc.server.RaftServerProtocolService;
import org.apache.raft.protocol.RaftPeer;
import org.apache.raft.server.RaftServer;
import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
import org.apache.raft.shaded.io.grpc.Server;
import org.apache.raft.shaded.io.grpc.ServerBuilder;
import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder;
@@ -61,11 +60,10 @@ public class RaftGRpcService implements RaftServerRpc {
RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT);
ServerBuilder serverBuilder = ServerBuilder.forPort(port);
- final RequestDispatcher dispatcher = new RequestDispatcher(raftServer);
selfId = raftServer.getId();
server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
- .addService(new RaftServerProtocolService(selfId, dispatcher))
- .addService(new RaftClientProtocolService(selfId, dispatcher))
+ .addService(new RaftServerProtocolService(selfId, raftServer))
+ .addService(new RaftClientProtocolService(selfId, raftServer))
.build();
// start service to determine the port (in case port is configured as 0)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
index 32dbac7..8f41bdc 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
@@ -20,8 +20,8 @@ package org.apache.raft.grpc.client;
import com.google.common.base.Preconditions;
import org.apache.raft.client.impl.ClientProtoUtils;
import org.apache.raft.grpc.RaftGrpcUtil;
+import org.apache.raft.protocol.RaftClientAsynchronousProtocol;
import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.server.impl.RequestDispatcher;
import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;
@@ -65,18 +65,18 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE);
private final String id;
- private final RequestDispatcher dispatcher;
+ private final RaftClientAsynchronousProtocol client;
- public RaftClientProtocolService(String id, RequestDispatcher dispatcher) {
+ public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol client) {
this.id = id;
- this.dispatcher = dispatcher;
+ this.client = client;
}
@Override
public void setConfiguration(SetConfigurationRequestProto request,
StreamObserver<RaftClientReplyProto> responseObserver) {
try {
- CompletableFuture<RaftClientReply> future = dispatcher.setConfigurationAsync(
+ CompletableFuture<RaftClientReply> future = client.setConfigurationAsync(
ClientProtoUtils.toSetConfigurationRequest(request));
future.whenCompleteAsync((reply, exception) -> {
if (exception != null) {
@@ -114,8 +114,8 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
pendingList.add(p);
}
- CompletableFuture<RaftClientReply> future = dispatcher
- .handleClientRequest(ClientProtoUtils.toRaftClientRequest(request));
+ CompletableFuture<RaftClientReply> future = client.submitClientRequestAsync(
+ ClientProtoUtils.toRaftClientRequest(request));
future.whenCompleteAsync((reply, exception) -> {
if (exception != null) {
// TODO: the exception may be from either raft or state machine.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
index 2f06c59..53dbb6a 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
@@ -18,7 +18,7 @@
package org.apache.raft.grpc.server;
import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.server.impl.RequestDispatcher;
+import org.apache.raft.server.protocol.RaftServerProtocol;
import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
import org.apache.raft.shaded.proto.RaftProtos.*;
import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
@@ -29,18 +29,18 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class);
private final String id;
- private final RequestDispatcher dispatcher;
+ private final RaftServerProtocol server;
- public RaftServerProtocolService(String id, RequestDispatcher dispatcher) {
+ public RaftServerProtocolService(String id, RaftServerProtocol server) {
this.id = id;
- this.dispatcher = dispatcher;
+ this.server = server;
}
@Override
public void requestVote(RequestVoteRequestProto request,
StreamObserver<RequestVoteReplyProto> responseObserver) {
try {
- final RequestVoteReplyProto reply = dispatcher.requestVote(request);
+ final RequestVoteReplyProto reply = server.requestVote(request);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable e) {
@@ -57,7 +57,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
@Override
public void onNext(AppendEntriesRequestProto request) {
try {
- final AppendEntriesReplyProto reply = dispatcher.appendEntries(request);
+ final AppendEntriesReplyProto reply = server.appendEntries(request);
responseObserver.onNext(reply);
} catch (Throwable e) {
LOG.info("{} got exception when handling appendEntries {}: {}",
@@ -87,8 +87,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
@Override
public void onNext(InstallSnapshotRequestProto request) {
try {
- final InstallSnapshotReplyProto reply =
- dispatcher.installSnapshot(request);
+ final InstallSnapshotReplyProto reply = server.installSnapshot(request);
responseObserver.onNext(reply);
} catch (Throwable e) {
LOG.info("{} got exception when handling installSnapshot {}: {}",
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
index b73deca..24e1d2c 100644
--- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
+++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.raft.hadooprpc.Proxy;
import org.apache.raft.hadooprpc.client.RaftClientProtocolPB;
import org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
+import org.apache.raft.protocol.RaftClientProtocol;
import org.apache.raft.protocol.RaftPeer;
import org.apache.raft.server.RaftServer;
import org.apache.raft.server.RaftServerConfigKeys;
import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
+import org.apache.raft.server.protocol.RaftServerProtocol;
import org.apache.raft.shaded.com.google.protobuf.BlockingService;
import org.apache.raft.shaded.com.google.protobuf.ServiceException;
import org.apache.raft.shaded.proto.RaftProtos.*;
@@ -49,7 +50,6 @@ public class HadoopRpcService implements RaftServerRpc {
static final String CLASS_NAME = HadoopRpcService.class.getSimpleName();
public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest";
- private final RequestDispatcher raftService;
private final String id;
private final RPC.Server ipcServer;
private final InetSocketAddress ipcServerAddress;
@@ -60,12 +60,11 @@ public class HadoopRpcService implements RaftServerRpc {
throws IOException {
this.proxies = new PeerProxyMap<>(
p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf));
- this.raftService = new RequestDispatcher(server);
this.id = server.getId();
- this.ipcServer = newRpcServer(conf);
+ this.ipcServer = newRpcServer(server, conf);
this.ipcServerAddress = ipcServer.getListenerAddress();
- addRaftClientProtocol(conf);
+ addRaftClientProtocol(server, conf);
LOG.info(getClass().getSimpleName() + " created RPC.Server at "
+ ipcServerAddress);
@@ -76,7 +75,8 @@ public class HadoopRpcService implements RaftServerRpc {
return ipcServerAddress;
}
- private RPC.Server newRpcServer(final Configuration conf) throws IOException {
+ private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final Configuration conf)
+ throws IOException {
final RaftServerConfigKeys.Get get = new RaftServerConfigKeys.Get() {
@Override
protected int getInt(String key, int defaultValue) {
@@ -94,7 +94,7 @@ public class HadoopRpcService implements RaftServerRpc {
final BlockingService service
= RaftServerProtocolService.newReflectiveBlockingService(
- new RaftServerProtocolServerSideTranslatorPB(raftService));
+ new RaftServerProtocolServerSideTranslatorPB(serverProtocol));
RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, ProtobufRpcEngineShaded.class);
return new RPC.Builder(conf)
.setProtocol(RaftServerProtocolPB.class)
@@ -106,13 +106,13 @@ public class HadoopRpcService implements RaftServerRpc {
.build();
}
- private void addRaftClientProtocol(Configuration conf) {
+ private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) {
final Class<?> protocol = RaftClientProtocolPB.class;
RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class);
final BlockingService service
= RaftClientProtocolService.newReflectiveBlockingService(
- new RaftClientProtocolServerSideTranslatorPB(raftService));
+ new RaftClientProtocolServerSideTranslatorPB(clientProtocol));
ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
index 08e379a..50833fb 100644
--- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
+++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
@@ -31,7 +31,6 @@ import org.apache.raft.protocol.RaftClientReply;
import org.apache.raft.protocol.RaftPeer;
import org.apache.raft.server.RaftServer;
import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
@@ -56,7 +55,7 @@ public final class NettyRpcService implements RaftServerRpc {
public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest";
private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
- private final RequestDispatcher raftService;
+ private final RaftServer server;
private final String id;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
@@ -76,7 +75,7 @@ public final class NettyRpcService implements RaftServerRpc {
/** Constructs a netty server with the given port. */
public NettyRpcService(int port, RaftServer server) {
- this.raftService = new RequestDispatcher(server);
+ this.server = server;
this.id = server.getId();
final ChannelInitializer<SocketChannel> initializer
@@ -134,7 +133,7 @@ public final class NettyRpcService implements RaftServerRpc {
case REQUESTVOTEREQUEST: {
final RequestVoteRequestProto request = proto.getRequestVoteRequest();
rpcRequest = request.getServerRequest();
- final RequestVoteReplyProto reply = raftService.requestVote(request);
+ final RequestVoteReplyProto reply = server.requestVote(request);
return RaftNettyServerReplyProto.newBuilder()
.setRequestVoteReply(reply)
.build();
@@ -142,7 +141,7 @@ public final class NettyRpcService implements RaftServerRpc {
case APPENDENTRIESREQUEST: {
final AppendEntriesRequestProto request = proto.getAppendEntriesRequest();
rpcRequest = request.getServerRequest();
- final AppendEntriesReplyProto reply = raftService.appendEntries(request);
+ final AppendEntriesReplyProto reply = server.appendEntries(request);
return RaftNettyServerReplyProto.newBuilder()
.setAppendEntriesReply(reply)
.build();
@@ -150,7 +149,7 @@ public final class NettyRpcService implements RaftServerRpc {
case INSTALLSNAPSHOTREQUEST: {
final InstallSnapshotRequestProto request = proto.getInstallSnapshotRequest();
rpcRequest = request.getServerRequest();
- final InstallSnapshotReplyProto reply = raftService.installSnapshot(request);
+ final InstallSnapshotReplyProto reply = server.installSnapshot(request);
return RaftNettyServerReplyProto.newBuilder()
.setInstallSnapshotReply(reply)
.build();
@@ -158,7 +157,7 @@ public final class NettyRpcService implements RaftServerRpc {
case RAFTCLIENTREQUEST: {
final RaftClientRequestProto request = proto.getRaftClientRequest();
rpcRequest = request.getRpcRequest();
- final RaftClientReply reply = raftService.submitClientRequest(
+ final RaftClientReply reply = server.submitClientRequest(
ClientProtoUtils.toRaftClientRequest(request));
return RaftNettyServerReplyProto.newBuilder()
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
@@ -167,7 +166,7 @@ public final class NettyRpcService implements RaftServerRpc {
case SETCONFIGURATIONREQUEST: {
final SetConfigurationRequestProto request = proto.getSetConfigurationRequest();
rpcRequest = request.getRpcRequest();
- final RaftClientReply reply = raftService.setConfiguration(
+ final RaftClientReply reply = server.setConfiguration(
ClientProtoUtils.toSetConfigurationRequest(request));
return RaftNettyServerReplyProto.newBuilder()
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
index 7141eca..aa4dfbf 100644
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
+++ b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
@@ -17,12 +17,15 @@
*/
package org.apache.raft.server;
+import org.apache.raft.protocol.RaftClientAsynchronousProtocol;
+import org.apache.raft.protocol.RaftClientProtocol;
import org.apache.raft.server.protocol.RaftServerProtocol;
import java.io.Closeable;
/** Raft server interface */
-public interface RaftServer extends RaftServerProtocol, Closeable {
+public interface RaftServer extends Closeable, RaftServerProtocol,
+ RaftClientProtocol, RaftClientAsynchronousProtocol {
/** @return the server ID. */
String getId();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
index 3026afa..1ea40f6 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
@@ -44,6 +44,7 @@ import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
import static org.apache.raft.util.LifeCycle.State.*;
@@ -146,6 +147,7 @@ public class RaftServerImpl implements RaftServer {
return serverRpc;
}
+ @Override
public void start() {
lifeCycle.transition(STARTING);
state.start();
@@ -186,11 +188,12 @@ public class RaftServerImpl implements RaftServer {
return this.state;
}
+ @Override
public String getId() {
return getState().getSelfId();
}
- public RaftConfiguration getRaftConf() {
+ RaftConfiguration getRaftConf() {
return getState().getRaftConf();
}
@@ -323,7 +326,7 @@ public class RaftServerImpl implements RaftServer {
/**
* @return null if the server is in leader state.
*/
- CompletableFuture<RaftClientReply> checkLeaderState(
+ private CompletableFuture<RaftClientReply> checkLeaderState(
RaftClientRequest request) {
if (!isLeader()) {
NotLeaderException exception = generateNotLeaderException();
@@ -355,7 +358,7 @@ public class RaftServerImpl implements RaftServer {
/**
* Handle a normal update request from client.
*/
- public CompletableFuture<RaftClientReply> appendTransaction(
+ private CompletableFuture<RaftClientReply> appendTransaction(
RaftClientRequest request, TransactionContext entry)
throws RaftException {
LOG.debug("{}: receive client request({})", getId(), request);
@@ -384,10 +387,71 @@ public class RaftServerImpl implements RaftServer {
return pending.getFuture();
}
+ @Override
+ public CompletableFuture<RaftClientReply> submitClientRequestAsync(
+ RaftClientRequest request) throws IOException {
+ // first check the server's leader state
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+ if (reply != null) {
+ return reply;
+ }
+
+ // let the state machine handle read-only request from client
+ if (request.isReadOnly()) {
+ // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
+ // section 8 (last part)
+ return stateMachine.query(request);
+ }
+
+ // TODO: this client request will not be added to pending requests
+ // until later which means that any failure in between will leave partial state in the
+ // state machine. We should call cancelTransaction() for failed requests
+ TransactionContext entry = stateMachine.startTransaction(request);
+ if (entry.getException().isPresent()) {
+ throw RaftUtils.asIOException(entry.getException().get());
+ }
+
+ return appendTransaction(request, entry);
+ }
+
+ @Override
+ public RaftClientReply submitClientRequest(RaftClientRequest request)
+ throws IOException {
+ return waitForReply(getId(), request, submitClientRequestAsync(request));
+ }
+
+ private static RaftClientReply waitForReply(String id, RaftClientRequest request,
+ CompletableFuture<RaftClientReply> future) throws IOException {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ final String s = id + ": Interrupted when waiting for reply, request=" + request;
+ LOG.info(s, e);
+ throw RaftUtils.toInterruptedIOException(s, e);
+ } catch (ExecutionException e) {
+ final Throwable cause = e.getCause();
+ if (cause == null) {
+ throw new IOException(e);
+ }
+ if (cause instanceof NotLeaderException) {
+ return new RaftClientReply(request, (NotLeaderException)cause);
+ } else {
+ throw RaftUtils.asIOException(cause);
+ }
+ }
+ }
+
+ @Override
+ public RaftClientReply setConfiguration(SetConfigurationRequest request)
+ throws IOException {
+ return waitForReply(getId(), request, setConfigurationAsync(request));
+ }
+
/**
* Handle a raft configuration change request from client.
*/
- public CompletableFuture<RaftClientReply> setConfiguration(
+ @Override
+ public CompletableFuture<RaftClientReply> setConfigurationAsync(
SetConfigurationRequest request) throws IOException {
LOG.debug("{}: receive setConfiguration({})", getId(), request);
lifeCycle.assertCurrentState(RUNNING);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
deleted file mode 100644
index 39a4ac8..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Each RPC request is first handled by the RequestDispatcher:
- * 1. A request from another RaftPeer is to be handled by RaftServer.
- *
- * If the raft peer is the leader, then:
- *
- * 2. A read-only request from client is to be handled by the state machine.
- * 3. A write request from client is first validated by the state machine. The
- * state machine returns the content of the raft log entry, which is then passed
- * to the RaftServer for replication.
- */
-public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol {
- static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class);
-
- private final RaftServerImpl server;
- private final StateMachine stateMachine;
-
- public RequestDispatcher(RaftServer server) {
- Preconditions.checkArgument(server instanceof RaftServerImpl);
- this.server = (RaftServerImpl)server;
- this.stateMachine = this.server.getStateMachine();
- }
-
- public CompletableFuture<RaftClientReply> handleClientRequest(
- RaftClientRequest request) throws IOException {
- // first check the server's leader state
- CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- // let the state machine handle read-only request from client
- if (request.isReadOnly()) {
- // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
- // section 8 (last part)
- return stateMachine.query(request);
- }
-
- // TODO: this client request will not be added to pending requests
- // until later which means that any failure in between will leave partial state in the
- // state machine. We should call cancelTransaction() for failed requests
- TransactionContext entry = stateMachine.startTransaction(request);
- if (entry.getException().isPresent()) {
- throw RaftUtils.asIOException(entry.getException().get());
- }
-
- return server.appendTransaction(request, entry);
- }
-
- @Override
- public RaftClientReply submitClientRequest(RaftClientRequest request)
- throws IOException {
- return waitForReply(server.getId(), request, handleClientRequest(request));
- }
-
- public CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException {
- return server.setConfiguration(request);
- }
-
- @Override
- public RaftClientReply setConfiguration(SetConfigurationRequest request)
- throws IOException {
- return waitForReply(server.getId(), request, setConfigurationAsync(request));
- }
-
- private static RaftClientReply waitForReply(String serverId,
- RaftClientRequest request, CompletableFuture<RaftClientReply> future)
- throws IOException {
- try {
- return future.get();
- } catch (InterruptedException e) {
- final String s = serverId + ": Interrupted when waiting for reply, request=" + request;
- LOG.info(s, e);
- throw RaftUtils.toInterruptedIOException(s, e);
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- if (cause == null) {
- throw new IOException(e);
- }
- if (cause instanceof NotLeaderException) {
- return new RaftClientReply(request, (NotLeaderException)cause);
- } else {
- throw RaftUtils.asIOException(cause);
- }
- }
- }
-
- @Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
- throws IOException {
- return server.requestVote(request);
- }
-
- @Override
- public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
- throws IOException {
- return server.appendEntries(request);
- }
-
- @Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
- return server.installSnapshot(request);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
index 8a7e752..799ee65 100644
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
+++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
@@ -18,13 +18,10 @@
package org.apache.raft.server.simulation;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.server.impl.RaftServerImpl;
+import org.apache.raft.protocol.*;
import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RequestDispatcher;
+import org.apache.raft.server.impl.RaftServerImpl;
+import org.apache.raft.server.protocol.RaftServerProtocol;
import org.apache.raft.shaded.proto.RaftProtos.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +37,6 @@ class SimulatedServerRpc implements RaftServerRpc {
static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
private final RaftServerImpl server;
- private final RequestDispatcher dispatcher;
private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler;
private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler;
private final ExecutorService executor = Executors.newFixedThreadPool(3,
@@ -50,7 +46,6 @@ class SimulatedServerRpc implements RaftServerRpc {
SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) {
this.server = server;
- this.dispatcher = new RequestDispatcher(server);
this.serverHandler = new RequestHandler<>(server.getId(),
"serverHandler", serverRequestReply, serverHandlerImpl, 3);
this.clientHandler = new RequestHandler<>(server.getId(),
@@ -125,13 +120,11 @@ class SimulatedServerRpc implements RaftServerRpc {
public RaftServerReply handleRequest(RaftServerRequest r)
throws IOException {
if (r.isAppendEntries()) {
- return new RaftServerReply(
- dispatcher.appendEntries(r.getAppendEntries()));
+ return new RaftServerReply(server.appendEntries(r.getAppendEntries()));
} else if (r.isRequestVote()) {
- return new RaftServerReply(dispatcher.requestVote(r.getRequestVote()));
+ return new RaftServerReply(server.requestVote(r.getRequestVote()));
} else if (r.isInstallSnapshot()) {
- return new RaftServerReply(
- dispatcher.installSnapshot(r.getInstallSnapshot()));
+ return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot()));
} else {
throw new IllegalStateException("unexpected state");
}
@@ -150,9 +143,9 @@ class SimulatedServerRpc implements RaftServerRpc {
throws IOException {
final CompletableFuture<RaftClientReply> future;
if (request instanceof SetConfigurationRequest) {
- future = dispatcher.setConfigurationAsync((SetConfigurationRequest) request);
+ future = server.setConfigurationAsync((SetConfigurationRequest) request);
} else {
- future = dispatcher.handleClientRequest(request);
+ future = server.submitClientRequestAsync(request);
}
future.whenCompleteAsync((reply, exception) -> {