You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/04/13 01:45:55 UTC
incubator-ratis git commit: RATIS-219. Add configuration for timeout
duration. Contributed by Lokesh Jain
Repository: incubator-ratis
Updated Branches:
refs/heads/master e37ab2ee1 -> 6c97d0603
RATIS-219. Add configuration for timeout duration. Contributed by Lokesh Jain
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/6c97d060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/6c97d060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/6c97d060
Branch: refs/heads/master
Commit: 6c97d0603ae8c12776b87b419ac5e66adafcccc5
Parents: e37ab2e
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Apr 13 09:45:14 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Apr 13 09:45:14 2018 +0800
----------------------------------------------------------------------
.../ratis/client/RaftClientConfigKeys.java | 23 ++++++++------
.../ratis/client/impl/RaftClientImpl.java | 2 +-
.../org/apache/ratis/grpc/GrpcConfigKeys.java | 2 +-
.../org/apache/ratis/grpc/RaftGRpcService.java | 10 ++++--
.../ratis/grpc/client/AppendStreamer.java | 2 +-
.../apache/ratis/grpc/client/GrpcClientRpc.java | 3 +-
.../grpc/client/RaftClientProtocolClient.java | 33 +++++++++++++-------
.../grpc/client/RaftClientProtocolProxy.java | 9 +++---
.../ratis/grpc/server/GRpcLogAppender.java | 7 +++--
.../grpc/server/RaftServerProtocolClient.java | 17 +++++-----
.../ratis/server/RaftServerConfigKeys.java | 10 ++++++
11 files changed, 73 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index bb76910..cc4e5ec 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -30,12 +30,21 @@ public interface RaftClientConfigKeys {
interface Rpc {
String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc";
- String TIMEOUT_KEY = PREFIX + ".timeout";
- TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+ String RETRY_INTERVAL_KEY = PREFIX + ".retryInterval";
+ TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+ static TimeDuration retryInterval(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
+ RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT);
+ }
- static TimeDuration timeout(RaftProperties properties) {
- return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
- TIMEOUT_KEY, TIMEOUT_DEFAULT);
+ String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
+ TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+ static TimeDuration requestTimeout(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()),
+ REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT);
+ }
+ static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) {
+ setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration);
}
}
@@ -44,24 +53,20 @@ public interface RaftClientConfigKeys {
String MAX_OUTSTANDING_REQUESTS_KEY = PREFIX + ".outstanding-requests.max";
int MAX_OUTSTANDING_REQUESTS_DEFAULT = 100;
-
static int maxOutstandingRequests(RaftProperties properties) {
return getInt(properties::getInt, MAX_OUTSTANDING_REQUESTS_KEY,
MAX_OUTSTANDING_REQUESTS_DEFAULT, requireMin(2));
}
-
static void setMaxOutstandingRequests(RaftProperties properties, int outstandingRequests) {
setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, outstandingRequests);
}
String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads";
int SCHEDULER_THREADS_DEFAULT = 3;
-
static int schedulerThreads(RaftProperties properties) {
return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
SCHEDULER_THREADS_DEFAULT, requireMin(1));
}
-
static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) {
setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index e8a897b..44df5c3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -108,7 +108,7 @@ final class RaftClientImpl implements RaftClient {
this.groupId = group.getGroupId();
this.leaderId = leaderId != null? leaderId
: !peers.isEmpty()? peers.iterator().next().getId(): null;
- this.retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
+ this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties);
asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index c1cc33a..9d18f3e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -75,7 +75,7 @@ public interface GrpcConfigKeys {
}
String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval";
- TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
+ TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.RETRY_INTERVAL_DEFAULT;
static TimeDuration retryInterval(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index ae7a977..d3827ef 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -34,6 +34,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +76,7 @@ public class RaftGRpcService implements RaftServerRpc {
Collections.synchronizedMap(new HashMap<>());
private final Supplier<RaftPeerId> idSupplier;
private final int flowControlWindow;
+ private final TimeDuration requestTimeoutDuration;
private RaftGRpcService(RaftServer server) {
this(server,
@@ -82,17 +84,19 @@ public class RaftGRpcService implements RaftServerRpc {
GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(),
GrpcConfigKeys.messageSizeMax(server.getProperties()),
RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
- GrpcConfigKeys.flowControlWindow(server.getProperties()));
+ GrpcConfigKeys.flowControlWindow(server.getProperties()),
+ RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
}
private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
- SizeInBytes flowControlWindowSize) {
+ SizeInBytes flowControlWindowSize, TimeDuration requestTimeoutDuration) {
if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
throw new IllegalArgumentException("Illegal configuration: "
+ RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+ " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
}
this.flowControlWindow = flowControlWindowSize.getSizeInt();
+ this.requestTimeoutDuration = requestTimeoutDuration;
ServerBuilder serverBuilder = ServerBuilder.forPort(port);
idSupplier = raftServer::getId;
@@ -178,7 +182,7 @@ public class RaftGRpcService implements RaftServerRpc {
public void addPeers(Iterable<RaftPeer> newPeers) {
for (RaftPeer p : newPeers) {
if (!peers.containsKey(p.getId())) {
- peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow));
+ peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow, requestTimeoutDuration));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index ff3ed28..c1228d0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -98,7 +98,7 @@ public class AppendStreamer implements Closeable {
Collectors.toMap(RaftPeer::getId, Function.identity()));
proxyMap = new PeerProxyMap<>(clientId.toString(),
raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
- GrpcConfigKeys.flowControlWindow(prop), maxMessageSize));
+ prop));
proxyMap.addPeers(group.getPeers());
refreshLeaderProxy(leaderId, null);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 6f1142e..8c26c7f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -48,8 +48,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
private final int maxMessageSize;
public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
- super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p,
- GrpcConfigKeys.flowControlWindow(properties), GrpcConfigKeys.messageSizeMax(properties))));
+ super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, properties)));
this.clientId = clientId;
maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index e90dad4..d01bbe8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -17,7 +17,10 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.*;
import org.apache.ratis.util.TimeoutScheduler;
@@ -56,7 +59,7 @@ public class RaftClientProtocolClient implements Closeable {
private final Supplier<String> name;
private final RaftPeer target;
private final ManagedChannel channel;
- private TimeDuration timeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+ private final TimeDuration requestTimeoutDuration;
private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
@@ -64,9 +67,11 @@ public class RaftClientProtocolClient implements Closeable {
private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
public RaftClientProtocolClient(ClientId id, RaftPeer target,
- SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
+ RaftProperties properties) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
+ SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties);
+ SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties);
channel = NettyChannelBuilder.forTarget(target.getAddress())
.usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt())
.maxMessageSize(maxMessageSize.getSizeInt())
@@ -74,6 +79,7 @@ public class RaftClientProtocolClient implements Closeable {
blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
+ this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
}
String getName() {
@@ -91,20 +97,23 @@ public class RaftClientProtocolClient implements Closeable {
RaftClientReplyProto reinitialize(
ReinitializeRequestProto request) throws IOException {
- TimeUnit unit = timeout.getUnit();
- return blockingCall(() -> adminBlockingStub.withDeadlineAfter(timeout.toInt(unit), unit).reinitialize(request));
+ return blockingCall(() -> adminBlockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .reinitialize(request));
}
ServerInformationReplyProto serverInformation(
ServerInformationRequestProto request) throws IOException {
- TimeUnit unit = timeout.getUnit();
- return adminBlockingStub.withDeadlineAfter(timeout.toInt(unit), unit).serverInformation(request);
+ return adminBlockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .serverInformation(request);
}
RaftClientReplyProto setConfiguration(
SetConfigurationRequestProto request) throws IOException {
- TimeUnit unit = timeout.getUnit();
- return blockingCall(() -> blockingStub.withDeadlineAfter(timeout.toInt(unit), unit).setConfiguration(request));
+ return blockingCall(() -> blockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .setConfiguration(request));
}
private static RaftClientReplyProto blockingCall(
@@ -124,8 +133,8 @@ public class RaftClientProtocolClient implements Closeable {
StreamObserver<RaftClientRequestProto> appendWithTimeout(
StreamObserver<RaftClientReplyProto> responseHandler) {
- TimeUnit unit = timeout.getUnit();
- return asyncStub.withDeadlineAfter(timeout.toInt(unit), unit).append(responseHandler);
+ return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .append(responseHandler);
}
AsyncStreamObservers getAppendStreamObservers() {
@@ -179,7 +188,7 @@ public class RaftClientProtocolClient implements Closeable {
() -> getName() + ":" + getClass().getSimpleName());
try {
requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
- TimeoutScheduler.onTimeout(timeout, () -> timeoutCheck(request), LOG,
+ TimeoutScheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
() -> "Timeout check failed for client request: " + request);
} catch(Throwable t) {
handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
@@ -189,7 +198,7 @@ public class RaftClientProtocolClient implements Closeable {
private void timeoutCheck(RaftClientRequest request) {
handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
- new IOException("Request timeout " + timeout + ": " + request)));
+ new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
}
private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
index 2ed70c2..ee9ce4e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
@@ -17,12 +17,12 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.util.SizeInBytes;
import java.io.Closeable;
import java.io.IOException;
@@ -33,11 +33,10 @@ public class RaftClientProtocolProxy implements Closeable {
private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
private RpcSession currentSession;
- public RaftClientProtocolProxy(
- ClientId clientId, RaftPeer target,
+ public RaftClientProtocolProxy(ClientId clientId, RaftPeer target,
Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
- SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
- proxy = new RaftClientProtocolClient(clientId, target, flowControlWindow, maxMessageSize);
+ RaftProperties properties) {
+ proxy = new RaftClientProtocolClient(clientId, target, properties);
this.responseHandlerCreation = responseHandlerCreation;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 64d0b23..360d020 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.RaftGRpcService;
import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.server.impl.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
@@ -43,7 +44,6 @@ import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -58,7 +58,7 @@ public class GRpcLogAppender extends LogAppender {
private final AppendLogResponseHandler appendResponseHandler;
private final InstallSnapshotResponseHandler snapshotResponseHandler;
- private static TimeDuration rpcTimeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+ private final TimeDuration requestTimeoutDuration; // per-appender: set once in the constructor from this server's properties
private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
@@ -71,6 +71,7 @@ public class GRpcLogAppender extends LogAppender {
client = rpcService.getRpcClient(f.getPeer());
maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
server.getProxy().getProperties());
+ requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
pendingRequests = new ConcurrentHashMap<>();
appendResponseHandler = new AppendLogResponseHandler();
@@ -160,7 +161,7 @@ public class GRpcLogAppender extends LogAppender {
server.getId(), null, request);
s.onNext(request);
- TimeoutScheduler.onTimeout(rpcTimeout, () -> timeoutAppendRequest(request), LOG,
+ TimeoutScheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG,
() -> "Timeout check failed for append entry request: " + request);
follower.updateLastRpcSendTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
index 42a2b85..034f06c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
@@ -27,24 +27,24 @@ import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServ
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.util.TimeDuration;
-import java.util.concurrent.TimeUnit;
-
/**
* This is a RaftClient implementation that supports streaming data to the raft
* ring. The stream implementation utilizes gRPC.
*/
public class RaftServerProtocolClient {
private final ManagedChannel channel;
- private TimeDuration timeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+ private final TimeDuration requestTimeoutDuration;
private final RaftServerProtocolServiceBlockingStub blockingStub;
private final RaftServerProtocolServiceStub asyncStub;
- public RaftServerProtocolClient(RaftPeer target, int flowControlWindow) {
+ public RaftServerProtocolClient(RaftPeer target, int flowControlWindow,
+ TimeDuration requestTimeoutDuration) {
channel = NettyChannelBuilder.forTarget(target.getAddress())
.usePlaintext(true).flowControlWindow(flowControlWindow)
.build();
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
+ this.requestTimeoutDuration = requestTimeoutDuration;
}
public void shutdown() {
@@ -53,8 +53,9 @@ public class RaftServerProtocolClient {
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
// the StatusRuntimeException will be handled by the caller
- TimeUnit unit = timeout.getUnit();
- RequestVoteReplyProto r= blockingStub.withDeadlineAfter(timeout.toInt(unit), unit).requestVote(request);
+ RequestVoteReplyProto r =
+ blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .requestVote(request);
return r;
}
@@ -65,7 +66,7 @@ public class RaftServerProtocolClient {
StreamObserver<InstallSnapshotRequestProto> installSnapshot(
StreamObserver<InstallSnapshotReplyProto> responseHandler) {
- TimeUnit unit = timeout.getUnit();
- return asyncStub.withDeadlineAfter(timeout.toInt(unit), unit).installSnapshot(responseHandler);
+ return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .installSnapshot(responseHandler);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index cf40580..a18f9f1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -204,6 +204,16 @@ public interface RaftServerConfigKeys {
setTimeDuration(properties::setTimeDuration, TIMEOUT_MAX_KEY, maxDuration);
}
+ String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
+ TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+ static TimeDuration requestTimeout(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()),
+ REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT);
+ }
+ static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) {
+ setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration);
+ }
+
String SLEEP_TIME_KEY = PREFIX + ".sleep.time";
TimeDuration SLEEP_TIME_DEFAULT = TimeDuration.valueOf(25, TimeUnit.MILLISECONDS);
static TimeDuration sleepTime(RaftProperties properties) {