You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/09/05 05:54:08 UTC

[ratis] branch master updated: RATIS-1400. add a separate RWlock for GrpcLogAppender (#497)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f0d75c1  RATIS-1400. add a separate RWlock for GrpcLogAppender (#497)
f0d75c1 is described below

commit f0d75c1ec5d2fcad187beaddb6a22e10a5ea1368
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Sun Sep 5 13:54:02 2021 +0800

    RATIS-1400. add a separate RWlock for GrpcLogAppender (#497)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 74 ++++++++++++----------
 1 file changed, 42 insertions(+), 32 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index f9968e7..1e8203d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -66,6 +66,9 @@ public class GrpcLogAppender extends LogAppenderBase {
 
   private final GrpcServerMetrics grpcServerMetrics;
 
+  private final AutoCloseableReadWriteLock lock;
+  private final StackTraceElement caller;
+
   public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     super(server, leaderState, f);
 
@@ -78,6 +81,9 @@ public class GrpcLogAppender extends LogAppenderBase {
 
     grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
     grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
+
+    lock = new AutoCloseableReadWriteLock(this);
+    caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement(): null;
   }
 
   @Override
@@ -89,30 +95,26 @@ public class GrpcLogAppender extends LogAppenderBase {
     return getServerRpc().getProxies().getProxy(getFollowerId());
   }
 
-  private synchronized void resetClient(AppendEntriesRequest request, boolean onError) {
-    try {
+  private void resetClient(AppendEntriesRequest request, boolean onError) {
+    try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       getClient().resetConnectBackoff();
+      appendLogRequestObserver = null;
+      firstResponseReceived = false;
+      // clear the pending requests queue and reset the next index of follower
+      pendingRequests.clear();
+      final long nextIndex = 1 + Optional.ofNullable(request)
+          .map(AppendEntriesRequest::getPreviousLog)
+          .map(TermIndex::getIndex)
+          .orElseGet(getFollower()::getMatchIndex);
+      if (onError && getFollower().getMatchIndex() == 0 && request == null) {
+        LOG.warn("{}: Leader has not got in touch with Follower {} yet, " +
+          "just keep nextIndex unchanged and retry.", this, getFollower());
+        return;
+      }
+      getFollower().decreaseNextIndex(nextIndex);
     } catch (IOException ie) {
       LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie);
     }
-
-    appendLogRequestObserver = null;
-    firstResponseReceived = false;
-
-    // clear the pending requests queue and reset the next index of follower
-    pendingRequests.clear();
-
-    final long nextIndex = 1 + Optional.ofNullable(request)
-        .map(AppendEntriesRequest::getPreviousLog)
-        .map(TermIndex::getIndex)
-        .orElseGet(getFollower()::getMatchIndex);
-
-    if (onError && getFollower().getMatchIndex() == 0 && request == null) {
-      LOG.warn("{}: Leader has not got in touch with Follower {} yet, " +
-          "just keep nextIndex unchanged and retry.", this, getFollower());
-      return;
-    }
-    getFollower().decreaseNextIndex(nextIndex);
   }
 
   private boolean isFollowerCommitBehindLastCommitIndex() {
@@ -204,7 +206,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
     final StreamObserver<AppendEntriesRequestProto> s;
-    synchronized (this) {
+    try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
@@ -351,9 +353,11 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
   }
 
-  private synchronized void updateNextIndex(long replyNextIndex) {
-    pendingRequests.clear();
-    getFollower().setNextIndex(replyNextIndex);
+  private void updateNextIndex(long replyNextIndex) {
+    try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+      pendingRequests.clear();
+      getFollower().setNextIndex(replyNextIndex);
+    }
   }
 
   private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
@@ -371,14 +375,18 @@ public class GrpcLogAppender extends LogAppenderBase {
       this.isNotificationOnly = notifyOnly;
     }
 
-    synchronized void addPending(InstallSnapshotRequestProto request) {
-      pending.offer(request.getSnapshotChunk().getRequestIndex());
+    void addPending(InstallSnapshotRequestProto request) {
+      try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+        pending.offer(request.getSnapshotChunk().getRequestIndex());
+      }
     }
 
-    synchronized void removePending(InstallSnapshotReplyProto reply) {
-      final Integer index = pending.poll();
-      Objects.requireNonNull(index, "index == null");
-      Preconditions.assertTrue(index == reply.getRequestIndex());
+    void removePending(InstallSnapshotReplyProto reply) {
+      try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+        final Integer index = pending.poll();
+        Objects.requireNonNull(index, "index == null");
+        Preconditions.assertTrue(index == reply.getRequestIndex());
+      }
     }
 
     boolean isDone() {
@@ -390,8 +398,10 @@ public class GrpcLogAppender extends LogAppenderBase {
       notifyLogAppender();
     }
 
-    synchronized boolean hasAllResponse() {
-      return pending.isEmpty();
+    boolean hasAllResponse() {
+      try (AutoCloseableLock readLock = lock.readLock(caller, LOG::trace)) {
+        return pending.isEmpty();
+      }
     }
 
     @Override