You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/30 20:37:32 UTC
[ratis] branch master updated: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex. (#730)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f65a7b65c RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex. (#730)
f65a7b65c is described below
commit f65a7b65cb64c332d9356eb013976db44077b38f
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Aug 31 04:37:26 2022 +0800
RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex. (#730)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 1 +
.../apache/ratis/server/leader/LeaderState.java | 4 +
.../apache/ratis/server/impl/LeaderStateImpl.java | 86 ++++++++++++++++++
.../org/apache/ratis/server/impl/ReadRequests.java | 100 +++++++++++++++++++++
.../org/apache/ratis/server/impl/ServerState.java | 8 ++
.../ratis/server/leader/LogAppenderDefault.java | 1 +
6 files changed, 200 insertions(+)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 930e4184c..f83656cb0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -382,6 +382,7 @@ public class GrpcLogAppender extends LogAppenderBase {
default:
throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
}
+ getLeaderState().onAppendEntriesReply(getFollower(), reply);
notifyLogAppender();
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 9adf507f6..32c7d6ce0 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.leader;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.protocol.TermIndex;
@@ -62,4 +63,7 @@ public interface LeaderState {
/** Check if a follower is bootstrapping. */
boolean isFollowerBootstrapping(FollowerInfo follower);
+ /**
+ * Notify this leader that an {@link AppendEntriesReplyProto} has been received from a follower.
+ * The log appenders call this after they have handled the reply.
+ *
+ * @param follower the follower which sent the reply.
+ * @param reply the reply received from the follower.
+ */
+ void onAppendEntriesReply(FollowerInfo follower, AppendEntriesReplyProto reply);
+
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index fbcbce448..3cb83b1a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -39,6 +40,7 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.ReadRequests.AppendEntriesListener;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppender;
@@ -53,6 +55,7 @@ import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
@@ -803,6 +806,39 @@ class LeaderStateImpl implements LeaderState {
}
}
+ /**
+ * Test whether a majority of the voting members has acknowledged, as decided by {@code isAcked}.
+ * In a transitional configuration, both the new and the old configurations must have a majority.
+ * The leader itself counts as acknowledged whenever it is a member of the respective configuration.
+ */
+ private boolean hasMajority(Predicate<RaftPeerId> isAcked) {
+ final RaftConfigurationImpl conf = server.getRaftConf();
+ final RaftPeerId leaderId = server.getId();
+
+ final boolean newConfAcked = hasMajority(isAcked, voterLists.get(0), conf.containsInConf(leaderId));
+ if (conf.isTransitional()) {
+ final boolean oldConfAcked = hasMajority(isAcked, voterLists.get(1), conf.containsInOldConf(leaderId));
+ return newConfAcked && oldConfAcked;
+ }
+ return newConfAcked;
+ }
+
+ /**
+ * Count the acknowledgements of the given followers (plus the leader when {@code includeSelf})
+ * and test whether they form a strict majority of the voters.
+ * An empty voter set (no followers and the leader excluded) is trivially a majority.
+ */
+ private boolean hasMajority(Predicate<RaftPeerId> isAcked, List<RaftPeerId> followers, boolean includeSelf) {
+ final int self = includeSelf ? 1 : 0;
+ final int voters = followers.size() + self;
+ if (voters == 0) {
+ return true;
+ }
+ final long acked = self + followers.stream().filter(isAcked).count();
+ return acked > voters / 2;
+ }
+
private void updateCommit(LogEntryHeader[] entriesToCommit) {
final long newCommitIndex = raftLog.getLastCommittedIndex();
logMetadata(newCommitIndex);
@@ -1028,6 +1064,56 @@ class LeaderStateImpl implements LeaderState {
return false;
}
+ /**
+ * Obtain the current readIndex for read-only requests; see the Raft dissertation, section 6.4.
+ * <ol>
+ * <li>The leader makes sure that at least one log entry from its current term is committed.</li>
+ * <li>The leader records its last committed index as the readIndex.</li>
+ * <li>The leader broadcasts heartbeats to the followers and waits for their acknowledgements.</li>
+ * <li>Once a majority has acknowledged, the readIndex is returned.</li>
+ * </ol>
+ *
+ * @return a future of the current readIndex. It completes exceptionally with a
+ *         {@link LeaderNotReadyException} if the leader cannot confirm that an entry
+ *         of its current term has been committed.
+ */
+ CompletableFuture<Long> getReadIndex() {
+ final long readIndex = server.getRaftLog().getLastCommittedIndex();
+
+ // if group contains only one member, fast path
+ if (server.getRaftConf().getCurrentPeers().size() == 1) {
+ return CompletableFuture.completedFuture(readIndex);
+ }
+
+ // Leader has not committed any entries in this term, reject.
+ // The TermIndex is checked for null first (e.g. the entry at readIndex is not found in the log)
+ // so that this case is rejected instead of throwing a NullPointerException.
+ if (server.getRaftLog().getTermIndex(readIndex) == null
+ || server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) {
+ return JavaUtils.completeExceptionally(new LeaderNotReadyException(server.getMemberId()));
+ }
+
+ final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
+ () -> new AppendEntriesListener(readIndex));
+ final AppendEntriesListener listener = server.getState().getReadRequests().addAppendEntriesListener(
+ readIndex, key -> supplier.get());
+
+ // the readIndex is already acknowledged before
+ if (listener == null) {
+ return CompletableFuture.completedFuture(readIndex);
+ }
+
+ // Only the call which created the listener broadcasts heartbeats; other calls share its future.
+ if (supplier.isInitialized()) {
+ // Register all the followers before triggering any heartbeat, so that an early reply
+ // never observes a partially initialized listener.
+ senders.forEach(sender -> listener.init(sender));
+ senders.forEach(sender -> {
+ try {
+ sender.triggerHeartbeat();
+ } catch (IOException e) {
+ LOG.warn("{}: {} cannot trigger heartbeat due to {}", this, sender, e);
+ }
+ });
+ }
+
+ return listener.getFuture();
+ }
+
+ /** Forward the reply to the pending read requests, which use {@link #hasMajority} to decide. */
+ @Override
+ public void onAppendEntriesReply(FollowerInfo follower, RaftProtos.AppendEntriesReplyProto reply) {
+ final ReadRequests readRequests = server.getState().getReadRequests();
+ readRequests.onAppendEntriesReply(reply, this::hasMajority);
+ }
+
void replyPendingRequest(long logIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(logIndex, reply);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index b8d8998c7..76f974a03 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -17,11 +17,28 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Predicate;
/** For supporting linearizable read. */
class ReadRequests {
+ private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+
/** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */
static class HeartbeatAck {
private final LogAppender appender;
@@ -63,4 +80,87 @@ class ReadRequests {
return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
}
}
+
+ /**
+ * A listener for the heartbeat acknowledgements of a particular readIndex (stored as {@code commitIndex}).
+ * Its future completes with the commitIndex once the acknowledgements form a majority.
+ */
+ static class AppendEntriesListener {
+ private final long commitIndex;
+ private final CompletableFuture<Long> future = new CompletableFuture<>();
+ private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+ AppendEntriesListener(long commitIndex) {
+ this.commitIndex = commitIndex;
+ }
+
+ /** Start tracking the heartbeat acknowledgement of the follower served by the given appender. */
+ void init(LogAppender appender) {
+ replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+ }
+
+ CompletableFuture<Long> getFuture() {
+ return future;
+ }
+
+ /** @return true iff the given peer is tracked by this listener and has acknowledged. */
+ private boolean isAcknowledged(RaftPeerId id) {
+ final HeartbeatAck ack = replies.get(id);
+ return ack != null && ack.isAcknowledged();
+ }
+
+ /**
+ * Record a reply and complete the future once a majority has acknowledged.
+ *
+ * @param proto the reply received from a follower.
+ * @param hasMajority tests whether a majority satisfies the given acknowledgement predicate.
+ * @return true if this listener is done (completed normally) and can be removed.
+ */
+ boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+ if (isCompletedNormally()) {
+ return true;
+ }
+ final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+ final HeartbeatAck ack = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+ if (ack == null) {
+ // The reply comes from a follower not registered by init(..); ignore it instead of throwing NPE.
+ return isCompletedNormally();
+ }
+ ack.receive(proto);
+ // Re-check the majority after every recorded reply instead of gating on the return value of
+ // HeartbeatAck.receive(..), so that the reply which completes the majority is never skipped.
+ if (hasMajority.test(this::isAcknowledged)) {
+ future.complete(commitIndex);
+ return true;
+ }
+
+ return isCompletedNormally();
+ }
+
+ boolean isCompletedNormally() {
+ return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
+ }
+ }
+
+ /** The pending {@link AppendEntriesListener}s, sorted by their commitIndex. */
+ class AppendEntriesListeners {
+ private final NavigableMap<Long, AppendEntriesListener> sorted = new TreeMap<>();
+
+ synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEntriesListener> constructor) {
+ return sorted.computeIfAbsent(commitIndex, constructor);
+ }
+
+ /**
+ * Pass the reply to the pending listeners in ascending commitIndex order.
+ * Each listener which completes is removed and advances {@code ackedCommitIndex}.
+ */
+ synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
+ Predicate<Predicate<RaftPeerId>> hasMajority) {
+ final long callId = reply.getServerReply().getCallId();
+ // Traverse with an explicit iterator so that each listener is visited exactly once,
+ // even if it does not complete, and a completed one is removed by its own key.
+ for (Iterator<Map.Entry<Long, AppendEntriesListener>> i = sorted.entrySet().iterator(); i.hasNext(); ) {
+ final Map.Entry<Long, AppendEntriesListener> entry = i.next();
+ // NOTE(review): this bounds a commitIndex key by an RPC call id; confirm the intended comparison.
+ if (entry.getKey() > callId) {
+ return;
+ }
+
+ final AppendEntriesListener listener = entry.getValue();
+ if (listener == null) {
+ continue;
+ }
+
+ if (listener.receive(reply, hasMajority)) {
+ i.remove();
+ ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
+ }
+ }
+ }
+ }
+
+ private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
+ private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+ AppendEntriesListener addAppendEntriesListener(long commitIndex,
+ Function<Long, AppendEntriesListener> constructor) {
+ if (commitIndex <= ackedCommitIndex.get()) {
+ return null;
+ }
+ return appendEntriesListeners.add(commitIndex, constructor);
+ }
+
+ void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
+ appendEntriesListeners.onAppendEntriesReply(reply, hasMajority);
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e92f9b911..d62e8f50e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -72,6 +72,8 @@ class ServerState {
private volatile Timestamp lastNoLeaderTime;
private final TimeDuration noLeaderTimeout;
+ private final ReadRequests readRequests;
+
/**
* Latest term server has seen.
* Initialized to 0 on first boot, increases monotonically.
@@ -128,6 +130,8 @@ class ServerState {
this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
stateMachine, server, this, getLog().getSnapshotIndex(), prop));
+
+ this.readRequests = new ReadRequests();
}
void initialize(StateMachine stateMachine) throws IOException {
@@ -484,4 +488,8 @@ class ServerState {
}
return getLog().contains(ti);
}
+
+ /** @return the {@link ReadRequests} of this server, created in the constructor. */
+ ReadRequests getReadRequests() {
+ return this.readRequests;
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 0a4c12ce7..ca654d1de 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -193,6 +193,7 @@ class LogAppenderDefault extends LogAppenderBase {
break;
default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
}
+ getLeaderState().onAppendEntriesReply(getFollower(), reply);
}
}