You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/30 20:37:32 UTC

[ratis] branch master updated: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex. (#730)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f65a7b65c RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex. (#730)
f65a7b65c is described below

commit f65a7b65cb64c332d9356eb013976db44077b38f
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Aug 31 04:37:26 2022 +0800

    RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex. (#730)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   1 +
 .../apache/ratis/server/leader/LeaderState.java    |   4 +
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  86 ++++++++++++++++++
 .../org/apache/ratis/server/impl/ReadRequests.java | 100 +++++++++++++++++++++
 .../org/apache/ratis/server/impl/ServerState.java  |   8 ++
 .../ratis/server/leader/LogAppenderDefault.java    |   1 +
 6 files changed, 200 insertions(+)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 930e4184c..f83656cb0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -382,6 +382,7 @@ public class GrpcLogAppender extends LogAppenderBase {
         default:
           throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
       }
+      getLeaderState().onAppendEntriesReply(getFollower(), reply);
       notifyLogAppender();
     }
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 9adf507f6..32c7d6ce0 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.leader;
 
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -62,4 +63,7 @@ public interface LeaderState {
   /** Check if a follower is bootstrapping. */
   boolean isFollowerBootstrapping(FollowerInfo follower);
 
+  /**
+   * Received an {@link AppendEntriesReplyProto}.
+   *
+   * @param follower the follower passed by the log appender reporting the reply.
+   * @param reply the reply received.
+   */
+  void onAppendEntriesReply(FollowerInfo follower, AppendEntriesReplyProto reply);
+
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index fbcbce448..3cb83b1a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -39,6 +40,7 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.ReadRequests.AppendEntriesListener;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.leader.LogAppender;
@@ -53,6 +55,7 @@ import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
@@ -803,6 +806,39 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  /**
+   * Test whether the given predicate holds for a majority of the current configuration.
+   * When the configuration is transitional, a majority is required in both
+   * the new conf (voterLists.get(0)) and the old conf (voterLists.get(1)).
+   *
+   * @param isAcked tests whether a particular peer has acknowledged.
+   * @return true if the required majorities are reached; otherwise, false.
+   */
+  private boolean hasMajority(Predicate<RaftPeerId> isAcked) {
+    final RaftPeerId self = server.getId();
+    final RaftConfigurationImpl raftConf = server.getRaftConf();
+
+    final boolean newConfAcked = hasMajority(isAcked, voterLists.get(0), raftConf.containsInConf(self));
+    if (!raftConf.isTransitional()) {
+      // a stable configuration only has the new conf
+      return newConfAcked;
+    }
+
+    // the old conf is always evaluated in a transitional configuration, as before
+    final boolean oldConfAcked = hasMajority(isAcked, voterLists.get(1), raftConf.containsInOldConf(self));
+    return newConfAcked && oldConfAcked;
+  }
+
+  /**
+   * Test whether more than half of the given group has acknowledged.
+   * The group consists of the given followers, plus this server when includeSelf is true;
+   * this server always counts as acknowledged.
+   *
+   * @return true if the group is empty or the acknowledged count is larger than half of the group size.
+   */
+  private boolean hasMajority(Predicate<RaftPeerId> isAcked, List<RaftPeerId> followers, boolean includeSelf) {
+    final int self = includeSelf ? 1 : 0;
+    if (self == 0 && followers.isEmpty()) {
+      // an empty group is trivially satisfied
+      return true;
+    }
+
+    final long acked = self + followers.stream().filter(isAcked).count();
+    final int groupSize = self + followers.size();
+    return acked > groupSize / 2;
+  }
+
   private void updateCommit(LogEntryHeader[] entriesToCommit) {
     final long newCommitIndex = raftLog.getLastCommittedIndex();
     logMetadata(newCommitIndex);
@@ -1028,6 +1064,56 @@ class LeaderStateImpl implements LeaderState {
     return false;
   }
 
+  /**
+   * Obtain the current readIndex for read only requests. See Raft paper section 6.4.
+   * 1. The leader makes sure that at least one log entry from the current term is committed.
+   * 2. The leader records the last committed index as the readIndex.
+   * 3. The leader broadcasts heartbeats to the followers and waits for acknowledgements.
+   * 4. If a majority responds successfully, the readIndex is returned.
+   *
+   * @return a future of the readIndex; the future is completed exceptionally with a
+   *         {@link LeaderNotReadyException} when the entry at the last committed index
+   *         is not from the current term (this check is skipped for a single-peer group).
+   */
+  CompletableFuture<Long> getReadIndex() {
+    final long readIndex = server.getRaftLog().getLastCommittedIndex();
+
+    // fast path: the group contains only one member, so no acknowledgement is needed
+    if (server.getRaftConf().getCurrentPeers().size() == 1) {
+      return CompletableFuture.completedFuture(readIndex);
+    }
+
+    // the leader has not committed any entries in this term; reject the request
+    if (server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) {
+      return JavaUtils.completeExceptionally(new LeaderNotReadyException(server.getMemberId()));
+    }
+
+    // The listener is constructed lazily: supplier.get() is invoked only if
+    // no listener is registered for this readIndex yet.
+    final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
+        () -> new AppendEntriesListener(readIndex));
+    final AppendEntriesListener listener = server.getState().getReadRequests().addAppendEntriesListener(
+        readIndex, key -> supplier.get());
+
+    // null means that the readIndex is not larger than the already acknowledged commit index
+    if (listener == null) {
+      return CompletableFuture.completedFuture(readIndex);
+    }
+
+    // Presumably isInitialized() reports whether supplier.get() was invoked, i.e. whether
+    // this call created the listener -- TODO confirm against MemoizedSupplier.
+    // Only in that case are the senders registered and the heartbeats triggered.
+    if (supplier.isInitialized()) {
+      senders.forEach(sender -> {
+        listener.init(sender);
+        try {
+          sender.triggerHeartbeat();
+        } catch (IOException e) {
+          // best effort: a failure to trigger one sender does not stop the others
+          LOG.warn("{}: {} cannot trigger heartbeat due to {}", this, sender, e);
+        }
+      });
+    }
+
+    return listener.getFuture();
+  }
+
+  /**
+   * Pass the reply, together with the majority check of this leader,
+   * to the {@link ReadRequests} of the server state.
+   * The follower parameter is not used by this implementation.
+   */
+  @Override
+  public void onAppendEntriesReply(FollowerInfo follower, RaftProtos.AppendEntriesReplyProto reply) {
+    server.getState().getReadRequests().onAppendEntriesReply(reply, this::hasMajority);
+  }
+
   void replyPendingRequest(long logIndex, RaftClientReply reply) {
     pendingRequests.replyPendingRequest(logIndex, reply);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index b8d8998c7..76f974a03 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -17,11 +17,28 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
 /** For supporting linearizable read. */
 class ReadRequests {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+
   /** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */
   static class HeartbeatAck {
     private final LogAppender appender;
@@ -63,4 +80,87 @@ class ReadRequests {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  /**
+   * Listen to the append-entries replies for a particular commit index.
+   * The future is completed with the commit index
+   * once the given majority test passes on the acknowledged peers.
+   */
+  static class AppendEntriesListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    AppendEntriesListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    /** Start tracking the acknowledgement from the follower of the given appender. */
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    /**
+     * Process the given reply.
+     *
+     * @param proto the reply received.
+     * @param hasMajority tests whether the peers satisfying a given predicate form a majority.
+     * @return true if the future is completed normally; otherwise, false.
+     */
+    boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      if (isCompletedNormally()) {
+        return true;
+      }
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (reply == null) {
+        // The replier is not tracked (yet): a reply may arrive after this listener is registered
+        // but before init(..) is called for its appender.  Ignore the reply instead of failing.
+        return isCompletedNormally();
+      }
+
+      // Record the reply first and then always evaluate the majority: the completion condition
+      // depends only on the set of acknowledged peers, not on the return value of this call.
+      reply.receive(proto);
+      if (hasMajority.test(this::isAcknowledged)) {
+        future.complete(commitIndex);
+        return true;
+      }
+
+      return isCompletedNormally();
+    }
+
+    /** @return true if the given peer is tracked and its heartbeat is acknowledged; otherwise, false. */
+    private boolean isAcknowledged(RaftPeerId id) {
+      final HeartbeatAck ack = replies.get(id);
+      return ack != null && ack.isAcknowledged();
+    }
+
+    boolean isCompletedNormally() {
+      return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
+    }
+  }
+
+  /** The {@link AppendEntriesListener}s, sorted by commit index. */
+  class AppendEntriesListeners {
+    private final NavigableMap<Long, AppendEntriesListener> sorted = new TreeMap<>();
+
+    /** @return the listener mapped to the given commit index, created by the constructor if absent. */
+    synchronized AppendEntriesListener add(long commitIndex, Function<Long, AppendEntriesListener> constructor) {
+      return sorted.computeIfAbsent(commitIndex, constructor);
+    }
+
+    /**
+     * Pass the reply to the listeners in ascending key order.
+     * A listener is removed, and the acked commit index is updated, once it is completed normally.
+     * Each listener is visited at most once per reply so that this method always terminates,
+     * and the removed entry is exactly the visited entry.
+     */
+    synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
+                                           Predicate<Predicate<RaftPeerId>> hasMajority) {
+      // NOTE(review): the keys of the map are commit indices while the bound below is the call id
+      // of the reply; the bound is kept unchanged -- confirm whether a commit index was intended.
+      final long callId = reply.getServerReply().getCallId();
+      sorted.headMap(callId, true).values().removeIf(listener -> {
+        if (listener == null || !listener.receive(reply, hasMajority)) {
+          return false;
+        }
+        ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
+        return true;
+      });
+    }
+  }
+
+  private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
+  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  /**
+   * Get the listener for the given commit index, creating it with the given constructor if absent.
+   *
+   * @return null if the given commit index is not larger than the acked commit index;
+   *         otherwise, return the listener mapped to the given commit index.
+   */
+  AppendEntriesListener addAppendEntriesListener(long commitIndex,
+                                                 Function<Long, AppendEntriesListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return appendEntriesListeners.add(commitIndex, constructor);
+  }
+
+  /** Pass the given reply and the majority test to the registered {@link AppendEntriesListener}s. */
+  void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
+    appendEntriesListeners.onAppendEntriesReply(reply, hasMajority);
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e92f9b911..d62e8f50e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -72,6 +72,8 @@ class ServerState {
   private volatile Timestamp lastNoLeaderTime;
   private final TimeDuration noLeaderTimeout;
 
+  private final ReadRequests readRequests;
+
   /**
    * Latest term server has seen.
    * Initialized to 0 on first boot, increases monotonically.
@@ -128,6 +130,8 @@ class ServerState {
     this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
     this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
         stateMachine, server, this, getLog().getSnapshotIndex(), prop));
+
+    this.readRequests = new ReadRequests();
   }
 
   void initialize(StateMachine stateMachine) throws IOException {
@@ -484,4 +488,8 @@ class ServerState {
     }
     return getLog().contains(ti);
   }
+
+  /** @return the {@link ReadRequests} object created in the constructor of this server state. */
+  ReadRequests getReadRequests() {
+    return readRequests;
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 0a4c12ce7..ca654d1de 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -193,6 +193,7 @@ class LogAppenderDefault extends LogAppenderBase {
           break;
         default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
       }
+      getLeaderState().onAppendEntriesReply(getFollower(), reply);
     }
   }