You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ha...@apache.org on 2020/05/12 20:17:15 UTC

[incubator-ratis] branch master updated: RATIS-874. Fix AppendEntry validity checks to take the SnapshotIndex into account (#89)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3596a58  RATIS-874. Fix AppendEntry validity checks to take the SnapshotIndex into account (#89)
3596a58 is described below

commit 3596a589707fa8f7947af50ada582b12ec15b650
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Tue May 12 13:16:18 2020 -0700

    RATIS-874. Fix AppendEntry validity checks to take the SnapshotIndex into account (#89)
---
 .../org/apache/ratis/server/impl/FollowerInfo.java | 12 +++++--
 .../org/apache/ratis/server/impl/LogAppender.java  | 17 +++++----
 .../apache/ratis/server/impl/RaftServerImpl.java   |  4 +--
 .../org/apache/ratis/server/impl/ServerState.java  |  4 +--
 .../org/apache/ratis/server/raftlog/RaftLog.java   | 40 ++++++++++++++++++++--
 .../server/raftlog/segmented/SegmentedRaftLog.java |  2 +-
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  4 +--
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  3 +-
 8 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 743dc51..c423e68 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -41,6 +41,7 @@ public class FollowerInfo {
   private final RaftLogIndex nextIndex;
   private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
   private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
+  private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
   private volatile boolean attendVote;
   private final int rpcSlownessTimeoutMs;
 
@@ -75,6 +76,10 @@ public class FollowerInfo {
     return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
   }
 
+  long getSnapshotIndex() {
+    return snapshotIndex.get();
+  }
+
   public long getNextIndex() {
     return nextIndex.get();
   }
@@ -95,9 +100,10 @@ public class FollowerInfo {
     nextIndex.updateToMax(newNextIndex, infoIndexChange);
   }
 
-  public void setSnapshotIndex(long snapshotIndex) {
-    matchIndex.setUnconditionally(snapshotIndex, infoIndexChange);
-    nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange);
+  public void setSnapshotIndex(long newSnapshotIndex) {
+    snapshotIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
+    matchIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
+    nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
   }
 
   public String getName() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index c73693c..18252a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -209,6 +209,7 @@ public class LogAppender {
 
   protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
+    final long snapshotIndex = follower.getSnapshotIndex();
     final long heartbeatRemainingMs = getHeartbeatRemainingTime();
     if (heartbeatRemainingMs <= 0L) {
       // heartbeat
@@ -234,12 +235,12 @@ public class LogAppender {
         (entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
             follower.getName(), entry, time, exception));
     buffer.clear();
-    assertProtos(protos, followerNext, previous);
+    assertProtos(protos, followerNext, previous, snapshotIndex);
     return leaderState.newAppendEntriesRequestProto(
         getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
   }
 
-  private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous) {
+  private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
     if (protos.isEmpty()) {
       return;
     }
@@ -247,10 +248,14 @@ public class LogAppender {
     Preconditions.assertTrue(firstIndex == nextIndex,
         () -> follower.getName() + ": firstIndex = " + firstIndex + " != nextIndex = " + nextIndex);
     if (firstIndex > RaftLog.LEAST_VALID_LOG_INDEX) {
-      Objects.requireNonNull(previous,
-          () -> follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
-      Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
-          () -> follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
+      // Check if nextIndex is 1 greater than the snapshotIndex. If yes, then
+      // we do not have to check for the existence of previous.
+      if (nextIndex != snapshotIndex + 1) {
+        Objects.requireNonNull(previous,
+            () -> follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
+        Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
+            () -> follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
+      }
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a2f931c..14620d8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1168,7 +1168,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         // update the committed index
         // re-load the state machine if this is the last chunk
         if (snapshotChunkRequest.getDone()) {
-          state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+          state.reloadStateMachine(lastIncludedIndex);
         }
       } finally {
         updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
@@ -1238,7 +1238,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
                     getMemberId(), reply.getIndex());
                 stateMachine.pause();
                 state.updateInstalledSnapshotIndex(reply);
-                state.reloadStateMachine(reply.getIndex(), leaderTerm);
+                state.reloadStateMachine(reply.getIndex());
               }
               inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
             });
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 3ed045a..380bcf2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -372,8 +372,8 @@ public class ServerState implements Closeable {
     return false;
   }
 
-  void reloadStateMachine(long lastIndexInSnapshot, long curTerm) {
-    log.updateLastCommitted(lastIndexInSnapshot, curTerm);
+  void reloadStateMachine(long lastIndexInSnapshot) {
+    log.updateSnapshotIndex(lastIndexInSnapshot);
     stateMachineUpdater.reloadStateMachine();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 6739478..b882a60 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -71,6 +71,8 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
    * in the latest snapshot file.
    */
   private final RaftLogIndex commitIndex;
+  /** The last log index in snapshot */
+  private final RaftLogIndex snapshotIndex;
   private final RaftLogIndex purgeIndex;
   private final int purgeGap;
 
@@ -87,6 +89,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     this.name = memberId + "-" + getClass().getSimpleName();
     this.memberId = memberId;
     this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
+    this.snapshotIndex = new RaftLogIndex("snapshotIndex", commitIndex);
     this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
     this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
 
@@ -98,6 +101,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     return commitIndex.get();
   }
 
+  public long getSnapshotIndex() {
+    return snapshotIndex.get();
+  }
+
   public void checkLogState() {
     state.assertOpen();
   }
@@ -130,6 +137,24 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   }
 
   /**
+   * Update the last committed index and snapshotIndex with the last index in
+   * the snapshot.
+   * @param newSnapshotIndex the last index in the snapshot
+   */
+  public void updateSnapshotIndex(long newSnapshotIndex) {
+    try(AutoCloseableLock writeLock = writeLock()) {
+      final long oldSnapshotIndex = getSnapshotIndex();
+      if (oldSnapshotIndex < newSnapshotIndex) {
+        snapshotIndex.updateIncreasingly(newSnapshotIndex, infoIndexChange);
+      }
+      final long oldCommitIndex = getLastCommittedIndex();
+      if (oldCommitIndex < newSnapshotIndex) {
+        commitIndex.updateIncreasingly(newSnapshotIndex, traceIndexChange);
+      }
+    }
+  }
+
+  /**
    * Does the log contains the given term and index? Used to check the
    * consistency between the local log of a follower and the log entries sent
    * by the leader.
@@ -311,13 +336,22 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     if (entry.hasMetadataEntry()) {
       return;
     }
+    long latestSnapshotIndex = getSnapshotIndex();
     TermIndex lastTermIndex = getLastEntryTermIndex();
     if (lastTermIndex != null) {
+      long lastIndex = lastTermIndex.getIndex() > latestSnapshotIndex ?
+          lastTermIndex.getIndex() : latestSnapshotIndex;
       Preconditions.assertTrue(entry.getTerm() >= lastTermIndex.getTerm(),
           "Entry term less than RaftLog's last term: %d, entry: %s", lastTermIndex.getTerm(), entry);
-      Preconditions.assertTrue(entry.getIndex() == lastTermIndex.getIndex() + 1,
-          "Difference between entry index and RaftLog's last index %d greater than 1, entry: %s",
-          lastTermIndex.getIndex(), entry);
+      Preconditions.assertTrue(entry.getIndex() == lastIndex + 1,
+          "Difference between entry index and RaftLog's last index %d (or snapshot index %d) " +
+              "is greater than 1, entry: %s",
+          lastTermIndex.getIndex(), latestSnapshotIndex, entry);
+    } else {
+      Preconditions.assertTrue(entry.getIndex() == latestSnapshotIndex + 1,
+          "Difference between entry index and RaftLog's latest snapshot index %d is greater than 1 " +
+              "and in between log entries are not present, entry: %s",
+          latestSnapshotIndex, entry);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 804a904..9bf4d41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -248,7 +248,7 @@ public class SegmentedRaftLog extends RaftLog {
         // entries to the state machine
         boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
         final Timer.Context loadSegmentContext = raftLogMetrics.getRaftLogLoadSegmentTimer().time();
-        cache.loadSegment(pi, keepEntryInCache, logConsumer);
+        cache.loadSegment(pi, keepEntryInCache, logConsumer, lastIndexInSnapshot);
         loadSegmentContext.stop();
       }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 70b4dc8..b0ff789 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -342,10 +342,10 @@ class SegmentedRaftLogCache {
   }
 
   void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache,
-      Consumer<LogEntryProto> logConsumer) throws IOException {
+      Consumer<LogEntryProto> logConsumer, long lastIndexInSnapshot) throws IOException {
     LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(),
         pi.getStartIndex(), pi.getEndIndex(), pi.isOpen(), keepEntryInCache, logConsumer, raftLogMetrics);
-    if (logSegment != null) {
+    if (logSegment != null && logSegment.getEndIndex() > lastIndexInSnapshot) {
       addSegment(logSegment);
     }
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index f20dc5b..6d85bd2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -274,7 +274,8 @@ public class TestSegmentedRaftLog extends BaseTest {
       } catch (IllegalStateException e) {
         ex = e;
       }
-      Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex() + " greater than 1"));
+      Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex()
+          + " (or snapshot index " + raftLog.getSnapshotIndex() + ") is greater than 1"));
     }
   }