You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by li...@apache.org on 2017/02/28 23:56:14 UTC
incubator-ratis git commit: RATIS-32. Rename RaftClientRequestSender
to RaftClientRpc. Contributed by Tsz Wo Nicholas Sze
Repository: incubator-ratis
Updated Branches:
refs/heads/master 392fa5141 -> fdbbdf98c
RATIS-32. Rename RaftClientRequestSender to RaftClientRpc. Contributed by Tsz Wo Nicholas Sze
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/fdbbdf98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/fdbbdf98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/fdbbdf98
Branch: refs/heads/master
Commit: fdbbdf98c0d5db82ba6fc2e78893bfd006040abe
Parents: 392fa51
Author: Mingliang Liu <li...@apache.org>
Authored: Tue Feb 28 15:46:18 2017 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Tue Feb 28 15:46:18 2017 -0800
----------------------------------------------------------------------
.../org/apache/ratis/client/ClientFactory.java | 4 +-
.../org/apache/ratis/client/RaftClient.java | 16 +--
.../ratis/client/RaftClientRequestSender.java | 34 ------
.../org/apache/ratis/client/RaftClientRpc.java | 34 ++++++
.../ratis/client/impl/ClientImplUtils.java | 11 +-
.../ratis/client/impl/RaftClientImpl.java | 20 ++--
.../java/org/apache/ratis/grpc/GrpcFactory.java | 6 +-
.../apache/ratis/grpc/client/GrpcClientRpc.java | 111 +++++++++++++++++++
.../grpc/client/RaftClientSenderWithGrpc.java | 111 -------------------
.../apache/ratis/hadooprpc/HadoopFactory.java | 6 +-
.../client/HadoopClientRequestSender.java | 68 ------------
.../ratis/hadooprpc/client/HadoopClientRpc.java | 68 ++++++++++++
.../org/apache/ratis/netty/NettyFactory.java | 6 +-
.../netty/client/NettyClientRequestSender.java | 64 -----------
.../ratis/netty/client/NettyClientRpc.java | 64 +++++++++++
.../java/org/apache/ratis/MiniRaftCluster.java | 2 +-
.../ratis/RaftNotLeaderExceptionBaseTest.java | 6 +-
.../impl/RaftReconfigurationBaseTest.java | 10 +-
.../MiniRaftClusterWithSimulatedRpc.java | 4 +-
.../simulation/SimulatedClientRequestReply.java | 41 -------
.../server/simulation/SimulatedClientRpc.java | 41 +++++++
.../ratis/server/simulation/SimulatedRpc.java | 6 +-
22 files changed, 366 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
index b775319..6407ba8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
@@ -30,6 +30,6 @@ public interface ClientFactory extends RpcFactory {
+ "; rpc type is " + rpcFactory.getRpcType());
}
- /** Create a {@link RaftClientRequestSender}. */
- RaftClientRequestSender newRaftClientRequestSender();
+ /** Create a {@link RaftClientRpc}. */
+ RaftClientRpc newRaftClientRpc();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 4f86d40..8fbffd3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -36,8 +36,8 @@ public interface RaftClient extends Closeable {
/** @return the id of this client. */
ClientId getId();
- /** @return the request sender of this client. */
- RaftClientRequestSender getRequestSender();
+ /** @return the client rpct. */
+ RaftClientRpc getClientRpc();
/**
* Send the given message to the raft service.
@@ -60,7 +60,7 @@ public interface RaftClient extends Closeable {
/** To build {@link RaftClient} objects. */
class Builder {
private ClientId clientId;
- private RaftClientRequestSender requestSender;
+ private RaftClientRpc clientRpc;
private Collection<RaftPeer> servers;
private RaftPeerId leaderId;
private RaftProperties properties;
@@ -79,9 +79,9 @@ public interface RaftClient extends Closeable {
RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT);
}
return ClientImplUtils.newRaftClient(clientId,
- Objects.requireNonNull(servers, "The 'server' field is not initialized."),
+ Objects.requireNonNull(servers, "The 'servers' field is not initialized."),
leaderId,
- Objects.requireNonNull(requestSender, "The 'requestSender' field is not initialized."),
+ Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
retryInterval);
}
@@ -103,9 +103,9 @@ public interface RaftClient extends Closeable {
return this;
}
- /** Set {@link RaftClientRequestSender}. */
- public Builder setRequestSender(RaftClientRequestSender requestSender) {
- this.requestSender = requestSender;
+ /** Set {@link RaftClientRpc}. */
+ public Builder setClientRpc(RaftClientRpc clientRpc) {
+ this.clientRpc = clientRpc;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java
deleted file mode 100644
index b2541e1..0000000
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRequestSender.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-
-/** Send requests to a raft service. */
-public interface RaftClientRequestSender extends Closeable {
- /** Send a request. */
- RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
-
- /** Add the information of the given raft servers */
- void addServers(Iterable<RaftPeer> servers);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
new file mode 100644
index 0000000..ca1864b
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+
+/** The client side rpc of a raft service. */
+public interface RaftClientRpc extends Closeable {
+ /** Send a request. */
+ RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
+
+ /** Add the information of the given raft servers */
+ void addServers(Iterable<RaftPeer> servers);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 134e610..85901db 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -18,7 +18,7 @@
package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -27,10 +27,9 @@ import java.util.Collection;
/** Client utilities for internal use. */
public class ClientImplUtils {
- public static RaftClient newRaftClient(ClientId clientId,
- Collection<RaftPeer> peers, RaftPeerId leaderId,
- RaftClientRequestSender requestSender, int retryInterval) {
- return new RaftClientImpl(clientId, peers, leaderId, requestSender,
- retryInterval);
+ public static RaftClient newRaftClient(
+ ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId,
+ RaftClientRpc clientRpc, int retryInterval) {
+ return new RaftClientImpl(clientId, peers, leaderId, clientRpc, retryInterval);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ff49ab6..3a6fd58 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -18,7 +18,7 @@
package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.protocol.*;
import org.apache.ratis.util.RaftUtils;
@@ -31,22 +31,22 @@ import java.util.function.Supplier;
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
private final ClientId clientId;
- private final RaftClientRequestSender requestSender;
+ private final RaftClientRpc clientRpc;
private final Collection<RaftPeer> peers;
private final int retryInterval;
private volatile RaftPeerId leaderId;
RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers,
- RaftPeerId leaderId, RaftClientRequestSender requestSender,
+ RaftPeerId leaderId, RaftClientRpc clientRpc,
int retryInterval) {
this.clientId = clientId;
- this.requestSender = requestSender;
+ this.clientRpc = clientRpc;
this.peers = peers;
this.leaderId = leaderId != null? leaderId : peers.iterator().next().getId();
this.retryInterval = retryInterval;
- requestSender.addServers(peers);
+ clientRpc.addServers(peers);
}
@Override
@@ -102,7 +102,7 @@ final class RaftClientImpl implements RaftClient {
private RaftClientReply sendRequest(RaftClientRequest request)
throws StateMachineException {
try {
- RaftClientReply reply = requestSender.sendRequest(request);
+ RaftClientReply reply = clientRpc.sendRequest(request);
if (reply.isNotLeader()) {
handleNotLeaderException(request, reply.getNotLeaderException());
return null;
@@ -131,7 +131,7 @@ final class RaftClientImpl implements RaftClient {
peers.clear();
peers.addAll(newPeers);
// also refresh the rpc proxies for these peers
- requestSender.addServers(newPeers);
+ clientRpc.addServers(newPeers);
}
}
@@ -149,12 +149,12 @@ final class RaftClientImpl implements RaftClient {
}
@Override
- public RaftClientRequestSender getRequestSender() {
- return requestSender;
+ public RaftClientRpc getClientRpc() {
+ return clientRpc;
}
@Override
public void close() throws IOException {
- requestSender.close();
+ clientRpc.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index df69490..3ae2602 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -18,7 +18,7 @@
package org.apache.ratis.grpc;
import org.apache.ratis.client.ClientFactory;
-import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
+import org.apache.ratis.grpc.client.GrpcClientRpc;
import org.apache.ratis.grpc.server.GRpcLogAppender;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.*;
@@ -43,7 +43,7 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
}
@Override
- public RaftClientSenderWithGrpc newRaftClientRequestSender() {
- return new RaftClientSenderWithGrpc();
+ public GrpcClientRpc newRaftClientRpc() {
+ return new GrpcClientRpc();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
new file mode 100644
index 0000000..3f7343a
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.ratis.client.impl.ClientProtoUtils.*;
+
+public class GrpcClientRpc implements RaftClientRpc {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
+
+ private final PeerProxyMap<RaftClientProtocolClient> proxies
+ = new PeerProxyMap<>(RaftClientProtocolClient::new);
+
+ @Override
+ public RaftClientReply sendRequest(RaftClientRequest request)
+ throws IOException {
+ final RaftPeerId serverId = request.getServerId();
+ final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
+ if (request instanceof SetConfigurationRequest) {
+ SetConfigurationRequestProto setConf =
+ toSetConfigurationRequestProto((SetConfigurationRequest) request);
+ return toRaftClientReply(proxy.setConfiguration(setConf));
+ } else {
+ RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
+ CompletableFuture<RaftClientReplyProto> replyFuture =
+ new CompletableFuture<>();
+ final StreamObserver<RaftClientRequestProto> requestObserver =
+ proxy.append(new StreamObserver<RaftClientReplyProto>() {
+ @Override
+ public void onNext(RaftClientReplyProto value) {
+ replyFuture.complete(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // This implementation is used as RaftClientRpc. Retry
+ // logic on Exception is in RaftClient.
+ final IOException e;
+ if (t instanceof StatusRuntimeException) {
+ e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
+ } else {
+ e = RaftUtils.asIOException(t);
+ }
+ replyFuture.completeExceptionally(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (!replyFuture.isDone()) {
+ replyFuture.completeExceptionally(
+ new IOException("No reply for request " + request));
+ }
+ }
+ });
+ requestObserver.onNext(requestProto);
+ requestObserver.onCompleted();
+
+ // TODO: timeout support
+ try {
+ return toRaftClientReply(replyFuture.get());
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(
+ "Interrupted while waiting for response of request " + request);
+ } catch (ExecutionException e) {
+ throw RaftUtils.toIOException(e);
+ }
+ }
+ }
+
+ @Override
+ public void addServers(Iterable<RaftPeer> servers) {
+ proxies.addPeers(servers);
+ }
+
+ @Override
+ public void close() throws IOException {
+ proxies.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
deleted file mode 100644
index 9a0eca3..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.RaftClientRequestSender;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.util.PeerProxyMap;
-import org.apache.ratis.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.ratis.client.impl.ClientProtoUtils.*;
-
-public class RaftClientSenderWithGrpc implements RaftClientRequestSender {
- public static final Logger LOG = LoggerFactory.getLogger(RaftClientSenderWithGrpc.class);
-
- private final PeerProxyMap<RaftClientProtocolClient> proxies
- = new PeerProxyMap<>(RaftClientProtocolClient::new);
-
- @Override
- public RaftClientReply sendRequest(RaftClientRequest request)
- throws IOException {
- final RaftPeerId serverId = request.getServerId();
- final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
- if (request instanceof SetConfigurationRequest) {
- SetConfigurationRequestProto setConf =
- toSetConfigurationRequestProto((SetConfigurationRequest) request);
- return toRaftClientReply(proxy.setConfiguration(setConf));
- } else {
- RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
- CompletableFuture<RaftClientReplyProto> replyFuture =
- new CompletableFuture<>();
- final StreamObserver<RaftClientRequestProto> requestObserver =
- proxy.append(new StreamObserver<RaftClientReplyProto>() {
- @Override
- public void onNext(RaftClientReplyProto value) {
- replyFuture.complete(value);
- }
-
- @Override
- public void onError(Throwable t) {
- // This implementation is used as RaftClientRequestSender. Retry
- // logic on Exception is in RaftClient.
- final IOException e;
- if (t instanceof StatusRuntimeException) {
- e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
- } else {
- e = RaftUtils.asIOException(t);
- }
- replyFuture.completeExceptionally(e);
- }
-
- @Override
- public void onCompleted() {
- if (!replyFuture.isDone()) {
- replyFuture.completeExceptionally(
- new IOException("No reply for request " + request));
- }
- }
- });
- requestObserver.onNext(requestProto);
- requestObserver.onCompleted();
-
- // TODO: timeout support
- try {
- return toRaftClientReply(replyFuture.get());
- } catch (InterruptedException e) {
- throw new InterruptedIOException(
- "Interrupted while waiting for response of request " + request);
- } catch (ExecutionException e) {
- throw RaftUtils.toIOException(e);
- }
- }
- }
-
- @Override
- public void addServers(Iterable<RaftPeer> servers) {
- proxies.addPeers(servers);
- }
-
- @Override
- public void close() throws IOException {
- proxies.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index 9ff493f..7b9e20f 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -19,7 +19,7 @@ package org.apache.ratis.hadooprpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.ratis.client.ClientFactory;
-import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender;
+import org.apache.ratis.hadooprpc.client.HadoopClientRpc;
import org.apache.ratis.hadooprpc.server.HadoopRpcService;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -46,7 +46,7 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa
}
@Override
- public HadoopClientRequestSender newRaftClientRequestSender() {
- return new HadoopClientRequestSender(conf);
+ public HadoopClientRpc newRaftClientRpc() {
+ return new HadoopClientRpc(conf);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
deleted file mode 100644
index 1a10dab..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.ratis.client.RaftClientRequestSender;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.util.PeerProxyMap;
-
-import java.io.IOException;
-
-public class HadoopClientRequestSender implements RaftClientRequestSender {
-
- private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
-
- public HadoopClientRequestSender(final Configuration conf) {
- this.proxies = new PeerProxyMap<>(
- p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
- }
-
- @Override
- public RaftClientReply sendRequest(RaftClientRequest request)
- throws IOException {
- final RaftPeerId serverId = request.getServerId();
- final RaftClientProtocolClientSideTranslatorPB proxy =
- proxies.getProxy(serverId);
- try {
- if (request instanceof SetConfigurationRequest) {
- return proxy.setConfiguration((SetConfigurationRequest) request);
- } else {
- return proxy.submitClientRequest(request);
- }
- } catch (RemoteException e) {
- throw e.unwrapRemoteException(
- StateMachineException.class,
- ReconfigurationTimeoutException.class,
- ReconfigurationInProgressException.class,
- RaftException.class,
- LeaderNotReadyException.class);
- }
- }
-
- @Override
- public void addServers(Iterable<RaftPeer> servers) {
- proxies.addPeers(servers);
- }
-
- @Override
- public void close() {
- proxies.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
new file mode 100644
index 0000000..25c0ecd
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.util.PeerProxyMap;
+
+import java.io.IOException;
+
+public class HadoopClientRpc implements RaftClientRpc {
+
+ private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
+
+ public HadoopClientRpc(final Configuration conf) {
+ this.proxies = new PeerProxyMap<>(
+ p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
+ }
+
+ @Override
+ public RaftClientReply sendRequest(RaftClientRequest request)
+ throws IOException {
+ final RaftPeerId serverId = request.getServerId();
+ final RaftClientProtocolClientSideTranslatorPB proxy =
+ proxies.getProxy(serverId);
+ try {
+ if (request instanceof SetConfigurationRequest) {
+ return proxy.setConfiguration((SetConfigurationRequest) request);
+ } else {
+ return proxy.submitClientRequest(request);
+ }
+ } catch (RemoteException e) {
+ throw e.unwrapRemoteException(
+ StateMachineException.class,
+ ReconfigurationTimeoutException.class,
+ ReconfigurationInProgressException.class,
+ RaftException.class,
+ LeaderNotReadyException.class);
+ }
+ }
+
+ @Override
+ public void addServers(Iterable<RaftPeer> servers) {
+ proxies.addPeers(servers);
+ }
+
+ @Override
+ public void close() {
+ proxies.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index fb27eaa..525b991 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -18,7 +18,7 @@
package org.apache.ratis.netty;
import org.apache.ratis.client.ClientFactory;
-import org.apache.ratis.netty.client.NettyClientRequestSender;
+import org.apache.ratis.netty.client.NettyClientRpc;
import org.apache.ratis.netty.server.NettyRpcService;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -36,7 +36,7 @@ public class NettyFactory extends ServerFactory.BaseFactory implements ClientFac
}
@Override
- public NettyClientRequestSender newRaftClientRequestSender() {
- return new NettyClientRequestSender();
+ public NettyClientRpc newRaftClientRpc() {
+ return new NettyClientRpc();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
deleted file mode 100644
index 5b36fde..0000000
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.netty.client;
-
-import org.apache.ratis.client.RaftClientRequestSender;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.netty.NettyRpcProxy;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
-
-import java.io.IOException;
-
-public class NettyClientRequestSender implements RaftClientRequestSender {
- private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap();
-
- @Override
- public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
- final RaftPeerId serverId = request.getServerId();
- final NettyRpcProxy proxy = proxies.getProxy(serverId);
-
- final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
- final RaftRpcRequestProto rpcRequest;
- if (request instanceof SetConfigurationRequest) {
- final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto(
- (SetConfigurationRequest)request);
- b.setSetConfigurationRequest(proto);
- rpcRequest = proto.getRpcRequest();
- } else {
- final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
- b.setRaftClientRequest(proto);
- rpcRequest = proto.getRpcRequest();
- }
- return ClientProtoUtils.toRaftClientReply(
- proxy.send(rpcRequest, b.build()).getRaftClientReply());
- }
-
- @Override
- public void addServers(Iterable<RaftPeer> servers) {
- proxies.addPeers(servers);
- }
-
- @Override
- public void close() {
- proxies.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
new file mode 100644
index 0000000..74afddc
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.netty.NettyRpcProxy;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+
+import java.io.IOException;
+
+public class NettyClientRpc implements RaftClientRpc {
+ private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap();
+
+ @Override
+ public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
+ final RaftPeerId serverId = request.getServerId();
+ final NettyRpcProxy proxy = proxies.getProxy(serverId);
+
+ final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
+ final RaftRpcRequestProto rpcRequest;
+ if (request instanceof SetConfigurationRequest) {
+ final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto(
+ (SetConfigurationRequest)request);
+ b.setSetConfigurationRequest(proto);
+ rpcRequest = proto.getRpcRequest();
+ } else {
+ final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
+ b.setRaftClientRequest(proto);
+ rpcRequest = proto.getRpcRequest();
+ }
+ return ClientProtoUtils.toRaftClientReply(
+ proxy.send(rpcRequest, b.build()).getRaftClientReply());
+ }
+
+ @Override
+ public void addServers(Iterable<RaftPeer> servers) {
+ proxies.addPeers(servers);
+ }
+
+ @Override
+ public void close() {
+ proxies.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 751854c..9c566a9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -394,7 +394,7 @@ public abstract class MiniRaftCluster {
return RaftClient.newBuilder()
.setServers(conf.getPeers())
.setLeaderId(leaderId)
- .setRequestSender(clientFactory.newRaftClientRequestSender())
+ .setClientRpc(clientFactory.newRaftClientRpc())
.setProperties(properties)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
index fcb0bc2..a5d1127 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -20,7 +20,7 @@ package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
@@ -87,7 +87,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
Assert.assertNotEquals(leaderId, newLeader);
- RaftClientRequestSender rpc = client.getRequestSender();
+ RaftClientRpc rpc = client.getClientRpc();
reply= null;
for (int i = 0; reply == null && i < 10; i++) {
try {
@@ -133,7 +133,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
}
LOG.info(cluster.printServers());
- RaftClientRequestSender rpc = client.getRequestSender();
+ RaftClientRpc rpc = client.getClientRpc();
RaftClientReply reply = null;
// it is possible that the remote peer's rpc server is not ready. need retry
for (int i = 0; reply == null && i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 5b46af8..3017634 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -23,7 +23,7 @@ import org.apache.ratis.MiniRaftCluster.PeerChanges;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -253,7 +253,7 @@ public abstract class RaftReconfigurationBaseTest {
asList(c1.allPeersInNewConf));
Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());
- final RaftClientRequestSender sender = client.getRequestSender();
+ final RaftClientRpc sender = client.getClientRpc();
final SetConfigurationRequest request = new SetConfigurationRequest(
client.getId(), leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf);
try {
@@ -470,7 +470,7 @@ public abstract class RaftReconfigurationBaseTest {
try(final RaftClient client2 = cluster.createClient(leaderId)) {
latch.await();
LOG.info("client2 starts to change conf");
- final RaftClientRequestSender sender2 = client2.getRequestSender();
+ final RaftClientRpc sender2 = client2.getClientRpc();
sender2.sendRequest(new SetConfigurationRequest(
client2.getId(), leaderId, DEFAULT_SEQNUM, peersInRequest2));
} catch (ReconfigurationInProgressException e) {
@@ -534,7 +534,7 @@ public abstract class RaftReconfigurationBaseTest {
new Thread(() -> {
try(final RaftClient client = cluster.createClient(leaderId)) {
LOG.info("client starts to change conf");
- final RaftClientRequestSender sender = client.getRequestSender();
+ final RaftClientRpc sender = client.getClientRpc();
RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest(
client.getId(), leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf));
if (reply.isNotLeader()) {
@@ -595,7 +595,7 @@ public abstract class RaftReconfigurationBaseTest {
AtomicBoolean success = new AtomicBoolean(false);
new Thread(() -> {
final RaftClient client = cluster.createClient(leaderId);
- final RaftClientRequestSender sender = client.getRequestSender();
+ final RaftClientRpc sender = client.getClientRpc();
final RaftClientRequest request = new RaftClientRequest(client.getId(),
leaderId, 0, new SimpleMessage("test"));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 11c4c0a..e33d64f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -49,7 +49,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
};
private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
- private SimulatedClientRequestReply client2serverRequestReply;
+ private SimulatedClientRpc client2serverRequestReply;
private MiniRaftClusterWithSimulatedRpc(String[] ids,
RaftProperties properties, boolean formatted) {
@@ -65,7 +65,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = "
+ simulateLatencyMs);
serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs);
- client2serverRequestReply = new SimulatedClientRequestReply(simulateLatencyMs);
+ client2serverRequestReply = new SimulatedClientRpc(simulateLatencyMs);
getServers().stream().forEach(s -> initRpc(s));
addPeersToRpc(toRaftPeers(getServers()));
((SimulatedRpc.Factory)clientFactory).initRpc(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
deleted file mode 100644
index 65fe7ad..0000000
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.simulation;
-
-import org.apache.ratis.client.RaftClientRequestSender;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-
-class SimulatedClientRequestReply
- extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
- implements RaftClientRequestSender {
- SimulatedClientRequestReply(int simulateLatencyMs) {
- super(simulateLatencyMs);
- }
-
- @Override
- public void addServers(Iterable<RaftPeer> servers) {
- // do nothing
- }
-
- @Override
- public void close() {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
new file mode 100644
index 0000000..a62ec16
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+
+class SimulatedClientRpc
+ extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
+ implements RaftClientRpc {
+ SimulatedClientRpc(int simulateLatencyMs) {
+ super(simulateLatencyMs);
+ }
+
+ @Override
+ public void addServers(Iterable<RaftPeer> servers) {
+ // do nothing
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fdbbdf98/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index 9d855c3..67193b4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -41,11 +41,11 @@ class SimulatedRpc implements RpcType {
static class Factory extends ServerFactory.BaseFactory implements ClientFactory {
private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
- private SimulatedClientRequestReply client2serverRequestReply;
+ private SimulatedClientRpc client2serverRequestReply;
public void initRpc(
SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
- SimulatedClientRequestReply client2serverRequestReply) {
+ SimulatedClientRpc client2serverRequestReply) {
this.serverRequestReply = Objects.requireNonNull(serverRequestReply);
this.client2serverRequestReply = Objects.requireNonNull(client2serverRequestReply);
}
@@ -58,7 +58,7 @@ class SimulatedRpc implements RpcType {
}
@Override
- public SimulatedClientRequestReply newRaftClientRequestSender() {
+ public SimulatedClientRpc newRaftClientRpc() {
return Objects.requireNonNull(client2serverRequestReply);
}