You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original archive page) is not preserved in this copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/01 01:22:32 UTC

[ratis] 16/16: RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8247d3c6b637d6155c76a03dbb8bd9504cacbdb3
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Feb 28 05:00:02 2023 -0800

    RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)
    
    (cherry picked from commit fdd7c5c98beaeed6aa33261752554575282e1a43)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 30 +++++++++++-----------
 .../apache/ratis/server/leader/FollowerInfo.java   |  3 +++
 .../apache/ratis/server/leader/LogAppender.java    |  4 +--
 .../apache/ratis/server/raftlog/RaftLogIndex.java  | 30 ++++++++++++++++------
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 14 +++++++---
 .../ratis/server/leader/LogAppenderBase.java       | 11 ++++++--
 6 files changed, 62 insertions(+), 30 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7c742d91d..b76da8ecb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import com.codahale.metrics.Timer;
 
@@ -74,9 +73,6 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final TimeDuration requestTimeoutDuration;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
-  private final long waitTimeMinMs;
-  private final AtomicReference<Timestamp> lastAppendEntries;
-
   private volatile StreamObservers appendLogRequestObserver;
   private final boolean useSeparateHBChannel;
 
@@ -96,10 +92,6 @@ public class GrpcLogAppender extends LogAppenderBase {
     this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
 
-    final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
-    this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
-    this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
-
     grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
     grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
 
@@ -182,10 +174,9 @@ public class GrpcLogAppender extends LogAppenderBase {
       // For normal nodes, new entries should be sent ASAP
       // however for slow followers (especially when the follower is down),
       // keep sending without any wait time only ends up in high CPU load
-      final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
-      return Math.max(0L, min);
+      return Math.max(getMinWaitTimeMs(), 0L);
     }
-    return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
+    return Math.min(getMinWaitTimeMs(), getHeartbeatWaitTimeMs());
   }
 
   private boolean isSlowFollower() {
@@ -263,13 +254,13 @@ public class GrpcLogAppender extends LogAppenderBase {
     return CALL_ID_COMPARATOR;
   }
 
-  private void appendLog(boolean excludeLogEntries) throws IOException {
+  private void appendLog(boolean heartbeat) throws IOException {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       // Prepare and send the append request.
       // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
-      pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries);
+      pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
       if (pending == null) {
         return;
       }
@@ -282,6 +273,16 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
     }
 
+    final long waitMs = getMinWaitTimeMs();
+    if (waitMs > 0) {
+      try {
+        Thread.sleep(waitMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw IOUtils.toInterruptedIOException(
+            "Interrupted appendLog, heartbeat? " + heartbeat, e);
+      }
+    }
     if (isRunning()) {
       sendRequest(request, pending);
     }
@@ -295,15 +296,14 @@ public class GrpcLogAppender extends LogAppenderBase {
     final boolean sent = Optional.ofNullable(appendLogRequestObserver)
         .map(observer -> {
           observer.onNext(proto);
-          lastAppendEntries.set(Timestamp.currentTime());
           return true;
         }).isPresent();
 
     if (sent) {
+      getFollower().updateLastRpcSendTime(request.isHeartbeat());
       scheduler.onTimeout(requestTimeoutDuration,
           () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
           LOG, () -> "Timeout check failed for append entry request: " + request);
-      getFollower().updateLastRpcSendTime(request.isHeartbeat());
     } else {
       request.stopRequestTimer();
     }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index b4ae8458c..fb63068a5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -78,6 +78,9 @@ public interface FollowerInfo {
   /** @return the lastRpcResponseTime . */
   Timestamp getLastRpcResponseTime();
 
+  /** @return the lastRpcSendTime . */
+  Timestamp getLastRpcSendTime();
+
   /** Update lastRpcResponseTime to the current time. */
   void updateLastRpcResponseTime();
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index f0ff28690..49a1a12fa 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -166,8 +166,8 @@ public interface LogAppender {
     return getFollower().getNextIndex() < getRaftLog().getNextIndex();
   }
 
-  /** send a heartbeat AppendEntries immediately */
-  void triggerHeartbeat() throws IOException;
+  /** Trigger to send a heartbeat AppendEntries. */
+  void triggerHeartbeat();
 
   /** @return the wait time in milliseconds to send the next heartbeat. */
   default long getHeartbeatWaitTimeMs() {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index ed545e291..290a58835 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -44,30 +44,44 @@ public class RaftLogIndex {
 
   public boolean setUnconditionally(long newIndex, Consumer<Object> log) {
     final long old = index.getAndSet(newIndex);
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": setUnconditionally " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old != newIndex;
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": setUnconditionally " + old + " -> " + newIndex));
+    }
+    return updated;
   }
 
   public boolean updateUnconditionally(LongUnaryOperator update, Consumer<Object> log) {
     final long old = index.getAndUpdate(update);
     final long newIndex = update.applyAsLong(old);
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateUnconditionally " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old != newIndex;
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": updateUnconditionally " + old + " -> " + newIndex));
+    }
+    return updated;
   }
 
   public boolean updateIncreasingly(long newIndex, Consumer<Object> log) {
     final long old = index.getAndSet(newIndex);
     Preconditions.assertTrue(old <= newIndex,
         () -> "Failed to updateIncreasingly for " + name + ": " + old + " -> " + newIndex);
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateIncreasingly " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old != newIndex;
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": updateIncreasingly " + old + " -> " + newIndex));
+    }
+    return updated;
   }
 
   public boolean updateToMax(long newIndex, Consumer<Object> log) {
     final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex));
     final boolean updated = old < newIndex;
