You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/10/03 13:01:07 UTC

[ratis] 02/03: RATIS-1674. In GrpcLogAppender, disable retry and add minWait. (#752)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 3e3fd599da110722958b490463ed21efe2dca625
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Thu Sep 29 23:14:50 2022 +0800

    RATIS-1674. In GrpcLogAppender, disable retry and add minWait. (#752)
    
    (cherry picked from commit 16ba152c7e45a0c3b3c96e3d400837408c69e921)
---
 .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 14 ++++++++++++--
 .../apache/ratis/grpc/server/GrpcServerProtocolClient.java |  1 +
 .../java/org/apache/ratis/server/RaftServerConfigKeys.java | 10 ++++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index e87edac5e..125dd7dfa 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.codahale.metrics.Timer;
 
@@ -73,6 +74,9 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final TimeDuration requestTimeoutDuration;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
+  private final long waitTimeMinMs;
+  private final AtomicReference<Timestamp> lastAppendEntries;
+
   private volatile StreamObservers appendLogRequestObserver;
   private final boolean useSeparateHBChannel;
 
@@ -92,6 +96,10 @@ public class GrpcLogAppender extends LogAppenderBase {
     this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
 
+    final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
+    this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
+    this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
+
     grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
     grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
 
@@ -173,9 +181,10 @@ public class GrpcLogAppender extends LogAppenderBase {
       // For normal nodes, new entries should be sent ASAP
       // however for slow followers (especially when the follower is down),
       // keep sending without any wait time only ends up in high CPU load
-      return 0L;
+      final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
+      return Math.max(0L, min);
     }
-    return Math.min(10L, getHeartbeatWaitTimeMs());
+    return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
   }
 
   private boolean isSlowFollower() {
@@ -285,6 +294,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     final boolean sent = Optional.ofNullable(appendLogRequestObserver)
         .map(observer -> {
           observer.onNext(proto);
+          lastAppendEntries.set(Timestamp.currentTime());
           return true;
         }).isPresent();
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6f5d06c2e..d9aefbf0d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -102,6 +102,7 @@ public class GrpcServerProtocolClient implements Closeable {
     } else {
       channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
     }
+    channelBuilder.disableRetry();
     return channelBuilder.flowControlWindow(flowControlWindow).build();
   }
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e24631378..f75ddae46 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -507,6 +507,16 @@ public interface RaftServerConfigKeys {
       static void setInstallSnapshotEnabled(RaftProperties properties, boolean shouldInstallSnapshot) {
         setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, shouldInstallSnapshot);
       }
+
+      String WAIT_TIME_MIN_KEY = PREFIX + ".wait-time.min";
+      TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+      static TimeDuration waitTimeMin(RaftProperties properties) {
+        return getTimeDuration(properties.getTimeDuration(WAIT_TIME_MIN_DEFAULT.getUnit()),
+            WAIT_TIME_MIN_KEY, WAIT_TIME_MIN_DEFAULT, getDefaultLog());
+      }
+      static void setWaitTimeMin(RaftProperties properties, TimeDuration minDuration) {
+        setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY, minDuration);
+      }
     }
   }