You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/10/03 13:01:07 UTC
[ratis] 02/03: RATIS-1674. In GrpcLogAppender, disable retry and add minWait. (#752)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 3e3fd599da110722958b490463ed21efe2dca625
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Thu Sep 29 23:14:50 2022 +0800
RATIS-1674. In GrpcLogAppender, disable retry and add minWait. (#752)
(cherry picked from commit 16ba152c7e45a0c3b3c96e3d400837408c69e921)
---
.../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 14 ++++++++++++--
.../apache/ratis/grpc/server/GrpcServerProtocolClient.java | 1 +
.../java/org/apache/ratis/server/RaftServerConfigKeys.java | 10 ++++++++++
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index e87edac5e..125dd7dfa 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import com.codahale.metrics.Timer;
@@ -73,6 +74,9 @@ public class GrpcLogAppender extends LogAppenderBase {
private final TimeDuration requestTimeoutDuration;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+ private final long waitTimeMinMs;
+ private final AtomicReference<Timestamp> lastAppendEntries;
+
private volatile StreamObservers appendLogRequestObserver;
private final boolean useSeparateHBChannel;
@@ -92,6 +96,10 @@ public class GrpcLogAppender extends LogAppenderBase {
this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
+ final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
+ this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
+ this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
+
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
@@ -173,9 +181,10 @@ public class GrpcLogAppender extends LogAppenderBase {
// For normal nodes, new entries should be sent ASAP
// however for slow followers (especially when the follower is down),
// keep sending without any wait time only ends up in high CPU load
- return 0L;
+ final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
+ return Math.max(0L, min);
}
- return Math.min(10L, getHeartbeatWaitTimeMs());
+ return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
}
private boolean isSlowFollower() {
@@ -285,6 +294,7 @@ public class GrpcLogAppender extends LogAppenderBase {
final boolean sent = Optional.ofNullable(appendLogRequestObserver)
.map(observer -> {
observer.onNext(proto);
+ lastAppendEntries.set(Timestamp.currentTime());
return true;
}).isPresent();
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6f5d06c2e..d9aefbf0d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -102,6 +102,7 @@ public class GrpcServerProtocolClient implements Closeable {
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
+ channelBuilder.disableRetry();
return channelBuilder.flowControlWindow(flowControlWindow).build();
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e24631378..f75ddae46 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -507,6 +507,16 @@ public interface RaftServerConfigKeys {
static void setInstallSnapshotEnabled(RaftProperties properties, boolean shouldInstallSnapshot) {
setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, shouldInstallSnapshot);
}
+
+ String WAIT_TIME_MIN_KEY = PREFIX + ".wait-time.min";
+ TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+ static TimeDuration waitTimeMin(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(WAIT_TIME_MIN_DEFAULT.getUnit()),
+ WAIT_TIME_MIN_KEY, WAIT_TIME_MIN_DEFAULT, getDefaultLog());
+ }
+ static void setWaitTimeMin(RaftProperties properties, TimeDuration minDuration) {
+ setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY, minDuration);
+ }
}
}