You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/04/13 01:45:55 UTC

incubator-ratis git commit: RATIS-219. Add configuration for timeout duration. Contributed by Lokesh Jain

Repository: incubator-ratis
Updated Branches:
  refs/heads/master e37ab2ee1 -> 6c97d0603


RATIS-219. Add configuration for timeout duration.  Contributed by Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/6c97d060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/6c97d060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/6c97d060

Branch: refs/heads/master
Commit: 6c97d0603ae8c12776b87b419ac5e66adafcccc5
Parents: e37ab2e
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Apr 13 09:45:14 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Apr 13 09:45:14 2018 +0800

----------------------------------------------------------------------
 .../ratis/client/RaftClientConfigKeys.java      | 23 ++++++++------
 .../ratis/client/impl/RaftClientImpl.java       |  2 +-
 .../org/apache/ratis/grpc/GrpcConfigKeys.java   |  2 +-
 .../org/apache/ratis/grpc/RaftGRpcService.java  | 10 ++++--
 .../ratis/grpc/client/AppendStreamer.java       |  2 +-
 .../apache/ratis/grpc/client/GrpcClientRpc.java |  3 +-
 .../grpc/client/RaftClientProtocolClient.java   | 33 +++++++++++++-------
 .../grpc/client/RaftClientProtocolProxy.java    |  9 +++---
 .../ratis/grpc/server/GRpcLogAppender.java      |  7 +++--
 .../grpc/server/RaftServerProtocolClient.java   | 17 +++++-----
 .../ratis/server/RaftServerConfigKeys.java      | 10 ++++++
 11 files changed, 73 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index bb76910..cc4e5ec 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -30,12 +30,21 @@ public interface RaftClientConfigKeys {
   interface Rpc {
     String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc";
 
-    String TIMEOUT_KEY = PREFIX + ".timeout";
-    TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+    String RETRY_INTERVAL_KEY = PREFIX + ".retryInterval";
+    TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+    static TimeDuration retryInterval(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
+          RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT);
+    }
 
-    static TimeDuration timeout(RaftProperties properties) {
-      return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
-          TIMEOUT_KEY, TIMEOUT_DEFAULT);
+    String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
+    TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+    static TimeDuration requestTimeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()),
+          REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT);
+    }
+    static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) {
+      setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration);
     }
   }
 
@@ -44,24 +53,20 @@ public interface RaftClientConfigKeys {
 
     String MAX_OUTSTANDING_REQUESTS_KEY = PREFIX + ".outstanding-requests.max";
     int MAX_OUTSTANDING_REQUESTS_DEFAULT = 100;
-
     static int maxOutstandingRequests(RaftProperties properties) {
       return getInt(properties::getInt, MAX_OUTSTANDING_REQUESTS_KEY,
           MAX_OUTSTANDING_REQUESTS_DEFAULT, requireMin(2));
     }
-
     static void setMaxOutstandingRequests(RaftProperties properties, int outstandingRequests) {
       setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, outstandingRequests);
     }
 
     String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads";
     int SCHEDULER_THREADS_DEFAULT = 3;
-
     static int schedulerThreads(RaftProperties properties) {
       return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
           SCHEDULER_THREADS_DEFAULT, requireMin(1));
     }
-
     static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) {
       setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index e8a897b..44df5c3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -108,7 +108,7 @@ final class RaftClientImpl implements RaftClient {
     this.groupId = group.getGroupId();
     this.leaderId = leaderId != null? leaderId
         : !peers.isEmpty()? peers.iterator().next().getId(): null;
-    this.retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
+    this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties);
 
     asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
     scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index c1cc33a..9d18f3e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -75,7 +75,7 @@ public interface GrpcConfigKeys {
     }
 
     String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval";
-    TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
+    TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.RETRY_INTERVAL_DEFAULT;
     static TimeDuration retryInterval(RaftProperties properties) {
       return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
           RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index ae7a977..d3827ef 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -34,6 +34,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +76,7 @@ public class RaftGRpcService implements RaftServerRpc {
       Collections.synchronizedMap(new HashMap<>());
   private final Supplier<RaftPeerId> idSupplier;
   private final int flowControlWindow;
+  private final TimeDuration requestTimeoutDuration;
 
   private RaftGRpcService(RaftServer server) {
     this(server,
@@ -82,17 +84,19 @@ public class RaftGRpcService implements RaftServerRpc {
         GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(),
         GrpcConfigKeys.messageSizeMax(server.getProperties()),
         RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
-        GrpcConfigKeys.flowControlWindow(server.getProperties()));
+        GrpcConfigKeys.flowControlWindow(server.getProperties()),
+        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
   }
   private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize,
       SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
-      SizeInBytes flowControlWindowSize) {
+      SizeInBytes flowControlWindowSize, TimeDuration requestTimeoutDuration) {
     if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
       throw new IllegalArgumentException("Illegal configuration: "
           + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
     }
     this.flowControlWindow = flowControlWindowSize.getSizeInt();
+    this.requestTimeoutDuration = requestTimeoutDuration;
 
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);
     idSupplier = raftServer::getId;
@@ -178,7 +182,7 @@ public class RaftGRpcService implements RaftServerRpc {
   public void addPeers(Iterable<RaftPeer> newPeers) {
     for (RaftPeer p : newPeers) {
       if (!peers.containsKey(p.getId())) {
-        peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow));
+        peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow, requestTimeoutDuration));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index ff3ed28..c1228d0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -98,7 +98,7 @@ public class AppendStreamer implements Closeable {
         Collectors.toMap(RaftPeer::getId, Function.identity()));
     proxyMap = new PeerProxyMap<>(clientId.toString(),
         raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
-            GrpcConfigKeys.flowControlWindow(prop), maxMessageSize));
+            prop));
     proxyMap.addPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 6f1142e..8c26c7f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -48,8 +48,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
   private final int maxMessageSize;
 
   public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
