You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/09/23 05:43:16 UTC

[incubator-ratis] branch master updated: RATIS-1042. Watch for commit calls are blocked for a long if no other message (#185)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a79281c  RATIS-1042. Watch for commit calls are blocked for a long if no other message (#185)
a79281c is described below

commit a79281ca2452028228582ddde143e5350df9203d
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Sep 23 11:13:07 2020 +0530

    RATIS-1042. Watch for commit calls are blocked for a long if no other message (#185)
    
    * Watch for commit calls are blocked for a long if no other message
    
    Co-authored-by: Elek Márton <el...@apache.org>
    
    * Leader should notify appenders on commitIndex change
    
    * Retrigger CI
    
    * Retrigger CI
    
    Co-authored-by: Elek Márton <el...@apache.org>
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 41 ++++++++++++----------
 .../org/apache/ratis/server/impl/LeaderState.java  |  1 +
 .../org/apache/ratis/server/impl/LogAppender.java  | 19 +++++++---
 3 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 9f5eb76..235a023 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -101,41 +101,44 @@ public class GrpcLogAppender extends LogAppender {
 
   @Override
   protected void runAppenderImpl() throws IOException {
-    boolean shouldAppendLog;
+    boolean installSnapshotRequired;
     for(; isAppenderRunning(); mayWait()) {
-      shouldAppendLog = true;
-      if (shouldSendRequest()) {
+      installSnapshotRequired = false;
+
+      // Send if entries are pending, the heartbeat is due, or the follower's commit index is behind
+      if (haveLogEntriesToSendOut() || heartbeatTimeout() || isFollowerCommitBehindLastCommitIndex()) {
+
         if (installSnapshotEnabled) {
           SnapshotInfo snapshot = shouldInstallSnapshot();
           if (snapshot != null) {
             installSnapshot(snapshot);
-            shouldAppendLog = false;
+            installSnapshotRequired = true;
           }
         } else {
           TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
           if (installSnapshotNotificationTermIndex != null) {
             installSnapshot(installSnapshotNotificationTermIndex);
-            shouldAppendLog = false;
+            installSnapshotRequired = true;
           }
         }
-        if (shouldHeartbeat() || (shouldAppendLog && !shouldWait())) {
-          // keep appending log entries or sending heartbeats
-          appendLog();
-        }
+
+        appendLog(installSnapshotRequired || haveTooManyPendingRequests());
+
       }
       checkSlowness();
+
     }
 
     Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
   }
 
   private long getWaitTimeMs() {
-    if (!shouldSendRequest()) {
-      return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
-    } else if (shouldWait()) {
-      return getHalfMinTimeoutMs(); // Should wait for a short time
+    if (haveTooManyPendingRequests()) {
+      return getHeartbeatRemainingTime(); // Too many pending requests: wait until the next heartbeat
+    } else if (haveLogEntriesToSendOut() || heartbeatTimeout()) {
+      return 0L;
     }
-    return 0L;
+    return Math.min(10L, getHeartbeatRemainingTime());
   }
 
   private void mayWait() {
@@ -166,8 +169,10 @@ public class GrpcLogAppender extends LogAppender {
     return appendLogRequestObserver == null || super.shouldSendRequest();
   }
 
-  /** @return true iff not received first response or queue is full. */
-  private boolean shouldWait() {
+  /**
+   * @return true iff requests are pending and (no first response yet or queue is full).
+   */
+  private boolean haveTooManyPendingRequests() {
     final int size = pendingRequests.logRequestsSize();
     if (size == 0) {
       return false;
@@ -175,7 +180,7 @@ public class GrpcLogAppender extends LogAppender {
     return !firstResponseReceived || size >= maxPendingRequestsNum;
   }
 
-  private void appendLog() throws IOException {
+  private void appendLog(boolean excludeLogEntries) throws IOException {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
     final StreamObserver<AppendEntriesRequestProto> s;
@@ -183,7 +188,7 @@ public class GrpcLogAppender extends LogAppender {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
-      pending = createRequest(callId++);
+      pending = createRequest(callId++, excludeLogEntries);
       if (pending == null) {
         return;
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 5718158..240be2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -387,6 +387,7 @@ public class LeaderState {
       watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, m.majority);
       watchRequests.update(ReplicationLevel.MAJORITY, m.max);
     });
+    notifySenders();
   }
 
   private void applyOldNewConf() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index b796099..03fd67d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -221,11 +221,12 @@ public class LogAppender {
     return null;
   }
 
-  protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
+  protected AppendEntriesRequestProto createRequest(long callId,
+      boolean heartbeat) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
     final long snapshotIndex = follower.getSnapshotIndex();
     final long heartbeatRemainingMs = getHeartbeatRemainingTime();
-    if (heartbeatRemainingMs <= 0L) {
+    if (heartbeatRemainingMs <= 0L || heartbeat) {
       // heartbeat
       return leaderState.newAppendEntriesRequestProto(
           getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
@@ -281,7 +282,7 @@ public class LogAppender {
     while (isAppenderRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {
-          request = createRequest(DEFAULT_CALLID);
+          request = createRequest(DEFAULT_CALLID, false);
         }
 
         if (request == null) {
@@ -571,14 +572,22 @@ public class LogAppender {
 
   /** Should the leader send appendEntries RPC to this follower? */
   protected boolean shouldSendRequest() {
-    return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
+    return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
+  }
+
+  protected boolean haveLogEntriesToSendOut() {
+    return shouldAppendEntries(follower.getNextIndex());
+  }
+
+  protected boolean isFollowerCommitBehindLastCommitIndex() {
+    return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
   }
 
   private boolean shouldAppendEntries(long followerIndex) {
     return followerIndex < raftLog.getNextIndex();
   }
 
-  protected boolean shouldHeartbeat() {
+  protected boolean heartbeatTimeout() {
     return getHeartbeatRemainingTime() <= 0;
   }