You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/26 16:58:26 UTC

[ratis] branch master updated: RATIS-1671. Add manual trigger snapshot (#712)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf9541af RATIS-1671. Add manual trigger snapshot (#712)
eaf9541af is described below

commit eaf9541af05af06e9e0077572c8544c4485b1503
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Sat Aug 27 00:58:21 2022 +0800

    RATIS-1671. Add manual trigger snapshot (#712)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  9 +++++---
 .../apache/ratis/server/leader/LogAppender.java    |  3 +++
 .../ratis/server/leader/LogAppenderBase.java       | 26 ++++++++++++++++++++++
 .../ratis/server/leader/LogAppenderDefault.java    |  1 +
 4 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 3e33a1787..930e4184c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -281,9 +281,12 @@ public class GrpcLogAppender extends LogAppenderBase {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
         getServer().getId(), null, proto);
     request.startRequestTimer();
-    boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer -> {
-        observer.onNext(proto);
-        return true;}).isPresent();
+    resetHeartbeatTrigger();
+    final boolean sent = Optional.ofNullable(appendLogRequestObserver)
+        .map(observer -> {
+          observer.onNext(proto);
+          return true;
+        }).isPresent();
 
     if (sent) {
       scheduler.onTimeout(requestTimeoutDuration,
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index ef5e1a7ed..f0ff28690 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -166,6 +166,9 @@ public interface LogAppender {
     return getFollower().getNextIndex() < getRaftLog().getNextIndex();
   }
 
+  /** Send a heartbeat AppendEntries immediately. */
+  void triggerHeartbeat() throws IOException;
+
   /** @return the wait time in milliseconds to send the next heartbeat. */
   default long getHeartbeatWaitTimeMs() {
     final int min = getServer().properties().minRpcTimeoutMs();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 50f9887fd..fda78fbcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -34,9 +34,11 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * An abstract implementation of {@link LogAppender}.
@@ -53,6 +55,8 @@ public abstract class LogAppenderBase implements LogAppender {
   private final LogAppenderDaemon daemon;
   private final AwaitForSignal eventAwaitForSignal;
 
+  private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+
   protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
     this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
@@ -69,6 +73,28 @@ public abstract class LogAppenderBase implements LogAppender {
     this.eventAwaitForSignal = new AwaitForSignal(name);
   }
 
+  @Override
+  public void triggerHeartbeat() throws IOException { // raises the heartbeatTrigger flag, then notifies
+    if (heartbeatTrigger.compareAndSet(false, true)) { // only the call that flips false -> true proceeds
+      notifyLogAppender(); // skipped when the flag was already set by an earlier call
+    }
+  }
+
+  protected void resetHeartbeatTrigger() { // clears the flag raised by triggerHeartbeat()
+    heartbeatTrigger.set(false); // unconditional write: clears the flag whatever its current value
+  }
+
+  @Override
+  public boolean shouldSendAppendEntries() { // true while the trigger flag is set, else the interface default
+    return heartbeatTrigger.get() || LogAppender.super.shouldSendAppendEntries();
+  }
+
+  @Override
+  public long getHeartbeatWaitTimeMs() { // 0 ms while the trigger flag is set, else the interface default
+    return heartbeatTrigger.get() ? 0 :
+        LogAppender.super.getHeartbeatWaitTimeMs();
+  }
+
   @Override
   public AwaitForSignal getEventAwaitForSignal() {
     return eventAwaitForSignal;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 0c91427e5..0a4c12ce7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -72,6 +72,7 @@ class LogAppenderDefault extends LogAppenderBase {
           return null;
         }
 
+        resetHeartbeatTrigger();
         getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
         final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
         getFollower().updateLastRpcResponseTime();