You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/29 02:51:44 UTC
[ratis] 04/05: RATIS-1671. Add manual trigger snapshot (#712)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch release-2.4.0
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 5a59c045a60315bfa05d8b3e78f4fcbb21b7cbb0
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Sat Aug 27 00:58:21 2022 +0800
RATIS-1671. Add manual trigger snapshot (#712)
(cherry picked from commit eaf9541af05af06e9e0077572c8544c4485b1503)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 9 +++++---
.../apache/ratis/server/leader/LogAppender.java | 3 +++
.../ratis/server/leader/LogAppenderBase.java | 26 ++++++++++++++++++++++
.../ratis/server/leader/LogAppenderDefault.java | 1 +
4 files changed, 36 insertions(+), 3 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index d9bcb33d1..e87edac5e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -281,9 +281,12 @@ public class GrpcLogAppender extends LogAppenderBase {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
getServer().getId(), null, proto);
request.startRequestTimer();
- boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer -> {
- observer.onNext(proto);
- return true;}).isPresent();
+ resetHeartbeatTrigger();
+ final boolean sent = Optional.ofNullable(appendLogRequestObserver)
+ .map(observer -> {
+ observer.onNext(proto);
+ return true;
+ }).isPresent();
if (sent) {
scheduler.onTimeout(requestTimeoutDuration,
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index ef5e1a7ed..f0ff28690 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -166,6 +166,9 @@ public interface LogAppender {
return getFollower().getNextIndex() < getRaftLog().getNextIndex();
}
+ /** Send a heartbeat AppendEntries immediately. */
+ void triggerHeartbeat() throws IOException;
+
/** @return the wait time in milliseconds to send the next heartbeat. */
default long getHeartbeatWaitTimeMs() {
final int min = getServer().properties().minRpcTimeoutMs();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 50f9887fd..fda78fbcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -34,9 +34,11 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* An abstract implementation of {@link LogAppender}.
@@ -53,6 +55,8 @@ public abstract class LogAppenderBase implements LogAppender {
private final LogAppenderDaemon daemon;
private final AwaitForSignal eventAwaitForSignal;
+ private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+
protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
@@ -69,6 +73,28 @@ public abstract class LogAppenderBase implements LogAppender {
this.eventAwaitForSignal = new AwaitForSignal(name);
}
+ @Override
+ public void triggerHeartbeat() throws IOException {
+ if (heartbeatTrigger.compareAndSet(false, true)) {
+ notifyLogAppender();
+ }
+ }
+
+ protected void resetHeartbeatTrigger() {
+ heartbeatTrigger.set(false);
+ }
+
+ @Override
+ public boolean shouldSendAppendEntries() {
+ return heartbeatTrigger.get() || LogAppender.super.shouldSendAppendEntries();
+ }
+
+ @Override
+ public long getHeartbeatWaitTimeMs() {
+ return heartbeatTrigger.get() ? 0 :
+ LogAppender.super.getHeartbeatWaitTimeMs();
+ }
+
@Override
public AwaitForSignal getEventAwaitForSignal() {
return eventAwaitForSignal;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 0c91427e5..0a4c12ce7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -72,6 +72,7 @@ class LogAppenderDefault extends LogAppenderBase {
return null;
}
+ resetHeartbeatTrigger();
getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
getFollower().updateLastRpcResponseTime();