You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/09/05 05:54:08 UTC
[ratis] branch master updated: RATIS-1400. add a separate RWlock for GrpcLogAppender (#497)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f0d75c1 RATIS-1400. add a separate RWlock for GrpcLogAppender (#497)
f0d75c1 is described below
commit f0d75c1ec5d2fcad187beaddb6a22e10a5ea1368
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Sun Sep 5 13:54:02 2021 +0800
RATIS-1400. add a separate RWlock for GrpcLogAppender (#497)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 74 ++++++++++++----------
1 file changed, 42 insertions(+), 32 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index f9968e7..1e8203d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -66,6 +66,9 @@ public class GrpcLogAppender extends LogAppenderBase {
private final GrpcServerMetrics grpcServerMetrics;
+ private final AutoCloseableReadWriteLock lock;
+ private final StackTraceElement caller;
+
public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
super(server, leaderState, f);
@@ -78,6 +81,9 @@ public class GrpcLogAppender extends LogAppenderBase {
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
+
+ lock = new AutoCloseableReadWriteLock(this);
+ caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement(): null;
}
@Override
@@ -89,30 +95,26 @@ public class GrpcLogAppender extends LogAppenderBase {
return getServerRpc().getProxies().getProxy(getFollowerId());
}
- private synchronized void resetClient(AppendEntriesRequest request, boolean onError) {
- try {
+ private void resetClient(AppendEntriesRequest request, boolean onError) {
+ try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
getClient().resetConnectBackoff();
+ appendLogRequestObserver = null;
+ firstResponseReceived = false;
+ // clear the pending requests queue and reset the next index of follower
+ pendingRequests.clear();
+ final long nextIndex = 1 + Optional.ofNullable(request)
+ .map(AppendEntriesRequest::getPreviousLog)
+ .map(TermIndex::getIndex)
+ .orElseGet(getFollower()::getMatchIndex);
+ if (onError && getFollower().getMatchIndex() == 0 && request == null) {
+ LOG.warn("{}: Leader has not got in touch with Follower {} yet, " +
+ "just keep nextIndex unchanged and retry.", this, getFollower());
+ return;
+ }
+ getFollower().decreaseNextIndex(nextIndex);
} catch (IOException ie) {
LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie);
}
-
- appendLogRequestObserver = null;
- firstResponseReceived = false;
-
- // clear the pending requests queue and reset the next index of follower
- pendingRequests.clear();
-
- final long nextIndex = 1 + Optional.ofNullable(request)
- .map(AppendEntriesRequest::getPreviousLog)
- .map(TermIndex::getIndex)
- .orElseGet(getFollower()::getMatchIndex);
-
- if (onError && getFollower().getMatchIndex() == 0 && request == null) {
- LOG.warn("{}: Leader has not got in touch with Follower {} yet, " +
- "just keep nextIndex unchanged and retry.", this, getFollower());
- return;
- }
- getFollower().decreaseNextIndex(nextIndex);
}
private boolean isFollowerCommitBehindLastCommitIndex() {
@@ -204,7 +206,7 @@ public class GrpcLogAppender extends LogAppenderBase {
final AppendEntriesRequestProto pending;
final AppendEntriesRequest request;
final StreamObserver<AppendEntriesRequestProto> s;
- synchronized (this) {
+ try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// prepare and enqueue the append request. note changes on follower's
// nextIndex and ops on pendingRequests should always be associated
// together and protected by the lock
@@ -351,9 +353,11 @@ public class GrpcLogAppender extends LogAppenderBase {
}
}
- private synchronized void updateNextIndex(long replyNextIndex) {
- pendingRequests.clear();
- getFollower().setNextIndex(replyNextIndex);
+ private void updateNextIndex(long replyNextIndex) {
+ try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+ pendingRequests.clear();
+ getFollower().setNextIndex(replyNextIndex);
+ }
}
private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
@@ -371,14 +375,18 @@ public class GrpcLogAppender extends LogAppenderBase {
this.isNotificationOnly = notifyOnly;
}
- synchronized void addPending(InstallSnapshotRequestProto request) {
- pending.offer(request.getSnapshotChunk().getRequestIndex());
+ void addPending(InstallSnapshotRequestProto request) {
+ try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+ pending.offer(request.getSnapshotChunk().getRequestIndex());
+ }
}
- synchronized void removePending(InstallSnapshotReplyProto reply) {
- final Integer index = pending.poll();
- Objects.requireNonNull(index, "index == null");
- Preconditions.assertTrue(index == reply.getRequestIndex());
+ void removePending(InstallSnapshotReplyProto reply) {
+ try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
+ final Integer index = pending.poll();
+ Objects.requireNonNull(index, "index == null");
+ Preconditions.assertTrue(index == reply.getRequestIndex());
+ }
}
boolean isDone() {
@@ -390,8 +398,10 @@ public class GrpcLogAppender extends LogAppenderBase {
notifyLogAppender();
}
- synchronized boolean hasAllResponse() {
- return pending.isEmpty();
+ boolean hasAllResponse() {
+ try (AutoCloseableLock readLock = lock.readLock(caller, LOG::trace)) {
+ return pending.isEmpty();
+ }
}
@Override