You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/08/28 22:19:32 UTC

[GitHub] [incubator-ratis] bharatviswa504 commented on a change in pull request #185: RATIS-1042. Watch for commit calls are blocked for a long if no other message

bharatviswa504 commented on a change in pull request #185:
URL: https://github.com/apache/incubator-ratis/pull/185#discussion_r479548926



##########
File path: ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
##########
@@ -101,41 +101,44 @@ private synchronized void resetClient(AppendEntriesRequest request) {
 
   @Override
   protected void runAppenderImpl() throws IOException {
-    boolean shouldAppendLog;
+    boolean installSnapshotRequired;

Review comment:
       Question: Why similar code changes are not done in LogAppender#runAppenderImpl

##########
File path: ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
##########
@@ -101,41 +101,44 @@ private synchronized void resetClient(AppendEntriesRequest request) {
 
   @Override
   protected void runAppenderImpl() throws IOException {
-    boolean shouldAppendLog;
+    boolean installSnapshotRequired;
     for(; isAppenderRunning(); mayWait()) {
-      shouldAppendLog = true;
-      if (shouldSendRequest()) {
+      installSnapshotRequired = false;
+
+      //HB period is expired OR we have messages OR follower is behind with commit index
+      if (haveLogEntriesToSendOut() || heartbeatTimeout() || isFollowerCommitBehindLastCommitIndex()) {
+
         if (installSnapshotEnabled) {
           SnapshotInfo snapshot = shouldInstallSnapshot();
           if (snapshot != null) {
             installSnapshot(snapshot);
-            shouldAppendLog = false;
+            installSnapshotRequired = true;
           }
         } else {
           TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
           if (installSnapshotNotificationTermIndex != null) {
             installSnapshot(installSnapshotNotificationTermIndex);
-            shouldAppendLog = false;
+            installSnapshotRequired = true;
           }
         }
-        if (shouldHeartbeat() || (shouldAppendLog && !shouldWait())) {
-          // keep appending log entries or sending heartbeats
-          appendLog();
-        }
+
+        appendLog(installSnapshotRequired || haveTooManyPendingRequests());
+
       }
       checkSlowness();
+
     }
 
     Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
   }
 
   private long getWaitTimeMs() {
-    if (!shouldSendRequest()) {
-      return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
-    } else if (shouldWait()) {
-      return getHalfMinTimeoutMs(); // Should wait for a short time
+    if (haveTooManyPendingRequests()) {
+      return getHeartbeatRemainingTime(); // Should wait for a short time
+    } else if (haveLogEntriesToSendOut() || heartbeatTimeout()) {
+      return 0L;
     }
-    return 0L;
+    return Math.min(10L, getHeartbeatRemainingTime());

Review comment:
       Any reason for using 10L with Min, and not returning directly getHeartbeatRemainingTime()

##########
File path: ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
##########
@@ -101,41 +101,44 @@ private synchronized void resetClient(AppendEntriesRequest request) {
 
   @Override
   protected void runAppenderImpl() throws IOException {
-    boolean shouldAppendLog;
+    boolean installSnapshotRequired;
     for(; isAppenderRunning(); mayWait()) {
-      shouldAppendLog = true;
-      if (shouldSendRequest()) {
+      installSnapshotRequired = false;
+
+      //HB period is expired OR we have messages OR follower is behind with commit index
+      if (haveLogEntriesToSendOut() || heartbeatTimeout() || isFollowerCommitBehindLastCommitIndex()) {
+
         if (installSnapshotEnabled) {
           SnapshotInfo snapshot = shouldInstallSnapshot();
           if (snapshot != null) {
             installSnapshot(snapshot);
-            shouldAppendLog = false;
+            installSnapshotRequired = true;
           }
         } else {
           TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
           if (installSnapshotNotificationTermIndex != null) {
             installSnapshot(installSnapshotNotificationTermIndex);
-            shouldAppendLog = false;
+            installSnapshotRequired = true;
           }
         }
-        if (shouldHeartbeat() || (shouldAppendLog && !shouldWait())) {
-          // keep appending log entries or sending heartbeats
-          appendLog();
-        }
+
+        appendLog(installSnapshotRequired || haveTooManyPendingRequests());
+
       }
       checkSlowness();
+
     }
 
     Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
   }
 
   private long getWaitTimeMs() {
-    if (!shouldSendRequest()) {
-      return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
-    } else if (shouldWait()) {
-      return getHalfMinTimeoutMs(); // Should wait for a short time
+    if (haveTooManyPendingRequests()) {
+      return getHeartbeatRemainingTime(); // Should wait for a short time
+    } else if (haveLogEntriesToSendOut() || heartbeatTimeout()) {
+      return 0L;
     }
-    return 0L;
+    return Math.min(10L, getHeartbeatRemainingTime());

Review comment:
       And also in Jira it is mentioned 
   " that the thread does not sleep for a long time while there are pending watch requests in leader"
   
   We have checked if followerIndex < raftLog.getIndex() where we have checked pending watch requests.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org