-    super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p,
-        GrpcConfigKeys.flowControlWindow(properties), GrpcConfigKeys.messageSizeMax(properties))));
+    super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, properties)));
     this.clientId = clientId;
     maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index e90dad4..d01bbe8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -17,7 +17,10 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.util.TimeoutScheduler;
@@ -56,7 +59,7 @@ public class RaftClientProtocolClient implements Closeable {
   private final Supplier<String> name;
   private final RaftPeer target;
   private final ManagedChannel channel;
-  private TimeDuration timeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+  private final TimeDuration requestTimeoutDuration;
   private final RaftClientProtocolServiceBlockingStub blockingStub;
   private final RaftClientProtocolServiceStub asyncStub;
   private final AdminProtocolServiceBlockingStub adminBlockingStub;
@@ -64,9 +67,11 @@ public class RaftClientProtocolClient implements Closeable {
   private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
 
   public RaftClientProtocolClient(ClientId id, RaftPeer target,
-      SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
+      RaftProperties properties) {
     this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
+    SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties);
+    SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties);
     channel = NettyChannelBuilder.forTarget(target.getAddress())
         .usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt())
         .maxMessageSize(maxMessageSize.getSizeInt())
@@ -74,6 +79,7 @@ public class RaftClientProtocolClient implements Closeable {
     blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
     adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
+    this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }
 
   String getName() {
@@ -91,20 +97,23 @@ public class RaftClientProtocolClient implements Closeable {
 
   RaftClientReplyProto reinitialize(
       ReinitializeRequestProto request) throws IOException {
-    TimeUnit unit = timeout.getUnit();
-    return blockingCall(() -> adminBlockingStub.withDeadlineAfter(timeout.toInt(unit), unit).reinitialize(request));
+    return blockingCall(() -> adminBlockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .reinitialize(request));
   }
 
   ServerInformationReplyProto serverInformation(
       ServerInformationRequestProto request) throws IOException {
-    TimeUnit unit = timeout.getUnit();
-    return adminBlockingStub.withDeadlineAfter(timeout.toInt(unit), unit).serverInformation(request);
+    return adminBlockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .serverInformation(request);
   }
 
   RaftClientReplyProto setConfiguration(
       SetConfigurationRequestProto request) throws IOException {
-    TimeUnit unit = timeout.getUnit();
-    return blockingCall(() -> blockingStub.withDeadlineAfter(timeout.toInt(unit), unit).setConfiguration(request));
+    return blockingCall(() -> blockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .setConfiguration(request));
   }
 
   private static RaftClientReplyProto blockingCall(
@@ -124,8 +133,8 @@ public class RaftClientProtocolClient implements Closeable {
 
   StreamObserver<RaftClientRequestProto> appendWithTimeout(
       StreamObserver<RaftClientReplyProto> responseHandler) {
-    TimeUnit unit = timeout.getUnit();
-    return asyncStub.withDeadlineAfter(timeout.toInt(unit), unit).append(responseHandler);
+    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .append(responseHandler);
   }
 
   AsyncStreamObservers getAppendStreamObservers() {
@@ -179,7 +188,7 @@ public class RaftClientProtocolClient implements Closeable {
           () -> getName() + ":" + getClass().getSimpleName());
       try {
         requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
-        TimeoutScheduler.onTimeout(timeout, () -> timeoutCheck(request), LOG,
+        TimeoutScheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
             () -> "Timeout check failed for client request: " + request);
       } catch(Throwable t) {
         handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
@@ -189,7 +198,7 @@ public class RaftClientProtocolClient implements Closeable {
 
     private void timeoutCheck(RaftClientRequest request) {
       handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
-          new IOException("Request timeout " + timeout + ": " + request)));
+          new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
     }
 
     private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
index 2ed70c2..ee9ce4e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
@@ -17,12 +17,12 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.util.SizeInBytes;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -33,11 +33,10 @@ public class RaftClientProtocolProxy implements Closeable {
   private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
   private RpcSession currentSession;
 
-  public RaftClientProtocolProxy(
-      ClientId clientId, RaftPeer target,
+  public RaftClientProtocolProxy(ClientId clientId, RaftPeer target,
       Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
-      SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
-    proxy = new RaftClientProtocolClient(clientId, target, flowControlWindow, maxMessageSize);
+      RaftProperties properties) {
+    proxy = new RaftClientProtocolClient(clientId, target, properties);
     this.responseHandlerCreation = responseHandlerCreation;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 64d0b23..360d020 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.RaftGRpcService;
 import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.TimeoutScheduler;
 import org.apache.ratis.server.impl.FollowerInfo;
 import org.apache.ratis.server.impl.LeaderState;
@@ -43,7 +44,6 @@ import java.util.Objects;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -58,7 +58,7 @@ public class GRpcLogAppender extends LogAppender {
 
   private final AppendLogResponseHandler appendResponseHandler;
   private final InstallSnapshotResponseHandler snapshotResponseHandler;
-  private static TimeDuration rpcTimeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+  private static TimeDuration requestTimeoutDuration;
 
   private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
   private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
@@ -71,6 +71,7 @@ public class GRpcLogAppender extends LogAppender {
     client = rpcService.getRpcClient(f.getPeer());
     maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
         server.getProxy().getProperties());
+    requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
     pendingRequests = new ConcurrentHashMap<>();
 
     appendResponseHandler = new AppendLogResponseHandler();
@@ -160,7 +161,7 @@ public class GRpcLogAppender extends LogAppender {
         server.getId(), null, request);
 
     s.onNext(request);
-    TimeoutScheduler.onTimeout(rpcTimeout, () -> timeoutAppendRequest(request), LOG,
+    TimeoutScheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG,
         () -> "Timeout check failed for append entry request: " + request);
     follower.updateLastRpcSendTime();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
index 42a2b85..034f06c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
@@ -27,24 +27,24 @@ import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServ
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.util.TimeDuration;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * This is a RaftClient implementation that supports streaming data to the raft
  * ring. The stream implementation utilizes gRPC.
  */
 public class RaftServerProtocolClient {
   private final ManagedChannel channel;
-  private TimeDuration timeout = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+  private final TimeDuration requestTimeoutDuration;
   private final RaftServerProtocolServiceBlockingStub blockingStub;
   private final RaftServerProtocolServiceStub asyncStub;
 
-  public RaftServerProtocolClient(RaftPeer target, int flowControlWindow) {
+  public RaftServerProtocolClient(RaftPeer target, int flowControlWindow,
+      TimeDuration requestTimeoutDuration) {
     channel = NettyChannelBuilder.forTarget(target.getAddress())
         .usePlaintext(true).flowControlWindow(flowControlWindow)
         .build();
     blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
+    this.requestTimeoutDuration = requestTimeoutDuration;
   }
 
   public void shutdown() {
@@ -53,8 +53,9 @@ public class RaftServerProtocolClient {
 
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
     // the StatusRuntimeException will be handled by the caller
-    TimeUnit unit = timeout.getUnit();
-    RequestVoteReplyProto r= blockingStub.withDeadlineAfter(timeout.toInt(unit), unit).requestVote(request);
+    RequestVoteReplyProto r =
+        blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+            .requestVote(request);
     return r;
   }
 
@@ -65,7 +66,7 @@ public class RaftServerProtocolClient {
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
       StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    TimeUnit unit = timeout.getUnit();
-    return asyncStub.withDeadlineAfter(timeout.toInt(unit), unit).installSnapshot(responseHandler);
+    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .installSnapshot(responseHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6c97d060/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index cf40580..a18f9f1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -204,6 +204,16 @@ public interface RaftServerConfigKeys {
       setTimeDuration(properties::setTimeDuration, TIMEOUT_MAX_KEY, maxDuration);
     }
 
+    String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
+    TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+    static TimeDuration requestTimeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()),
+          REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT);
+    }
+    static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) {
+      setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration);
+    }
+
     String SLEEP_TIME_KEY = PREFIX + ".sleep.time";
     TimeDuration SLEEP_TIME_DEFAULT = TimeDuration.valueOf(25, TimeUnit.MILLISECONDS);
     static TimeDuration sleepTime(RaftProperties properties) {