Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2022/08/27 16:08:20 UTC

[GitHub] [ratis] SzyWilliam opened a new pull request, #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

SzyWilliam opened a new pull request, #730:
URL: https://github.com/apache/ratis/pull/730

   ## What changes were proposed in this pull request?
   Add the heartbeat broadcast mechanism in LeaderState. 
   When a leader is requested for current readIndex, it should first confirm its authority by broadcasting heartbeats and receiving majority acknowledgements. If the leadership is confirmed, the readIndex can be safely returned.
   In this PR I propose to add the broadcast mechanism and use it to implement readIndex request.
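   A minimal, self-contained sketch of the flow described above, under simplifying assumptions: follower ids are plain strings, the configuration is fixed (non-transitional), the leader counts as one acknowledgement, and `ReadIndexConfirmation` is a hypothetical class, not the Ratis API. The readIndex future completes only once a majority of the group has acknowledged the heartbeat.

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical model: the leader confirms leadership before answering a readIndex request. */
   class ReadIndexConfirmation {
     private final long readIndex;            // leader's commit index when the request arrived
     private final List<String> followers;    // follower ids in a fixed configuration
     private final Set<String> acked = new HashSet<>();
     private final CompletableFuture<Long> future = new CompletableFuture<>();

     ReadIndexConfirmation(long readIndex, List<String> followers) {
       this.readIndex = readIndex;
       this.followers = followers;
     }

     CompletableFuture<Long> getFuture() {
       return future;
     }

     /** Records a heartbeat ack; completes once a majority (leader included) has acknowledged. */
     synchronized void onHeartbeatAck(String followerId) {
       if (!followers.contains(followerId) || !acked.add(followerId)) {
         return; // unknown follower or duplicate ack: does not count
       }
       final int votes = acked.size() + 1;        // +1 for the leader itself
       final int groupSize = followers.size() + 1;
       if (votes > groupSize / 2) {
         future.complete(readIndex);              // leadership confirmed: readIndex can be returned
       }
     }
   }
   ```

   Using a `Set` keeps duplicate acknowledgements from the same follower from being counted twice.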
   
   ## What is the link to the Apache JIRA
   https://issues.apache.org/jira/browse/RATIS-1643
   
   ## How was this patch tested?
   This patch will be tested once read requests using the ReadIndex option are implemented.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ratis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ratis] szetszwo commented on pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on PR #730:
URL: https://github.com/apache/ratis/pull/730#issuecomment-1232244297

   @SzyWilliam, sorry, I only refreshed the code change earlier and missed your question.
   
   The entire ReadRequests class should belong to LeaderStateImpl, since it mainly takes care of broadcasting heartbeats and processing the replies.  We may consider moving it to LeaderStateImpl.  However, I seem to recall that doing so may be inconvenient.  Please try it in the next JIRA.  Thanks.


[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r957889294


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -189,6 +189,8 @@ public long[] getFollowerNextIndices() {
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
 
+  private final ReadRequests readRequests;

Review Comment:
   Let's move `readRequests` to `ServerState` since
   - `stateMachineUpdater` needs to update `readRequests` later;
   - `ServerState` is a smaller class, so `RaftServerImpl` won't get too big.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {

Review Comment:
   Let's call it `HeartbeatListener` or `AppendEntriesListener`, since it listens to all heartbeats (in fact, all appendEntries replies), not only the broadcast heartbeats.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   I was considering an unsynchronized implementation, but it may be too hard.  If we are going to use `synchronized`, we should also make the add method `synchronized` and change the map to a plain `NavigableMap`.  Also, let's create a separate class so that `synchronized` applies only to that class rather than to the entire `ReadRequests`.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   The code will look like this:
   ```java
     class HeartbeatListeners {
       private final NavigableMap<Long, HeartbeatListener> sorted = new TreeMap<>();
   
       synchronized HeartbeatListener add(long commitIndex, Function<Long, HeartbeatListener> constructor) {
         return sorted.computeIfAbsent(commitIndex, constructor);
       }
   
       synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
         final long callId = reply.getServerReply().getCallId();
         // Visit only the listeners keyed <= callId and remove completed ones through the iterator,
         // so the loop always terminates and removes the entry it actually processed.
         final Iterator<Map.Entry<Long, HeartbeatListener>> i = sorted.headMap(callId, true).entrySet().iterator();
         while (i.hasNext()) {
           final HeartbeatListener listener = i.next().getValue();
           if (listener.receive(reply, hasMajority)) {
             i.remove();
             ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
           }
         }
       }
     }
   
     private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
     private final HeartbeatListeners heartbeatListeners = new HeartbeatListeners();
   
     HeartbeatListener addHeartbeatListener(long commitIndex, Function<Long, HeartbeatListener> constructor) {
       if (commitIndex <= ackedCommitIndex.get()) {
         return null;
       }
       return heartbeatListeners.add(commitIndex, constructor);
     }
   
     void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
       heartbeatListeners.onAppendEntriesReply(reply, hasMajority);
     }
   ```
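   A self-contained sketch of the `HeartbeatListeners` pattern suggested above, with the Ratis types replaced by hypothetical stand-ins: a reply is reduced to its call id and sender id, `Listener` plays the role of `HeartbeatListener`, and a fixed number of required acks replaces the `hasMajority` predicate. All synchronization lives in this one small class; a reply visits only the listeners keyed at or below its call id, and completed listeners are removed through the iterator.

   ```java
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.Set;
   import java.util.TreeMap;

   /** Hypothetical stand-in for HeartbeatListeners: synchronization is confined to this class. */
   class HeartbeatListenersSketch {
     /** Stand-in for HeartbeatListener; only accessed while holding the outer lock. */
     static final class Listener {
       final long commitIndex;
       private final int required;
       private final Set<String> acked = new HashSet<>();

       Listener(long commitIndex, int required) {
         this.commitIndex = commitIndex;
         this.required = required;
       }

       /** @return true once `required` distinct peers have replied. */
       boolean receive(String peerId) {
         acked.add(peerId);
         return acked.size() >= required;
       }
     }

     private final NavigableMap<Long, Listener> sorted = new TreeMap<>();
     private long ackedCommitIndex = -1;

     synchronized Listener add(long commitIndex, int required) {
       if (commitIndex <= ackedCommitIndex) {
         return null; // already covered by an earlier, completed round
       }
       return sorted.computeIfAbsent(commitIndex, k -> new Listener(k, required));
     }

     synchronized void onAppendEntriesReply(long callId, String peerId) {
       // headMap(callId, true) is a view of the listeners with key <= callId.
       final Iterator<Map.Entry<Long, Listener>> i = sorted.headMap(callId, true).entrySet().iterator();
       while (i.hasNext()) {
         final Listener listener = i.next().getValue();
         if (listener.receive(peerId)) {
           i.remove(); // also removes the entry from the backing map
           ackedCommitIndex = Math.max(ackedCommitIndex, listener.commitIndex);
         }
       }
     }

     synchronized long getAckedCommitIndex() {
       return ackedCommitIndex;
     }

     synchronized int pendingCount() {
       return sorted.size();
     }
   }
   ```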



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }

Review Comment:
   It should be
   ```java
       boolean isCompletedNormally() {
         return future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled();
       }
   ```
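   A short standalone illustration of why `isDone()` alone is not enough: for a `CompletableFuture` it also returns `true` after exceptional completion or cancellation (for example, when a read times out). The helper mirrors the suggested `isCompletedNormally()`; the class name `CompletionCheck` is hypothetical.

   ```java
   import java.util.concurrent.CompletableFuture;

   class CompletionCheck {
     /** Same check as the suggested isCompletedNormally(). */
     static boolean isCompletedNormally(CompletableFuture<?> future) {
       // For CompletableFuture, isCancelled() implies isCompletedExceptionally(),
       // so the last term is redundant but harmless.
       return future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled();
     }
   }
   ```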



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
+    final long callId = reply.getServerReply().getCallId();
+    for (;;) {
+      final Long first = heartbeatListeners.firstKey();
+      if (first == null || first > callId) {
+        return;
+      }
+
+      final HeartbeatBroadcastListener listener = heartbeatListeners.get(callId);
+      if (listener == null) {
+        continue;
+      }
+
+      listener.receive(reply, hasMajority);
+      if (listener.isCompleted()) {
+        heartbeatListeners.remove(callId);
+        ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("ack commit index change: {}", s));

Review Comment:
   The name `ackedCommitIndex` is already printed, so let's use:
   ```java
   ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   Let's check `isCompletedNormally()`:
   ```java
       boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
         if (isCompletedNormally()) {
           return true;
         }
         final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
         final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
         if (reply.receive(proto)) {
           if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
             future.complete(commitIndex);
             return true;
           }
         }
         return isCompletedNormally();
       }
   ```



[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956787339


##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -225,4 +230,14 @@ public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermInd
   public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
     return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
   }
+
+  @Override
+  public void registerAppendEntriesListener(BiConsumer<RaftPeerId, AppendEntriesReplyProto> listener) {
+    appendEntriesListener = listener;
+  }
+
+  protected void notifyAppendEntriesListener(AppendEntriesReplyProto reply) {
+    Optional.ofNullable(appendEntriesListener).ifPresent(
+        listener -> listener.accept(this.getServer().getId(), reply));

Review Comment:
   Let's add a new method to `LeaderState`
   ```java
   //LeaderState.java
     /** Received an {@link AppendEntriesReplyProto} */
     void onAppendEntriesReply(FollowerInfo follower, AppendEntriesReplyProto reply);
   ```
   Then, this method becomes:
   ```java
       getLeaderState().onAppendEntriesReply(getFollower(), reply);
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +111,69 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final Map<RaftPeerId, HeartbeatAck> pendingAcknowledgements;
+    private final int groupPeerCount;
+    private int ack;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(List<LogAppender> senders) {
+      this.groupPeerCount = senders.size() + 1;
+      this.ack = 0;
+      this.done = false;
+      result = new CompletableFuture<>();
+
+      pendingAcknowledgements = new ConcurrentHashMap<>();
+      senders.forEach(sender ->
+          pendingAcknowledgements.put(sender.getFollowerId(), new HeartbeatAck(sender)));
+    }
+
+    private void handleReply(RaftPeerId peerId, AppendEntriesReplyProto reply) {
+      if (done) {
+        return;
+      }
+
+      Optional.ofNullable(pendingAcknowledgements.get(peerId)).ifPresent(heartbeatAck -> {
+        boolean firstAcknowledged = heartbeatAck.receive(reply);
+        if (firstAcknowledged) {
+          onHeartbeatAcknowledged();
+        }
+      });
+    }
+
+    private synchronized void handleTimeout() {
+      if (!done) {
+        done = true;
+        result.complete(false);
+      }
+    }
+
+    private boolean isDone() {
+      return this.done;
+    }
+
+    synchronized void onHeartbeatAcknowledged() {
+      if (done) {
+        return;
+      }
+
+      ack++;
+
+      if (isMajorityAck()) {
+        done = true;
+        result.complete(true);
+      }
+    }
+
+    private synchronized boolean isMajorityAck() {
+      // include leader itself
+      return ack + 1 > groupPeerCount / 2;
+    }

Review Comment:
   The configuration may change, so we should use the same logic as `LeaderStateImpl.getMajorityMin` to calculate the majority.  We may add the following method:
   ```java
   //LeaderStateImpl.java
     private boolean hasMajority(Predicate<RaftPeerId> isAcked) {
       final RaftPeerId selfId = server.getId();
       final RaftConfigurationImpl conf = server.getRaftConf();
   
       final List<RaftPeerId> followers = voterLists.get(0);
       final boolean includeSelf = conf.containsInConf(selfId);
       final boolean newConf = hasMajority(isAcked, followers, includeSelf);
   
       if (!conf.isTransitional()) {
         return newConf;
       } else { // configuration is in transitional state
         final List<RaftPeerId> oldFollowers = voterLists.get(1);
         final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
         final boolean oldConf = hasMajority(isAcked, oldFollowers, includeSelfInOldConf);
         return newConf && oldConf;
       }
     }
   ```
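   The suggestion relies on a three-argument `hasMajority(isAcked, followers, includeSelf)` overload that is not shown. A possible standalone sketch, assuming peer ids are plain strings and that the leader counts as acknowledging itself whenever it is a member of the configuration; `MajoritySketch` and both signatures are hypothetical. As in the suggestion, a transitional (joint) configuration requires a majority in both the new and the old configuration.

   ```java
   import java.util.List;
   import java.util.function.Predicate;

   class MajoritySketch {
     /** Majority within one configuration; the leader counts as one ack if it is a member. */
     static boolean hasMajority(Predicate<String> isAcked, List<String> followers, boolean includeSelf) {
       int acked = includeSelf ? 1 : 0;
       for (String follower : followers) {
         if (isAcked.test(follower)) {
           acked++;
         }
       }
       final int voters = followers.size() + (includeSelf ? 1 : 0);
       return acked > voters / 2;
     }

     /** Joint consensus: a transitional configuration needs a majority in both old and new. */
     static boolean hasMajority(Predicate<String> isAcked,
         List<String> newFollowers, boolean selfInNew,
         List<String> oldFollowers, boolean selfInOld, boolean transitional) {
       final boolean newConf = hasMajority(isAcked, newFollowers, selfInNew);
       return transitional ? newConf && hasMajority(isAcked, oldFollowers, selfInOld) : newConf;
     }
   }
   ```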



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1033,43 @@ public boolean checkLeadership() {
     return false;
   }
 
+  /** confirm whether it has valid leadership by broadcasting heartbeat AppendEntries */
+  private CompletableFuture<Boolean> confirmLeadership(long readIndex) {
+    if (pendingBroadcasts.containsKey(readIndex)) {
+      return pendingBroadcasts.get(readIndex);
+    }
+
+    CompletableFuture<Boolean> future =
+        this.readRequests.newHeartbeatBroadcastListener(senders.getSenders());
+    pendingBroadcasts.put(readIndex, future);
+
+    senders.stream().forEach(logAppender -> {
+      try {
+        logAppender.triggerHeartbeat();
+      } catch (IOException e) {
+        LOG.warn("{}: {} trigger heartbeat failed due to {}", this, logAppender, e);
+      }
+    });
+    return future;
+  }
+
+  /** return leader's read index, see thesis section 6.4 */
+  CompletableFuture<Long> getReadIndex() {
+    final long readIndex = server.getRaftLog().getLastCommittedIndex();

Review Comment:
   Check whether the index has already been acknowledged.
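   A minimal sketch of such a check, assuming the leader tracks the highest commit index already acknowledged by a majority (the role `ackedCommitIndex` plays in the later revision): if the requested index is not above it, the read index is returned right away without starting another heartbeat round. `ReadIndexFastPath` and the `broadcast` function are hypothetical.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.function.LongFunction;

   class ReadIndexFastPath {
     private final AtomicLong ackedCommitIndex = new AtomicLong(-1);
     // Hypothetical: starts a heartbeat round and completes with the index once a majority acks.
     private final LongFunction<CompletableFuture<Long>> broadcast;

     ReadIndexFastPath(LongFunction<CompletableFuture<Long>> broadcast) {
       this.broadcast = broadcast;
     }

     CompletableFuture<Long> getReadIndex(long readIndex) {
       if (readIndex <= ackedCommitIndex.get()) {
         // Already covered by a completed round: skip the broadcast.
         return CompletableFuture.completedFuture(readIndex);
       }
       return broadcast.apply(readIndex).thenApply(index -> {
         ackedCommitIndex.accumulateAndGet(index, Math::max); // remember the acknowledged index
         return index;
       });
     }
   }
   ```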



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {

Review Comment:
   In this case, we don't have to use `BiConsumer` since we are going to remove `registerAppendEntriesListener`.
   
   In general, use lambda expressions instead of implementing a functional interface.
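   A small illustration of the general advice, with hypothetical names: instead of making a class `implements BiConsumer<...>` just to receive callbacks, keep the handler as an ordinary, descriptively named method and pass a method reference or lambda wherever a `BiConsumer` is expected.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BiConsumer;

   class ListenerStyle {
     /** Hypothetical callback source that accepts any BiConsumer. */
     static final class ReplySource {
       private final List<BiConsumer<String, Long>> listeners = new ArrayList<>();

       void register(BiConsumer<String, Long> listener) {
         listeners.add(listener);
       }

       void fire(String peerId, long callId) {
         listeners.forEach(l -> l.accept(peerId, callId));
       }
     }

     /** The handler is a plain method; the class itself does not implement BiConsumer. */
     static final class Handler {
       final List<String> seen = new ArrayList<>();

       void onReply(String peerId, long callId) {
         seen.add(peerId + "@" + callId);
       }
     }
   }
   ```

   Registration then reads `source.register(handler::onReply)` or `source.register((peer, callId) -> ...)`, which keeps the handler's name meaningful and lets one class expose several different callbacks.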



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -247,6 +247,8 @@ boolean removeAll(Collection<LogAppender> c) {
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
+  private final ReadRequests readRequests;
+  private final Map<Long, CompletableFuture<Boolean>> pendingBroadcasts;

Review Comment:
   `LeaderStateImpl.ReadRequests` and `ReadRequests.pendingBroadcasts` should be combined into a single data structure.



##########
ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java:
##########
@@ -169,6 +171,9 @@ default boolean hasAppendEntries() {
   /** send a heartbeat AppendEntries immediately */
   void triggerHeartbeat() throws IOException;
 
+  /** Register an AppendEntries listener */
+  void registerAppendEntriesListener(BiConsumer<RaftPeerId, AppendEntriesReplyProto> listener);

Review Comment:
   Let's change `LeaderState` as mentioned instead.  `LeaderState` has only one implementation while `LogAppender` has multiple implementations.
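   A rough sketch of the suggested design, using hypothetical simplified types (`LeaderStateHook`, `BaseAppender`, and `String`/`long` in place of `FollowerInfo` and `AppendEntriesReplyProto`): the reply hook is implemented once on the leader side, and every appender implementation reaches it through the shared base class, so no per-implementation `registerAppendEntriesListener` is needed.

   ```java
   class AppenderDelegation {
     /** Stand-in for the proposed LeaderState#onAppendEntriesReply(FollowerInfo, AppendEntriesReplyProto). */
     interface LeaderStateHook {
       void onAppendEntriesReply(String followerId, long callId);
     }

     /** Stand-in for LogAppenderBase: code shared by all appender implementations. */
     abstract static class BaseAppender {
       private final String followerId;
       private final LeaderStateHook leaderState;

       BaseAppender(String followerId, LeaderStateHook leaderState) {
         this.followerId = followerId;
         this.leaderState = leaderState;
       }

       /** Subclasses call this whenever a reply arrives, whatever their transport. */
       protected void handleReply(long callId) {
         leaderState.onAppendEntriesReply(followerId, callId);
       }
     }

     /** Two hypothetical implementations inherit the same notification path. */
     static final class StreamingAppender extends BaseAppender {
       StreamingAppender(String id, LeaderStateHook hook) {
         super(id, hook);
       }
     }

     static final class BlockingAppender extends BaseAppender {
       BlockingAppender(String id, LeaderStateHook hook) {
         super(id, hook);
       }
     }
   }
   ```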



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+
+  private final List<HeartbeatBroadcastListener> pendingBroadcasts;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+  private final TimeDuration readTimeout;
+
+  ReadRequests(RaftProperties properties) {
+    pendingBroadcasts = new ArrayList<>();
+    readTimeout = RaftServerConfigKeys.Read.readOnlyTimeout(properties);
+  }
+
+  CompletableFuture<Boolean> newHeartbeatBroadcastListener(List<LogAppender> senders) {
+    HeartbeatBroadcastListener listener = new HeartbeatBroadcastListener(senders);
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.add(listener);
+    }
+
+    // timeout and complete the broadcast listener after a given time
+    scheduler.onTimeout(readTimeout,
+        listener::handleTimeout,
+        LOG, () -> "timeout heartbeat broadcast listener: " + listener);
+
+    return listener.getFuture();
+  }
+
+  @Override
+  public void accept(RaftPeerId raftPeerId, AppendEntriesReplyProto reply) {
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.forEach(listener -> listener.handleReply(raftPeerId, reply));
+      pendingBroadcasts.removeIf(listener -> listener.isDone());

Review Comment:
   Use a `ConcurrentNavigableMap` so that only the pending requests with read index <= the reply index are handled.  Then, it does not have to process the entire list.
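   A minimal illustration of this suggestion, assuming pending requests are keyed by read index (the `String` values stand in for listeners, and `PendingByIndex` is a hypothetical name): `headMap(replyIndex, true)` is a live, ascending view of only the entries whose key is at most the reply index, so requests with larger read indices are never visited.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;
   import java.util.function.BiPredicate;

   class PendingByIndex {
     private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

     void add(long readIndex, String request) {
       pending.putIfAbsent(readIndex, request);
     }

     /** Visits only requests with readIndex <= replyIndex; removes those reported as done. */
     void onReply(long replyIndex, BiPredicate<Long, String> isDone) {
       for (Map.Entry<Long, String> e : pending.headMap(replyIndex, true).entrySet()) {
         if (isDone.test(e.getKey(), e.getValue())) {
           // Iteration is weakly consistent, so removing during the loop is safe.
           pending.remove(e.getKey(), e.getValue());
         }
       }
     }

     int size() {
       return pending.size();
     }
   }
   ```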



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958531886


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }

Review Comment:
   done



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958534935


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   done



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956676391


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -34,7 +37,7 @@ static class HeartbeatAck {
     }
 
     /** Is the heartbeat (for a particular call id) acknowledged? */
-    boolean isAcknowledged() {
+    synchronized boolean isAcknowledged() {

Review Comment:
   Got it



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958524532


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -189,6 +189,8 @@ public long[] getFollowerNextIndices() {
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
 
+  private final ReadRequests readRequests;

Review Comment:
   done





[GitHub] [ratis] szetszwo merged pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo merged PR #730:
URL: https://github.com/apache/ratis/pull/730




[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956676538


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final int groupPeerCount;
+    private int ack;
+    private int fail;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(int groupSize) {
+      this.groupPeerCount = groupSize;
+      this.ack = 0;
+      this.fail = 0;

Review Comment:
   I've removed the `fail` field but kept the timeout mechanism: if a broadcast does not complete within a given time, it is simply completed with `false`.
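A minimal sketch of the timeout behaviour described in this comment. It assumes Java 9+ for `CompletableFuture.completeOnTimeout`; the diffs here schedule the timeout with Ratis's `TimeoutExecutor` instead. `TimedBroadcast`, `onAck` and the string follower ids are hypothetical names used only for illustration, not Ratis API:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a heartbeat broadcast that yields false if no majority acks arrive in time. */
class TimedBroadcast {
  private final int majority;                        // e.g. 2 of 3, 3 of 5
  private final Set<String> acked = new HashSet<>(); // followers that acknowledged (guarded by this)
  private final CompletableFuture<Boolean> result = new CompletableFuture<>();

  TimedBroadcast(int groupSize, long timeoutMs) {
    this.majority = groupSize / 2 + 1;
    // Java 9+: completes with false only if the future is still pending when the timeout fires.
    result.completeOnTimeout(false, timeoutMs, TimeUnit.MILLISECONDS);
    if (majority <= 1) {
      result.complete(true); // single-node group: the leader alone is a majority
    }
  }

  /** Records an acknowledgement; the leader counts itself, hence the "+ 1". */
  synchronized void onAck(String followerId) {
    if (acked.add(followerId) && acked.size() + 1 >= majority) {
      result.complete(true); // no effect if the timeout already completed it with false
    }
  }

  CompletableFuture<Boolean> getResult() {
    return result;
  }
}
```

Because `complete` is a no-op on an already completed future, whichever comes first, the majority or the timeout, decides the result.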



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final int groupPeerCount;
+    private int ack;
+    private int fail;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(int groupSize) {
+      this.groupPeerCount = groupSize;
+      this.ack = 0;
+      this.fail = 0;
+      this.done = false;
+      result = new CompletableFuture<>();
+    }
+
+    private class HeartbeatListener implements BiFunction<Boolean, AppendEntriesReplyProto, Boolean> {
+      private final HeartbeatAck heartbeatAck;
+
+      HeartbeatListener(LogAppender logAppender) {
+        heartbeatAck = new HeartbeatAck(logAppender);
+      }
+
+      @Override
+      public Boolean apply(Boolean isTimeout, AppendEntriesReplyProto reply) {
+        if (isTimeout) {
+          onHeartbeatReply(false);
+          return true;
+        }
+
+        boolean acknowledged = heartbeatAck.receive(reply);
+        if (acknowledged) {
+          onHeartbeatReply(true);
+        }
+        return heartbeatAck.isAcknowledged();
+      }
+
+    }
+
+    HeartbeatListener newHeartbeatListener(LogAppender logAppender) {
+      return new HeartbeatListener(logAppender);
+    }
+
+    synchronized void onHeartbeatReply(boolean isSuccess) {
+      if (done) {
+        return;
+      }
+
+      if (isSuccess) {
+        ack++;
+      } else {
+        fail++;
+      }
+
+      if (isMajorityAck()) {
+        result.complete(true);
+        done = true;
+      }
+
+      if (isMajorityFail()) {

Review Comment:
   done





[GitHub] [ratis] SzyWilliam commented on pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on PR #730:
URL: https://github.com/apache/ratis/pull/730#issuecomment-1229392727

   @szetszwo Thanks for the review. I've made changes to the code; please take a look~




[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956860997


##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -225,4 +230,14 @@ public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermInd
   public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
     return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
   }
+
+  @Override
+  public void registerAppendEntriesListener(BiConsumer<RaftPeerId, AppendEntriesReplyProto> listener) {
+    appendEntriesListener = listener;
+  }
+
+  protected void notifyAppendEntriesListener(AppendEntriesReplyProto reply) {
+    Optional.ofNullable(appendEntriesListener).ifPresent(
+        listener -> listener.accept(this.getServer().getId(), reply));

Review Comment:
   done



##########
ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java:
##########
@@ -169,6 +171,9 @@ default boolean hasAppendEntries() {
   /** send a heartbeat AppendEntries immediately */
   void triggerHeartbeat() throws IOException;
 
+  /** Register an AppendEntries listener */
+  void registerAppendEntriesListener(BiConsumer<RaftPeerId, AppendEntriesReplyProto> listener);

Review Comment:
   done





[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956789679


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+
+  private final List<HeartbeatBroadcastListener> pendingBroadcasts;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+  private final TimeDuration readTimeout;
+
+  ReadRequests(RaftProperties properties) {
+    pendingBroadcasts = new ArrayList<>();
+    readTimeout = RaftServerConfigKeys.Read.readOnlyTimeout(properties);
+  }
+
+  CompletableFuture<Boolean> newHeartbeatBroadcastListener(List<LogAppender> senders) {
+    HeartbeatBroadcastListener listener = new HeartbeatBroadcastListener(senders);
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.add(listener);
+    }
+
+    // timeout and complete the broadcast listener after a given time
+    scheduler.onTimeout(readTimeout,
+        listener::handleTimeout,
+        LOG, () -> "timeout heartbeat broadcast listener: " + listener);
+
+    return listener.getFuture();
+  }
+
+  @Override
+  public void accept(RaftPeerId raftPeerId, AppendEntriesReplyProto reply) {
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.forEach(listener -> listener.handleReply(raftPeerId, reply));
+      pendingBroadcasts.removeIf(listener -> listener.isDone());

Review Comment:
   Use a `ConcurrentNavigableMap` so that it handles only the pending requests with read index <= the reply index. Then it does not have to process the entire list; see `ReadIndexQueue`.
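A minimal sketch of this suggestion, with hypothetical names (`PendingReads`, `onReply`) rather than Ratis API. It assumes, for illustration, that a reply carries one monotonically increasing index and acknowledges every pending read registered at or below it; the real code compares heartbeat call ids through the appender's comparator. `headMap(replyIndex, true)` restricts each reply to the relevant prefix of the map instead of the whole list:

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch: pending reads keyed by index so a reply only visits keys <= its index. */
class PendingReads {
  private static final class Pending {
    final long index;
    final Set<String> acks = new HashSet<>(); // guarded by the enclosing PendingReads
    final CompletableFuture<Long> future = new CompletableFuture<>();

    Pending(long index) {
      this.index = index;
    }
  }

  private final int majority;
  private final ConcurrentNavigableMap<Long, Pending> pending = new ConcurrentSkipListMap<>();

  PendingReads(int groupSize) {
    this.majority = groupSize / 2 + 1;
  }

  CompletableFuture<Long> add(long readIndex) {
    return pending.computeIfAbsent(readIndex, k -> new Pending(k)).future;
  }

  /** A reply from followerId acknowledges every pending read with index <= replyIndex. */
  synchronized void onReply(String followerId, long replyIndex) {
    // headMap(k, true) is a live view of keys <= k; larger keys are never visited.
    Iterator<Pending> it = pending.headMap(replyIndex, true).values().iterator();
    while (it.hasNext()) {
      Pending p = it.next();
      p.acks.add(followerId);
      if (p.acks.size() + 1 >= majority) { // + 1: the leader acknowledges itself
        p.future.complete(p.index);
        it.remove(); // also removes the entry from the backing map
      }
    }
  }

  int size() {
    return pending.size();
  }
}
```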





[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956788134


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -247,6 +247,8 @@ boolean removeAll(Collection<LogAppender> c) {
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
+  private final ReadRequests readRequests;
+  private final Map<Long, CompletableFuture<Boolean>> pendingBroadcasts;

Review Comment:
   `LeaderStateImpl.ReadRequests` and `ReadRequests.pendingBroadcasts` should be combined into a single data structure.





[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958558963


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
+    final long callId = reply.getServerReply().getCallId();
+    for (;;) {
+      final Long first = heartbeatListeners.firstKey();
+      if (first == null || first > callId) {
+        return;
+      }
+
+      final HeartbeatBroadcastListener listener = heartbeatListeners.get(callId);
+      if (listener == null) {
+        continue;
+      }
+
+      listener.receive(reply, hasMajority);
+      if (listener.isCompleted()) {
+        heartbeatListeners.remove(callId);
+        ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("ack commit index change: {}", s));

Review Comment:
   done





[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958559189


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final long commitIndex;
+    private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();
+
+    HeartbeatBroadcastListener(long commitIndex) {
+      this.commitIndex = commitIndex;
+    }
+
+    void init(LogAppender appender) {
+      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
+    }
+
+    CompletableFuture<Long> getFuture() {
+      return future;
+    }
+
+    void receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
+      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
+      if (!reply.receive(proto)) {
+        return;
+      }
+      if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
+        future.complete(commitIndex);
+      }
+    }
+
+    boolean isCompleted() {
+      return future.isDone();
+    }
+  }
+
+  private final ConcurrentNavigableMap<Long, HeartbeatBroadcastListener>
+      heartbeatListeners = new ConcurrentSkipListMap<>();
+  private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+
+  HeartbeatBroadcastListener addHeartbeatListener(long commitIndex,
+                                                  Function<Long, HeartbeatBroadcastListener> constructor) {
+    if (commitIndex <= ackedCommitIndex.get()) {
+      return null;
+    }
+    return heartbeatListeners.computeIfAbsent(commitIndex, constructor);
+  }
+
+  synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {

Review Comment:
   done





[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956861110


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {

Review Comment:
   Got it~





[GitHub] [ratis] SzyWilliam commented on pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on PR #730:
URL: https://github.com/apache/ratis/pull/730#issuecomment-1231765653

   @szetszwo Thanks for the review! I've made changes to the code. Maybe we can move AppendEntriesListener and its navigable map to LeaderStateImpl, since
   1. AppendEntriesListener is only valid, and only called, while the peer holds leadership.
   2. The AppendEntriesListener map should have the same lifecycle as LeaderState: pending listeners remaining in the map should be cleared when the leader steps down, and the map should be recreated empty when the peer is re-elected as leader.

   What do you think?
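A minimal sketch of the lifecycle proposed here, with hypothetical names: one `LeaderScopedReads` instance per leadership term, failed and emptied on step-down, and a fresh instance created on re-election. `IllegalStateException` stands in for whatever not-leader error a real server would return:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch: pending read listeners owned by one leadership term. */
class LeaderScopedReads {
  private final ConcurrentNavigableMap<Long, CompletableFuture<Long>> listeners =
      new ConcurrentSkipListMap<>();
  private boolean running = true; // guarded by this

  synchronized CompletableFuture<Long> register(long readIndex) {
    if (!running) {
      final CompletableFuture<Long> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException("not the leader"));
      return failed;
    }
    return listeners.computeIfAbsent(readIndex, k -> new CompletableFuture<>());
  }

  /** On step-down, fail every pending read so clients can retry against the new leader. */
  synchronized void stop() {
    running = false;
    final IllegalStateException stepDown = new IllegalStateException("leader stepped down");
    listeners.values().forEach(f -> f.completeExceptionally(stepDown));
    listeners.clear();
  }

  int pendingCount() {
    return listeners.size();
  }
}
```

Making `register` and `stop` both `synchronized` ensures that no listener is added after the map has been cleared.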




[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956640897


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final int groupPeerCount;
+    private int ack;
+    private int fail;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(int groupSize) {
+      this.groupPeerCount = groupSize;
+      this.ack = 0;
+      this.fail = 0;
+      this.done = false;
+      result = new CompletableFuture<>();
+    }
+
+    private class HeartbeatListener implements BiFunction<Boolean, AppendEntriesReplyProto, Boolean> {
+      private final HeartbeatAck heartbeatAck;
+
+      HeartbeatListener(LogAppender logAppender) {
+        heartbeatAck = new HeartbeatAck(logAppender);
+      }
+
+      @Override
+      public Boolean apply(Boolean isTimeout, AppendEntriesReplyProto reply) {
+        if (isTimeout) {
+          onHeartbeatReply(false);
+          return true;
+        }
+
+        boolean acknowledged = heartbeatAck.receive(reply);
+        if (acknowledged) {
+          onHeartbeatReply(true);
+        }
+        return heartbeatAck.isAcknowledged();
+      }
+
+    }
+
+    HeartbeatListener newHeartbeatListener(LogAppender logAppender) {
+      return new HeartbeatListener(logAppender);
+    }
+
+    synchronized void onHeartbeatReply(boolean isSuccess) {
+      if (done) {
+        return;
+      }
+
+      if (isSuccess) {
+        ack++;
+      } else {
+        fail++;
+      }
+
+      if (isMajorityAck()) {
+        result.complete(true);
+        done = true;
+      }
+
+      if (isMajorityFail()) {

Review Comment:
   There is no such thing as "majorityFail", since it should retry on failures.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final int groupPeerCount;
+    private int ack;
+    private int fail;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(int groupSize) {
+      this.groupPeerCount = groupSize;
+      this.ack = 0;
+      this.fail = 0;

Review Comment:
   When a heartbeat fails (due to a network error or other problems), it should be retried until it succeeds. We should not count heartbeat failures.
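A minimal sketch of acknowledgement-only counting, assuming string peer ids and hypothetical names (`AckOnlyQuorum`, `onSuccess`, `onFailure`). `hasMajority` mirrors the shape of the `Predicate<Predicate<RaftPeerId>> hasMajority` argument in the `receive` diff above, and a failure does nothing except trigger a resend:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical sketch: count acknowledgements only; a failed heartbeat is resent, never tallied. */
class AckOnlyQuorum {
  private final String leaderId;
  private final List<String> conf; // current configuration, leader included
  private final Set<String> acked = ConcurrentHashMap.newKeySet();

  AckOnlyQuorum(String leaderId, List<String> conf) {
    this.leaderId = leaderId;
    this.conf = conf;
  }

  /** Evaluates a per-peer predicate over the configuration; the leader always votes for itself. */
  boolean hasMajority(Predicate<String> isAcked) {
    final long votes = conf.stream().filter(id -> id.equals(leaderId) || isAcked.test(id)).count();
    return votes > conf.size() / 2;
  }

  /** Returns true once the leader plus acknowledging followers form a majority. */
  boolean onSuccess(String followerId) {
    acked.add(followerId);
    return hasMajority(acked::contains);
  }

  /** No failure counter: the heartbeat to this follower is simply sent again. */
  void onFailure(String followerId, Consumer<String> resend) {
    resend.accept(followerId);
  }
}
```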



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1028,37 @@ public boolean checkLeadership() {
     return false;
   }
 
+  /** confirm whether it has valid leadership by broadcasting heartbeat AppendEntries */
+  private CompletableFuture<Boolean> confirmLeadership() {
+    ReadRequests.HeartbeatBroadcastListener listener =
+        new ReadRequests.HeartbeatBroadcastListener(server.getRaftConf().getCurrentPeers().size());
+
+    senders.stream().forEach(logAppender -> {

Review Comment:
   Pass the commitIndex to confirmLeadership().
   - When the commitIndex is new, create a listener and broadcast heartbeats.
   - When the commitIndex has already been passed by a previous call, just return the previous future.
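A sketch of the reuse suggested here, with hypothetical names (`LeadershipConfirmer`, `onMajorityAck`). It handles only exact repeats of a commitIndex and shows only the bookkeeping. It does not address whether a heartbeat round started before a later read arrived is sufficient for that read:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one heartbeat round per commitIndex; repeated calls share its future. */
class LeadershipConfirmer {
  private final ConcurrentHashMap<Long, CompletableFuture<Long>> inFlight = new ConcurrentHashMap<>();
  private final AtomicInteger rounds = new AtomicInteger(); // counts broadcasts, for illustration

  CompletableFuture<Long> confirmLeadership(long commitIndex) {
    final CompletableFuture<Long> created = new CompletableFuture<>();
    final CompletableFuture<Long> previous = inFlight.putIfAbsent(commitIndex, created);
    if (previous != null) {
      return previous; // same commitIndex already in flight: reuse its future
    }
    broadcastHeartbeats(); // only the caller that inserted the entry starts a round
    return created;
  }

  /** Called once a majority has acknowledged the round for commitIndex. */
  void onMajorityAck(long commitIndex) {
    final CompletableFuture<Long> f = inFlight.remove(commitIndex);
    if (f != null) {
      f.complete(commitIndex);
    }
  }

  int rounds() {
    return rounds.get();
  }

  private void broadcastHeartbeats() {
    rounds.incrementAndGet(); // stand-in for triggering a heartbeat on every LogAppender
  }
}
```

The sketch uses `putIfAbsent` rather than broadcasting inside `computeIfAbsent`, because the `ConcurrentHashMap` documentation asks that mapping functions be short and simple.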



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -34,7 +37,7 @@ static class HeartbeatAck {
     }
 
     /** Is the heartbeat (for a particular call id) acknowledged? */
-    boolean isAcknowledged() {
+    synchronized boolean isAcknowledged() {

Review Comment:
   `acknowledged` is `volatile`, so we don't have to add `synchronized`.
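A minimal sketch of the pattern, assuming call ids are plain `long`s compared with `<` (the real `HeartbeatAck` uses the appender's call-id comparator); the class name is hypothetical. Readers of the `volatile` flag need no lock. The check-then-set in `receive` is kept `synchronized` here so that two concurrent replies cannot both report the transition:

```java
/** Hypothetical sketch of a per-follower heartbeat acknowledgement for one read. */
class HeartbeatAckSketch {
  private final long minCallId;          // earliest heartbeat call id that counts for this read
  private volatile boolean acknowledged; // volatile: readers always see the latest write

  HeartbeatAckSketch(long minCallId) {
    this.minCallId = minCallId;
  }

  /** No synchronized needed: a single volatile read is enough for visibility. */
  boolean isAcknowledged() {
    return acknowledged;
  }

  /**
   * Returns true only for the reply that flips the flag. Synchronized because this is a
   * check-then-act, which volatile alone does not make atomic.
   */
  synchronized boolean receive(long replyCallId, boolean success) {
    if (acknowledged || !success || replyCallId < minCallId) {
      return false;
    }
    acknowledged = true;
    return true;
  }
}
```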



##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -56,6 +61,9 @@ public abstract class LogAppenderBase implements LogAppender {
   private final AwaitForSignal eventAwaitForSignal;
 
   private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+  private final List<BiFunction<Boolean, AppendEntriesReplyProto, Boolean>> heartbeatAckListeners;

Review Comment:
   It only needs a single listener queue in ReadRequests instead of a queue per LogAppender. Each listener in the queue corresponds to a readIndex. When a LogAppender receives an appendEntries reply, it just notifies ReadRequests so that ReadRequests can update the listeners.
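A minimal sketch of that wiring with hypothetical stand-in classes, not the Ratis types. Every appender forwards its replies to one shared `BiConsumer`. Each reply is attributed to the follower's id, not the leader's own id (compare `appender.getFollowerId()` in the `init` diff above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for ReadRequests: the one place that sees replies from all followers. */
class ReadRequestsSketch implements BiConsumer<String, Long> {
  private final List<String> seen = new ArrayList<>(); // guarded by this

  @Override
  public synchronized void accept(String followerId, Long callId) {
    // A real implementation would update the pending per-readIndex listeners here.
    seen.add(followerId + "@" + callId);
  }

  synchronized List<String> seen() {
    return new ArrayList<>(seen);
  }
}

/** Hypothetical stand-in for a LogAppender: it keeps no listener queue of its own. */
class AppenderSketch {
  private final String followerId;
  private final BiConsumer<String, Long> replySink; // shared by every appender of the leader

  AppenderSketch(String followerId, BiConsumer<String, Long> replySink) {
    this.followerId = followerId;
    this.replySink = replySink;
  }

  void onAppendEntriesReply(long callId) {
    replySink.accept(followerId, callId); // attribute the reply to the follower, not the leader
  }
}
```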





[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956966671


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1033,43 @@ public boolean checkLeadership() {
     return false;
   }
 
+  /** confirm whether it has valid leadership by broadcasting heartbeat AppendEntries */
+  private CompletableFuture<Boolean> confirmLeadership(long readIndex) {
+    if (pendingBroadcasts.containsKey(readIndex)) {
+      return pendingBroadcasts.get(readIndex);
+    }
+
+    CompletableFuture<Boolean> future =
+        this.readRequests.newHeartbeatBroadcastListener(senders.getSenders());
+    pendingBroadcasts.put(readIndex, future);
+
+    senders.stream().forEach(logAppender -> {
+      try {
+        logAppender.triggerHeartbeat();
+      } catch (IOException e) {
+        LOG.warn("{}: {} trigger heartbeat failed due to {}", this, logAppender, e);
+      }
+    });
+    return future;
+  }
+
+  /** return leader's read index, see thesis section 6.4 */
+  CompletableFuture<Long> getReadIndex() {
+    final long readIndex = server.getRaftLog().getLastCommittedIndex();

Review Comment:
   done



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -17,11 +17,59 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** For supporting linearizable read. */
-class ReadRequests {
+class ReadRequests implements BiConsumer<RaftPeerId, AppendEntriesReplyProto> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
+
+  private final List<HeartbeatBroadcastListener> pendingBroadcasts;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+  private final TimeDuration readTimeout;
+
+  ReadRequests(RaftProperties properties) {
+    pendingBroadcasts = new ArrayList<>();
+    readTimeout = RaftServerConfigKeys.Read.readOnlyTimeout(properties);
+  }
+
+  CompletableFuture<Boolean> newHeartbeatBroadcastListener(List<LogAppender> senders) {
+    HeartbeatBroadcastListener listener = new HeartbeatBroadcastListener(senders);
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.add(listener);
+    }
+
+    // timeout and complete the broadcast listener after a given time
+    scheduler.onTimeout(readTimeout,
+        listener::handleTimeout,
+        LOG, () -> "timeout heartbeat broadcast listener: " + listener);
+
+    return listener.getFuture();
+  }
+
+  @Override
+  public void accept(RaftPeerId raftPeerId, AppendEntriesReplyProto reply) {
+    synchronized (pendingBroadcasts) {
+      pendingBroadcasts.forEach(listener -> listener.handleReply(raftPeerId, reply));
+      pendingBroadcasts.removeIf(listener -> listener.isDone());

Review Comment:
   done





[GitHub] [ratis] SzyWilliam commented on pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on PR #730:
URL: https://github.com/apache/ratis/pull/730#issuecomment-1229878526

   @szetszwo Thanks for the reviews! I've made changes to the code. Please take a look~




[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956676368


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1028,37 @@ public boolean checkLeadership() {
     return false;
   }
 
+  /** confirm whether it has valid leadership by broadcasting heartbeat AppendEntries */
+  private CompletableFuture<Boolean> confirmLeadership() {
+    ReadRequests.HeartbeatBroadcastListener listener =
+        new ReadRequests.HeartbeatBroadcastListener(server.getRaftConf().getCurrentPeers().size());
+
+    senders.stream().forEach(logAppender -> {

Review Comment:
   done



##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -56,6 +61,9 @@ public abstract class LogAppenderBase implements LogAppender {
   private final AwaitForSignal eventAwaitForSignal;
 
   private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+  private final List<BiFunction<Boolean, AppendEntriesReplyProto, Boolean>> heartbeatAckListeners;

Review Comment:
   done





[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r957889294


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -189,6 +189,8 @@ public long[] getFollowerNextIndices() {
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
 
+  private final ReadRequests readRequests;

Review Comment:
   Let's move `readRequests` to `ServerState` since
   - `stateMachineUpdater` needs to update `readRequests` later;
   - `ServerState` is a smaller class, so `RaftServerImpl` won't get too big.



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r958526207


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +78,70 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {

Review Comment:
   Done. I've changed it to AppendEntriesListener.



[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956788134


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -247,6 +247,8 @@ boolean removeAll(Collection<LogAppender> c) {
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
+  private final ReadRequests readRequests;
+  private final Map<Long, CompletableFuture<Boolean>> pendingBroadcasts;

Review Comment:
   `LeaderStateImpl.readRequests` and `LeaderStateImpl.pendingBroadcasts` should be combined into a single data structure; see `ReadRequests.ReadIndexQueue`.
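Below is a sketch of what one combined structure could look like, assuming reads are batched into heartbeat rounds. `ReadIndexRounds` and its methods are hypothetical; this is not the Ratis `ReadIndexQueue`. The safety rule it encodes: a read is attached to the round whose heartbeats have not been sent yet, so the majority acknowledgement that confirms it was necessarily sent after the read arrived. Piggybacking on a round already in flight would not give that guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not ReadRequests.ReadIndexQueue: one structure owns both the pending
// reads and the heartbeat rounds that confirm them, instead of a separate map of broadcast futures.
class ReadIndexRounds {
  private static final class PendingRead {
    final long readIndex;
    final CompletableFuture<Long> future = new CompletableFuture<>();
    PendingRead(long readIndex) { this.readIndex = readIndex; }
  }

  private final NavigableMap<Long, List<PendingRead>> byRound = new TreeMap<>();
  private long nextRound = 1; // the round whose heartbeats have not been sent yet

  /** Registers a read; it is confirmed by the next round, whose heartbeats go out after this call. */
  synchronized CompletableFuture<Long> add(long readIndex) {
    final PendingRead read = new PendingRead(readIndex);
    byRound.computeIfAbsent(nextRound, k -> new ArrayList<>()).add(read);
    return read.future;
  }

  /** Starts a round: the caller then broadcasts heartbeats tagged with the returned id. */
  synchronized long startRound() {
    return nextRound++;
  }

  /** A majority acknowledged this round, so every read registered in it or earlier is confirmed. */
  void onRoundConfirmed(long round) {
    final List<PendingRead> reads = new ArrayList<>();
    synchronized (this) {
      final NavigableMap<Long, List<PendingRead>> done = byRound.headMap(round, true);
      done.values().forEach(reads::addAll);
      done.clear();
    }
    reads.forEach(read -> read.future.complete(read.readIndex));
  }

  /** The round timed out or leadership was lost; the reads waiting on it fail. */
  void onRoundFailed(long round) {
    final List<PendingRead> reads;
    synchronized (this) {
      reads = byRound.remove(round);
    }
    if (reads != null) {
      reads.forEach(read -> read.future.completeExceptionally(
          new IllegalStateException("leadership not confirmed")));
    }
  }

  public static void main(String[] args) {
    final ReadIndexRounds rounds = new ReadIndexRounds();
    final CompletableFuture<Long> a = rounds.add(10);
    final CompletableFuture<Long> b = rounds.add(12);
    final long r1 = rounds.startRound();              // heartbeats for round 1 are sent now
    final CompletableFuture<Long> c = rounds.add(13); // arrives mid-round: waits for round 2
    rounds.onRoundConfirmed(r1);
    System.out.println(a.join() + " " + b.join() + " " + c.isDone()); // 10 12 false
  }
}
```

The caller is assumed to start another round whenever reads remain queued after one completes; that scheduling loop is not shown.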



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956862297


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +111,69 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final Map<RaftPeerId, HeartbeatAck> pendingAcknowledgements;
+    private final int groupPeerCount;
+    private int ack;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(List<LogAppender> senders) {
+      this.groupPeerCount = senders.size() + 1;
+      this.ack = 0;
+      this.done = false;
+      result = new CompletableFuture<>();
+
+      pendingAcknowledgements = new ConcurrentHashMap<>();
+      senders.forEach(sender ->
+          pendingAcknowledgements.put(sender.getFollowerId(), new HeartbeatAck(sender)));
+    }
+
+    private void handleReply(RaftPeerId peerId, AppendEntriesReplyProto reply) {
+      if (done) {
+        return;
+      }
+
+      Optional.ofNullable(pendingAcknowledgements.get(peerId)).ifPresent(heartbeatAck -> {
+        boolean firstAcknowledged = heartbeatAck.receive(reply);
+        if (firstAcknowledged) {
+          onHeartbeatAcknowledged();
+        }
+      });
+    }
+
+    private synchronized void handleTimeout() {
+      if (!done) {
+        done = true;
+        result.complete(false);
+      }
+    }
+
+    private boolean isDone() {
+      return this.done;
+    }
+
+    synchronized void onHeartbeatAcknowledged() {
+      if (done) {
+        return;
+      }
+
+      ack++;
+
+      if (isMajorityAck()) {
+        done = true;
+        result.complete(true);
+      }
+    }
+
+    private synchronized boolean isMajorityAck() {
+      // include leader itself
+      return ack + 1 > groupPeerCount / 2;
+    }

Review Comment:
   done
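The majority test in `isMajorityAck()` counts the leader itself, and `HeartbeatAck.receive` appears to report only the first acknowledgement per follower. The self-contained sketch below isolates that counting, using string peer ids as an assumed stand-in for `RaftPeerId`; `MajorityAckCounter` is a hypothetical name. As a worked example: with 5 peers, `5 / 2 == 2` in integer division, so the leader plus two followers (3 votes) pass the test; with 4 peers, `4 / 2 == 2` as well, so 3 votes are again required.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the counting done by the quoted listener: the leader votes for
// itself, each follower is counted at most once, and replies from unknown peers are ignored.
class MajorityAckCounter {
  private final Set<String> followers;
  private final Set<String> acked = ConcurrentHashMap.newKeySet();
  private final int groupSize; // followers plus the leader

  MajorityAckCounter(Set<String> followerIds) {
    this.followers = new HashSet<>(followerIds);
    this.groupSize = followers.size() + 1;
  }

  /** Records an ack from followerId and reports whether a majority has been reached. */
  boolean ack(String followerId) {
    if (followers.contains(followerId)) {
      acked.add(followerId); // a repeated ack from the same follower changes nothing
    }
    return hasMajority();
  }

  boolean hasMajority() {
    final int votes = acked.size() + 1; // + 1 for the leader itself
    return votes > groupSize / 2;       // same test as `ack + 1 > groupPeerCount / 2`
  }

  /** Smallest number of votes, leader included, that passes the test above. */
  static int quorumSize(int groupSize) {
    return groupSize / 2 + 1;
  }

  public static void main(String[] args) {
    for (int n = 1; n <= 5; n++) {
      System.out.println(n + " peers -> quorum " + quorumSize(n));
    }
    // 1 -> 1, 2 -> 2, 3 -> 2, 4 -> 3, 5 -> 3
  }
}
```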



[GitHub] [ratis] SzyWilliam commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956966886


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -247,6 +247,8 @@ boolean removeAll(Collection<LogAppender> c) {
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
+  private final ReadRequests readRequests;
+  private final Map<Long, CompletableFuture<Boolean>> pendingBroadcasts;

Review Comment:
   done


