You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/15 08:47:41 UTC
[ratis] branch master updated: RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new cf62b4298 RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
cf62b4298 is described below
commit cf62b4298ad778fbe1dff8478899883aa91a2234
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Mar 15 16:47:34 2023 +0800
RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
---
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 38 +++++++++++++++++++++-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 10 ++++--
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index febf54172..48b57fa63 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -19,13 +19,27 @@ package org.apache.ratis.grpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Consumer;
-import static org.apache.ratis.conf.ConfUtils.*;
+import static org.apache.ratis.conf.ConfUtils.get;
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
+import static org.apache.ratis.conf.ConfUtils.getInt;
+import static org.apache.ratis.conf.ConfUtils.getSizeInBytes;
+import static org.apache.ratis.conf.ConfUtils.getTimeDuration;
+import static org.apache.ratis.conf.ConfUtils.printAll;
+import static org.apache.ratis.conf.ConfUtils.requireMax;
+import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.set;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
+import static org.apache.ratis.conf.ConfUtils.setInt;
+import static org.apache.ratis.conf.ConfUtils.setSizeInBytes;
+import static org.apache.ratis.conf.ConfUtils.setTimeDuration;
public interface GrpcConfigKeys {
Logger LOG = LoggerFactory.getLogger(GrpcConfigKeys.class);
@@ -234,6 +248,28 @@ public interface GrpcConfigKeys {
setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY, maxAppend);
}
+ String INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY = PREFIX + ".install_snapshot.request.element-limit";
+ int INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT = 8;
+ static int installSnapshotRequestElementLimit(RaftProperties properties) {
+ return getInt(properties::getInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY,
+ INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0));
+ }
+ static void setInstallSnapshotRequestElementLimit(RaftProperties properties, int elementLimit) {
+ setInt(properties::setInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY, elementLimit);
+ }
+
+ String INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY = PREFIX + ".install_snapshot.request.timeout";
+ TimeDuration INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT = RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_DEFAULT;
+ static TimeDuration installSnapshotRequestTimeout(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT.getUnit()),
+ INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT, getDefaultLog());
+ }
+ static void setInstallSnapshotRequestTimeout(RaftProperties properties,
+ TimeDuration installSnapshotRequestTimeout) {
+ setTimeDuration(properties::setTimeDuration,
+ INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, installSnapshotRequestTimeout);
+ }
+
String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel";
boolean HEARTBEAT_CHANNEL_DEFAULT = true;
static boolean heartbeatChannel(RaftProperties properties) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 8fe285a2c..1de7a3f22 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -69,6 +69,8 @@ public class GrpcLogAppender extends LogAppenderBase {
private final boolean installSnapshotEnabled;
private final TimeDuration requestTimeoutDuration;
+ private final TimeDuration installSnapshotStreamTimeout;
+ private final int maxOutstandingInstallSnapshots;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private volatile StreamObservers appendLogRequestObserver;
@@ -87,6 +89,9 @@ public class GrpcLogAppender extends LogAppenderBase {
final RaftProperties properties = server.getRaftServer().getProperties();
this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
+ this.maxOutstandingInstallSnapshots = GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties);
+ this.installSnapshotStreamTimeout = GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties)
+ .multiply(maxOutstandingInstallSnapshots);
this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
@@ -594,8 +599,9 @@ public class GrpcLogAppender extends LogAppenderBase {
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
final String requestId = UUID.randomUUID().toString();
try {
- snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
- requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809
+ snapshotRequestObserver = getClient().installSnapshot(
+ getFollower().getName() + "-installSnapshot-" + requestId,
+ installSnapshotStreamTimeout, maxOutstandingInstallSnapshots, responseHandler);
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
if (isRunning()) {
snapshotRequestObserver.onNext(request);