-    log.accept(StringUtils.stringSupplierAsObject(
-        () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+    }
     return updated;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0d7fe2075..67af642fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -93,17 +93,20 @@ class FollowerInfoImpl implements FollowerInfo {
 
   @Override
   public void decreaseNextIndex(long newNextIndex) {
-    nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange);
+    nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex),
+        message -> infoIndexChange.accept("decreaseNextIndex " + message));
   }
 
   @Override
   public void setNextIndex(long newNextIndex) {
-    nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, infoIndexChange);
+    nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old,
+        message -> infoIndexChange.accept("setNextIndex " + message));
   }
 
   @Override
   public void updateNextIndex(long newNextIndex) {
-    nextIndex.updateToMax(newNextIndex, infoIndexChange);
+    nextIndex.updateToMax(newNextIndex,
+        message -> infoIndexChange.accept("decreaseNextIndex " + message));
   }
 
   @Override
@@ -160,6 +163,11 @@ class FollowerInfoImpl implements FollowerInfo {
     return lastRpcResponseTime.get();
   }
 
+  @Override
+  public Timestamp getLastRpcSendTime() {
+    return lastRpcSendTime.get();
+  }
+
   @Override
   public void updateLastRpcSendTime(boolean isHeartbeat) {
     final Timestamp currentTime = Timestamp.currentTime();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index bc8a31181..1c0f61836 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -34,10 +34,10 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -56,6 +56,7 @@ public abstract class LogAppenderBase implements LogAppender {
   private final AwaitForSignal eventAwaitForSignal;
 
   private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+  private final long waitTimeMinMs;
 
   protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
@@ -71,10 +72,12 @@ public abstract class LogAppenderBase implements LogAppender {
     this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
     this.daemon = new LogAppenderDaemon(this);
     this.eventAwaitForSignal = new AwaitForSignal(name);
+
+    this.waitTimeMinMs = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
   }
 
   @Override
-  public void triggerHeartbeat() throws IOException {
+  public void triggerHeartbeat() {
     if (heartbeatTrigger.compareAndSet(false, true)) {
       notifyLogAppender();
     }
@@ -133,6 +136,10 @@ public abstract class LogAppenderBase implements LogAppender {
     getLeaderState().restart(this);
   }
 
+  public long getMinWaitTimeMs() {
+    return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
+  }
+
   @Override
   public final FollowerInfo getFollower() {
     return follower;