Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2022/08/27 23:18:18 UTC

[GitHub] [ratis] szetszwo commented on a diff in pull request #730: RATIS-1643. Add heartbeat broadcast mechanism for leader readIndex

szetszwo commented on code in PR #730:
URL: https://github.com/apache/ratis/pull/730#discussion_r956640897


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final int groupPeerCount;
+    private int ack;
+    private int fail;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(int groupSize) {
+      this.groupPeerCount = groupSize;
+      this.ack = 0;
+      this.fail = 0;
+      this.done = false;
+      result = new CompletableFuture<>();
+    }
+
+    private class HeartbeatListener implements BiFunction<Boolean, AppendEntriesReplyProto, Boolean> {
+      private final HeartbeatAck heartbeatAck;
+
+      HeartbeatListener(LogAppender logAppender) {
+        heartbeatAck = new HeartbeatAck(logAppender);
+      }
+
+      @Override
+      public Boolean apply(Boolean isTimeout, AppendEntriesReplyProto reply) {
+        if (isTimeout) {
+          onHeartbeatReply(false);
+          return true;
+        }
+
+        boolean acknowledged = heartbeatAck.receive(reply);
+        if (acknowledged) {
+          onHeartbeatReply(true);
+        }
+        return heartbeatAck.isAcknowledged();
+      }
+
+    }
+
+    HeartbeatListener newHeartbeatListener(LogAppender logAppender) {
+      return new HeartbeatListener(logAppender);
+    }
+
+    synchronized void onHeartbeatReply(boolean isSuccess) {
+      if (done) {
+        return;
+      }
+
+      if (isSuccess) {
+        ack++;
+      } else {
+        fail++;
+      }
+
+      if (isMajorityAck()) {
+        result.complete(true);
+        done = true;
+      }
+
+      if (isMajorityFail()) {

Review Comment:
   There is no such thing as "majorityFail", since failed heartbeats should be retried.
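
   To illustrate the retry idea, here is a minimal sketch. It is not Ratis code: `HeartbeatRetrySketch`, `sendUntilAcked` and the `sendOnce` supplier are hypothetical names, and a real implementation would add a backoff and stop retrying once the server is no longer the leader.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Ratis): resend a heartbeat until it is acknowledged. */
final class HeartbeatRetrySketch {
  private HeartbeatRetrySketch() {}

  /**
   * @param sendOnce assumed to send one heartbeat and complete with true on success
   * @return a future that completes only after some attempt has succeeded
   */
  static CompletableFuture<Void> sendUntilAcked(Supplier<CompletableFuture<Boolean>> sendOnce) {
    final CompletableFuture<Void> acked = new CompletableFuture<>();
    attempt(sendOnce, acked);
    return acked;
  }

  private static void attempt(Supplier<CompletableFuture<Boolean>> sendOnce,
      CompletableFuture<Void> acked) {
    sendOnce.get().whenComplete((ok, error) -> {
      if (error == null && Boolean.TRUE.equals(ok)) {
        acked.complete(null);
      } else {
        // A failure or timeout is not recorded anywhere; the heartbeat is simply sent again.
        attempt(sendOnce, acked);
      }
    });
  }
}
```

   With this shape there is no failure outcome to count: the returned future either completes after a successful attempt or stays pending.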



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -63,4 +66,82 @@ private boolean isValid(AppendEntriesReplyProto reply) {
       return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
     }
   }
+
+  static class HeartbeatBroadcastListener {
+    private final int groupPeerCount;
+    private int ack;
+    private int fail;
+    private volatile boolean done;
+    private final CompletableFuture<Boolean> result;
+
+    HeartbeatBroadcastListener(int groupSize) {
+      this.groupPeerCount = groupSize;
+      this.ack = 0;
+      this.fail = 0;

Review Comment:
   When a heartbeat fails (because of a network error or some other problem), it should be retried until it succeeds.  We should not count heartbeat failures.
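
   A rough sketch of a listener that counts acknowledgements only. The class name, the `onAck(peerId)` method and the majority formula `groupSize / 2 + 1` are assumptions for illustration; whether the leader's own vote is pre-counted is left to the caller.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical ack-only variant of the broadcast listener: there is no fail counter. */
final class AckOnlyListenerSketch {
  private final int majority;
  private final Set<String> ackedPeers = new HashSet<>();
  private final CompletableFuture<Boolean> result = new CompletableFuture<>();

  AckOnlyListenerSketch(int groupSize) {
    // Assumption: a majority is strictly more than half of the group.
    this.majority = groupSize / 2 + 1;
  }

  /** Record a successful heartbeat reply; repeated acks from the same peer count once. */
  synchronized void onAck(String peerId) {
    if (result.isDone()) {
      return;
    }
    ackedPeers.add(peerId);
    if (ackedPeers.size() >= majority) {
      result.complete(true);
    }
  }

  CompletableFuture<Boolean> getResult() {
    return result;
  }
}
```

   Failed or timed-out heartbeats never reach this class; they are expected to be retried by the sender.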



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -1028,6 +1028,37 @@ public boolean checkLeadership() {
     return false;
   }
 
+  /** confirm whether it has valid leadership by broadcasting heartbeat AppendEntries */
+  private CompletableFuture<Boolean> confirmLeadership() {
+    ReadRequests.HeartbeatBroadcastListener listener =
+        new ReadRequests.HeartbeatBroadcastListener(server.getRaftConf().getCurrentPeers().size());
+
+    senders.stream().forEach(logAppender -> {

Review Comment:
   Pass the commitIndex to confirmLeadership().
   - When the commitIndex is new, create a listener and broadcast a heartbeat.
   - When the commitIndex has already been passed by a previous call, just return the previous future.
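
   The two cases above could be sketched as follows. The class and the `broadcast` callback are hypothetical, and the sketch only remembers the most recent commitIndex; it does not decide when a remembered future becomes stale.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongFunction;

/** Hypothetical sketch: reuse the pending confirmation when the commitIndex is unchanged. */
final class ConfirmLeadershipCacheSketch {
  private long lastCommitIndex = -1;
  private CompletableFuture<Boolean> lastFuture;

  /**
   * @param commitIndex the leader's current commit index
   * @param broadcast   assumed to create a listener, broadcast heartbeats, and return its future
   */
  synchronized CompletableFuture<Boolean> confirmLeadership(
      long commitIndex, LongFunction<CompletableFuture<Boolean>> broadcast) {
    if (lastFuture != null && commitIndex == lastCommitIndex) {
      // Same commitIndex as the previous call: no new broadcast is needed.
      return lastFuture;
    }
    // New commitIndex: create a listener and broadcast a heartbeat.
    lastCommitIndex = commitIndex;
    lastFuture = broadcast.apply(commitIndex);
    return lastFuture;
  }
}
```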



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -34,7 +37,7 @@ static class HeartbeatAck {
     }
 
     /** Is the heartbeat (for a particular call id) acknowledged? */
-    boolean isAcknowledged() {
+    synchronized boolean isAcknowledged() {

Review Comment:
   `acknowledged` is `volatile`, so we don't have to add `synchronized`.
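
   A small standalone sketch of why the plain read is enough. This is an illustrative class, not the actual `HeartbeatAck`; `receive(boolean)` stands in for the reply validation, which is an assumption.

```java
/** Hypothetical stand-in for HeartbeatAck: a one-way flag published through a volatile field. */
final class VolatileAckSketch {
  // A volatile write is visible to every later volatile read of the same field.
  private volatile boolean acknowledged = false;

  /** @param validReply assumed result of validating a reply; the flag only goes false -> true */
  boolean receive(boolean validReply) {
    if (validReply) {
      acknowledged = true;
    }
    return acknowledged;
  }

  /** A single volatile read; no lock is required to observe the latest value. */
  boolean isAcknowledged() {
    return acknowledged;
  }
}
```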



##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -56,6 +61,9 @@ public abstract class LogAppenderBase implements LogAppender {
   private final AwaitForSignal eventAwaitForSignal;
 
   private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+  private final List<BiFunction<Boolean, AppendEntriesReplyProto, Boolean>> heartbeatAckListeners;

Review Comment:
   Only a single listener queue is needed in ReadRequests, instead of a queue per LogAppender.  Each listener in the queue corresponds to a readIndex.  When a LogAppender receives an appendEntries reply, it just notifies ReadRequests so that it can update the listeners.
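
   One possible shape for such a queue, as a sketch only. All names here are hypothetical; follower ids are plain strings, and a single increasing call id shared by all followers is assumed for simplicity (the existing `isValid` check compares call ids per appender).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical single listener queue; each listener corresponds to one readIndex. */
final class ReadIndexListenersSketch {
  private static final class Listener {
    private final long readIndex;
    private final long minCallId; // replies with an older call id do not count
    private final Set<String> ackedFollowers = new HashSet<>();
    private final CompletableFuture<Long> future = new CompletableFuture<>();

    Listener(long readIndex, long minCallId) {
      this.readIndex = readIndex;
      this.minCallId = minCallId;
    }
  }

  private final int requiredFollowerAcks;
  private final Deque<Listener> queue = new ArrayDeque<>();

  ReadIndexListenersSketch(int requiredFollowerAcks) {
    this.requiredFollowerAcks = requiredFollowerAcks;
  }

  synchronized CompletableFuture<Long> addListener(long readIndex, long minCallId) {
    final Listener listener = new Listener(readIndex, minCallId);
    queue.addLast(listener);
    return listener.future;
  }

  /** Called by any appender when a reply arrives; updates every pending listener. */
  synchronized void onAppendEntriesReply(String followerId, long callId) {
    for (Iterator<Listener> i = queue.iterator(); i.hasNext(); ) {
      final Listener listener = i.next();
      if (callId >= listener.minCallId
          && listener.ackedFollowers.add(followerId)
          && listener.ackedFollowers.size() >= requiredFollowerAcks) {
        listener.future.complete(listener.readIndex);
        i.remove();
      }
    }
  }
}
```

   The appenders keep no listener state of their own in this layout; they only forward `(followerId, callId)` for each reply.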


