You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/15 08:47:41 UTC

[ratis] branch master updated: RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new cf62b4298 RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
cf62b4298 is described below

commit cf62b4298ad778fbe1dff8478899883aa91a2234
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Mar 15 16:47:34 2023 +0800

    RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 38 +++++++++++++++++++++-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 10 ++++--
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index febf54172..48b57fa63 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -19,13 +19,27 @@ package org.apache.ratis.grpc;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.function.Consumer;
 
-import static org.apache.ratis.conf.ConfUtils.*;
+import static org.apache.ratis.conf.ConfUtils.get;
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
+import static org.apache.ratis.conf.ConfUtils.getInt;
+import static org.apache.ratis.conf.ConfUtils.getSizeInBytes;
+import static org.apache.ratis.conf.ConfUtils.getTimeDuration;
+import static org.apache.ratis.conf.ConfUtils.printAll;
+import static org.apache.ratis.conf.ConfUtils.requireMax;
+import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.set;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
+import static org.apache.ratis.conf.ConfUtils.setInt;
+import static org.apache.ratis.conf.ConfUtils.setSizeInBytes;
+import static org.apache.ratis.conf.ConfUtils.setTimeDuration;
 
 public interface GrpcConfigKeys {
   Logger LOG = LoggerFactory.getLogger(GrpcConfigKeys.class);
@@ -234,6 +248,28 @@ public interface GrpcConfigKeys {
       setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY, maxAppend);
     }
 
+    String INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY = PREFIX + ".install_snapshot.request.element-limit";
+    int INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT = 8;
+    static int installSnapshotRequestElementLimit(RaftProperties properties) {
+      return getInt(properties::getInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY,
+          INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0));
+    }
+    static void setInstallSnapshotRequestElementLimit(RaftProperties properties, int elementLimit) {
+      setInt(properties::setInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY, elementLimit);
+    }
+
+    String INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY = PREFIX + ".install_snapshot.request.timeout";
+    TimeDuration INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT = RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_DEFAULT;
+    static TimeDuration installSnapshotRequestTimeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT.getUnit()),
+          INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT, getDefaultLog());
+    }
+    static void setInstallSnapshotRequestTimeout(RaftProperties properties,
+                                                 TimeDuration installSnapshotRequestTimeout) {
+      setTimeDuration(properties::setTimeDuration,
+          INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, installSnapshotRequestTimeout);
+    }
+
     String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel";
     boolean HEARTBEAT_CHANNEL_DEFAULT = true;
     static boolean heartbeatChannel(RaftProperties properties) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 8fe285a2c..1de7a3f22 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -69,6 +69,8 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final boolean installSnapshotEnabled;
 
   private final TimeDuration requestTimeoutDuration;
+  private final TimeDuration installSnapshotStreamTimeout;
+  private final int maxOutstandingInstallSnapshots;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private volatile StreamObservers appendLogRequestObserver;
@@ -87,6 +89,9 @@ public class GrpcLogAppender extends LogAppenderBase {
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
     this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
+    this.maxOutstandingInstallSnapshots = GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties);
+    this.installSnapshotStreamTimeout = GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties)
+        .multiply(maxOutstandingInstallSnapshots);
     this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
 
@@ -594,8 +599,9 @@ public class GrpcLogAppender extends LogAppenderBase {
     StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
     final String requestId = UUID.randomUUID().toString();
     try {
-      snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
-          requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809
+      snapshotRequestObserver = getClient().installSnapshot(
+          getFollower().getName() + "-installSnapshot-" + requestId,
+          installSnapshotStreamTimeout, maxOutstandingInstallSnapshots, responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);