You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/11/26 10:35:02 UTC
[incubator-ratis] branch master updated: RATIS-1161. Updating
commit index of follower does not require term check (#289)
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new bcc392c RATIS-1161. Updating commit index of follower does not require term check (#289)
bcc392c is described below
commit bcc392ca4fedcb47202aaf775e31fa867c63b218
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 26 16:04:30 2020 +0530
RATIS-1161. Updating commit index of follower does not require term check (#289)
---
.../main/java/org/apache/ratis/server/impl/LeaderState.java | 6 +++---
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
.../java/org/apache/ratis/server/impl/ServerImplUtils.java | 7 +++++++
.../main/java/org/apache/ratis/server/impl/ServerState.java | 4 ++--
.../main/java/org/apache/ratis/server/raftlog/RaftLog.java | 11 ++++++++---
5 files changed, 22 insertions(+), 9 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 176961c..eda0a43 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -433,8 +433,8 @@ public class LeaderState {
AppendEntriesRequestProto newAppendEntriesRequestProto(RaftPeerId targetId,
TermIndex previous, List<LogEntryProto> entries, boolean initializing,
long callId) {
- return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId,
- currentTerm, entries, raftLog.getLastCommittedIndex(),
+ return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
+ ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
initializing, previous, server.getCommitInfos(), callId);
}
@@ -694,7 +694,7 @@ public class LeaderState {
final TermIndex[] entriesToCommit = raftLog.getEntries(
oldLastCommitted + 1, majority + 1);
- if (server.getState().updateStatemachine(majority, currentTerm)) {
+ if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
watchRequests.update(ReplicationLevel.MAJORITY, majority);
logMetadata(majority);
commitIndexChanged();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 14a6e60..02fc88e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1172,7 +1172,8 @@ public class RaftServerImpl implements RaftServer.Division,
).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
- state.updateStatemachine(leaderCommit, currentTerm);
+ final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.length);
+ state.updateCommitIndex(commitIndex, currentTerm, false);
updateCommitInfoCache();
final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
final long matchIndex = entries.length != 0 ? entries[entries.length - 1].getIndex() :
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 4206a5b..db452da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -23,12 +23,14 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** Server utilities for internal use. */
@@ -64,6 +66,11 @@ public final class ServerImplUtils {
return proxy;
}
+ static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
+ final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
+ return Math.min(leaderCommitIndex, p + numAppendEntries);
+ }
+
public static TermIndex newTermIndex(long term, long index) {
return new TermIndexImpl(term, index);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 4dfaf5c..708bd2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -382,8 +382,8 @@ public class ServerState implements Closeable {
}
}
- boolean updateStatemachine(long majorityIndex, long curTerm) {
- if (log.updateLastCommitted(majorityIndex, curTerm)) {
+ boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) {
+ if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
stateMachineUpdater.notifyUpdater();
return true;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 23977bf..3c6d102 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -123,15 +123,20 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
* Update the last committed index.
* @param majorityIndex the index that has achieved majority.
* @param currentTerm the current term.
+ * @param isLeader Is this server the leader?
* @return true if update is applied; otherwise, return false, i.e. no update required.
*/
- public boolean updateLastCommitted(long majorityIndex, long currentTerm) {
+ public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
try(AutoCloseableLock writeLock = writeLock()) {
final long oldCommittedIndex = getLastCommittedIndex();
final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
if (oldCommittedIndex < newCommitIndex) {
- // Only update last committed index for current term. See §5.4.2 in
- // paper for details.
+ if (!isLeader) {
+ commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
+ return true;
+ }
+
+ // Only update last committed index for current term. See §5.4.2 in paper for details.
final TermIndex entry = getTermIndex(newCommitIndex);
if (entry != null && entry.getTerm() == currentTerm) {
commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);