You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/11/26 10:35:02 UTC

[incubator-ratis] branch master updated: RATIS-1161. Updating commit index of follower does not require term check (#289)

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new bcc392c  RATIS-1161. Updating commit index of follower does not require term check (#289)
bcc392c is described below

commit bcc392ca4fedcb47202aaf775e31fa867c63b218
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 26 16:04:30 2020 +0530

    RATIS-1161. Updating commit index of follower does not require term check (#289)
---
 .../main/java/org/apache/ratis/server/impl/LeaderState.java   |  6 +++---
 .../java/org/apache/ratis/server/impl/RaftServerImpl.java     |  3 ++-
 .../java/org/apache/ratis/server/impl/ServerImplUtils.java    |  7 +++++++
 .../main/java/org/apache/ratis/server/impl/ServerState.java   |  4 ++--
 .../main/java/org/apache/ratis/server/raftlog/RaftLog.java    | 11 ++++++++---
 5 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 176961c..eda0a43 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -433,8 +433,8 @@ public class LeaderState {
   AppendEntriesRequestProto newAppendEntriesRequestProto(RaftPeerId targetId,
       TermIndex previous, List<LogEntryProto> entries, boolean initializing,
       long callId) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId,
-        currentTerm, entries, raftLog.getLastCommittedIndex(),
+    return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
+        ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
         initializing, previous, server.getCommitInfos(), callId);
   }
 
@@ -694,7 +694,7 @@ public class LeaderState {
       final TermIndex[] entriesToCommit = raftLog.getEntries(
           oldLastCommitted + 1, majority + 1);
 
-      if (server.getState().updateStatemachine(majority, currentTerm)) {
+      if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
         watchRequests.update(ReplicationLevel.MAJORITY, majority);
         logMetadata(majority);
         commitIndexChanged();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 14a6e60..02fc88e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1172,7 +1172,8 @@ public class RaftServerImpl implements RaftServer.Division,
     ).thenApply(v -> {
       final AppendEntriesReplyProto reply;
       synchronized(this) {
-        state.updateStatemachine(leaderCommit, currentTerm);
+        final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.length);
+        state.updateCommitIndex(commitIndex, currentTerm, false);
         updateCommitInfoCache();
         final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
         final long matchIndex = entries.length != 0 ? entries[entries.length - 1].getIndex() :
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 4206a5b..db452da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -23,12 +23,14 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /** Server utilities for internal use. */
@@ -64,6 +66,11 @@ public final class ServerImplUtils {
     return proxy;
   }
 
+  static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
+    final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
+    return Math.min(leaderCommitIndex, p + numAppendEntries);
+  }
+
   public static TermIndex newTermIndex(long term, long index) {
     return new TermIndexImpl(term, index);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 4dfaf5c..708bd2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -382,8 +382,8 @@ public class ServerState implements Closeable {
     }
   }
 
-  boolean updateStatemachine(long majorityIndex, long curTerm) {
-    if (log.updateLastCommitted(majorityIndex, curTerm)) {
+  boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) {
+    if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
       stateMachineUpdater.notifyUpdater();
       return true;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 23977bf..3c6d102 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -123,15 +123,20 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
    * Update the last committed index.
    * @param majorityIndex the index that has achieved majority.
    * @param currentTerm the current term.
+   * @param isLeader Is this server the leader?
    * @return true if update is applied; otherwise, return false, i.e. no update required.
    */
-  public boolean updateLastCommitted(long majorityIndex, long currentTerm) {
+  public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
     try(AutoCloseableLock writeLock = writeLock()) {
       final long oldCommittedIndex = getLastCommittedIndex();
       final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
       if (oldCommittedIndex < newCommitIndex) {
-        // Only update last committed index for current term. See §5.4.2 in
-        // paper for details.
+        if (!isLeader) {
+          commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
+          return true;
+        }
+
+        // Only update last committed index for current term. See §5.4.2 in paper for details.
         final TermIndex entry = getTermIndex(newCommitIndex);
         if (entry != null && entry.getTerm() == currentTerm) {
           commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);