You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/09/23 05:43:16 UTC
[incubator-ratis] branch master updated: RATIS-1042. Watch for
commit calls are blocked for a long if no other message (#185)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a79281c RATIS-1042. Watch for commit calls are blocked for a long if no other message (#185)
a79281c is described below
commit a79281ca2452028228582ddde143e5350df9203d
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Sep 23 11:13:07 2020 +0530
RATIS-1042. Watch for commit calls are blocked for a long if no other message (#185)
* Watch for commit calls are blocked for a long if no other message
Co-authored-by: Elek Márton <el...@apache.org>
* Leader should notify appenders on commitIndex change
* Retrigger CI
* Retrigger CI
Co-authored-by: Elek Márton <el...@apache.org>
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 41 ++++++++++++----------
.../org/apache/ratis/server/impl/LeaderState.java | 1 +
.../org/apache/ratis/server/impl/LogAppender.java | 19 +++++++---
3 files changed, 38 insertions(+), 23 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 9f5eb76..235a023 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -101,41 +101,44 @@ public class GrpcLogAppender extends LogAppender {
@Override
protected void runAppenderImpl() throws IOException {
- boolean shouldAppendLog;
+ boolean installSnapshotRequired;
for(; isAppenderRunning(); mayWait()) {
- shouldAppendLog = true;
- if (shouldSendRequest()) {
+ installSnapshotRequired = false;
+
+ // Heartbeat period has expired, OR there are log entries to send, OR the follower's commit index is behind the leader's
+ if (haveLogEntriesToSendOut() || heartbeatTimeout() || isFollowerCommitBehindLastCommitIndex()) {
+
if (installSnapshotEnabled) {
SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
installSnapshot(snapshot);
- shouldAppendLog = false;
+ installSnapshotRequired = true;
}
} else {
TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
if (installSnapshotNotificationTermIndex != null) {
installSnapshot(installSnapshotNotificationTermIndex);
- shouldAppendLog = false;
+ installSnapshotRequired = true;
}
}
- if (shouldHeartbeat() || (shouldAppendLog && !shouldWait())) {
- // keep appending log entries or sending heartbeats
- appendLog();
- }
+
+ appendLog(installSnapshotRequired || haveTooManyPendingRequests());
+
}
checkSlowness();
+
}
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
}
private long getWaitTimeMs() {
- if (!shouldSendRequest()) {
- return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
- } else if (shouldWait()) {
- return getHalfMinTimeoutMs(); // Should wait for a short time
+ if (haveTooManyPendingRequests()) {
+ return getHeartbeatRemainingTime(); // Too many pending requests: wait until the next heartbeat is due
+ } else if (haveLogEntriesToSendOut() || heartbeatTimeout()) {
+ return 0L;
}
- return 0L;
+ return Math.min(10L, getHeartbeatRemainingTime());
}
private void mayWait() {
@@ -166,8 +169,10 @@ public class GrpcLogAppender extends LogAppender {
return appendLogRequestObserver == null || super.shouldSendRequest();
}
- /** @return true iff not received first response or queue is full. */
- private boolean shouldWait() {
+ /**
+ * @return true iff there are pending log requests and either the first response has not been received yet or the pending queue is full.
+ */
+ private boolean haveTooManyPendingRequests() {
final int size = pendingRequests.logRequestsSize();
if (size == 0) {
return false;
@@ -175,7 +180,7 @@ public class GrpcLogAppender extends LogAppender {
return !firstResponseReceived || size >= maxPendingRequestsNum;
}
- private void appendLog() throws IOException {
+ private void appendLog(boolean excludeLogEntries) throws IOException {
final AppendEntriesRequestProto pending;
final AppendEntriesRequest request;
final StreamObserver<AppendEntriesRequestProto> s;
@@ -183,7 +188,7 @@ public class GrpcLogAppender extends LogAppender {
// prepare and enqueue the append request. note changes on follower's
// nextIndex and ops on pendingRequests should always be associated
// together and protected by the lock
- pending = createRequest(callId++);
+ pending = createRequest(callId++, excludeLogEntries);
if (pending == null) {
return;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 5718158..240be2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -387,6 +387,7 @@ public class LeaderState {
watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, m.majority);
watchRequests.update(ReplicationLevel.MAJORITY, m.max);
});
+ notifySenders();
}
private void applyOldNewConf() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index b796099..03fd67d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -221,11 +221,12 @@ public class LogAppender {
return null;
}
- protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
+ protected AppendEntriesRequestProto createRequest(long callId,
+ boolean heartbeat) throws RaftLogIOException {
final TermIndex previous = getPrevious(follower.getNextIndex());
final long snapshotIndex = follower.getSnapshotIndex();
final long heartbeatRemainingMs = getHeartbeatRemainingTime();
- if (heartbeatRemainingMs <= 0L) {
+ if (heartbeatRemainingMs <= 0L || heartbeat) {
// heartbeat
return leaderState.newAppendEntriesRequestProto(
getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
@@ -281,7 +282,7 @@ public class LogAppender {
while (isAppenderRunning()) { // keep retrying for IOException
try {
if (request == null || request.getEntriesCount() == 0) {
- request = createRequest(DEFAULT_CALLID);
+ request = createRequest(DEFAULT_CALLID, false);
}
if (request == null) {
@@ -571,14 +572,22 @@ public class LogAppender {
/** Should the leader send appendEntries RPC to this follower? */
protected boolean shouldSendRequest() {
- return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
+ return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
+ }
+
+ protected boolean haveLogEntriesToSendOut() {
+ return shouldAppendEntries(follower.getNextIndex());
+ }
+
+ protected boolean isFollowerCommitBehindLastCommitIndex() {
+ return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
}
private boolean shouldAppendEntries(long followerIndex) {
return followerIndex < raftLog.getNextIndex();
}
- protected boolean shouldHeartbeat() {
+ protected boolean heartbeatTimeout() {
return getHeartbeatRemainingTime() <= 0;
}