You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ha...@apache.org on 2020/05/12 20:17:15 UTC
[incubator-ratis] branch master updated: RATIS-874. Fix AppendEntry
validity checks to take the SnapshotIndex into account (#89)
This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3596a58 RATIS-874. Fix AppendEntry validity checks to take the SnapshotIndex into account (#89)
3596a58 is described below
commit 3596a589707fa8f7947af50ada582b12ec15b650
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Tue May 12 13:16:18 2020 -0700
RATIS-874. Fix AppendEntry validity checks to take the SnapshotIndex into account (#89)
---
.../org/apache/ratis/server/impl/FollowerInfo.java | 12 +++++--
.../org/apache/ratis/server/impl/LogAppender.java | 17 +++++----
.../apache/ratis/server/impl/RaftServerImpl.java | 4 +--
.../org/apache/ratis/server/impl/ServerState.java | 4 +--
.../org/apache/ratis/server/raftlog/RaftLog.java | 40 ++++++++++++++++++++--
.../server/raftlog/segmented/SegmentedRaftLog.java | 2 +-
.../raftlog/segmented/SegmentedRaftLogCache.java | 4 +--
.../raftlog/segmented/TestSegmentedRaftLog.java | 3 +-
8 files changed, 66 insertions(+), 20 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 743dc51..c423e68 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -41,6 +41,7 @@ public class FollowerInfo {
private final RaftLogIndex nextIndex;
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
+ private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
private volatile boolean attendVote;
private final int rpcSlownessTimeoutMs;
@@ -75,6 +76,10 @@ public class FollowerInfo {
return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
}
+ long getSnapshotIndex() {
+ return snapshotIndex.get();
+ }
+
public long getNextIndex() {
return nextIndex.get();
}
@@ -95,9 +100,10 @@ public class FollowerInfo {
nextIndex.updateToMax(newNextIndex, infoIndexChange);
}
- public void setSnapshotIndex(long snapshotIndex) {
- matchIndex.setUnconditionally(snapshotIndex, infoIndexChange);
- nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange);
+ public void setSnapshotIndex(long newSnapshotIndex) {
+ snapshotIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
+ matchIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
+ nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
}
public String getName() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index c73693c..18252a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -209,6 +209,7 @@ public class LogAppender {
protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
final TermIndex previous = getPrevious(follower.getNextIndex());
+ final long snapshotIndex = follower.getSnapshotIndex();
final long heartbeatRemainingMs = getHeartbeatRemainingTime();
if (heartbeatRemainingMs <= 0L) {
// heartbeat
@@ -234,12 +235,12 @@ public class LogAppender {
(entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
follower.getName(), entry, time, exception));
buffer.clear();
- assertProtos(protos, followerNext, previous);
+ assertProtos(protos, followerNext, previous, snapshotIndex);
return leaderState.newAppendEntriesRequestProto(
getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
}
- private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous) {
+ private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
if (protos.isEmpty()) {
return;
}
@@ -247,10 +248,14 @@ public class LogAppender {
Preconditions.assertTrue(firstIndex == nextIndex,
() -> follower.getName() + ": firstIndex = " + firstIndex + " != nextIndex = " + nextIndex);
if (firstIndex > RaftLog.LEAST_VALID_LOG_INDEX) {
- Objects.requireNonNull(previous,
- () -> follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
- Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
- () -> follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
+ // Check if nextIndex is 1 greater than the snapshotIndex. If yes, then
+ // we do not have to check for the existence of previous.
+ if (nextIndex != snapshotIndex + 1) {
+ Objects.requireNonNull(previous,
+ () -> follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
+ Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
+ () -> follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a2f931c..14620d8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1168,7 +1168,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
- state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+ state.reloadStateMachine(lastIncludedIndex);
}
} finally {
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
@@ -1238,7 +1238,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
getMemberId(), reply.getIndex());
stateMachine.pause();
state.updateInstalledSnapshotIndex(reply);
- state.reloadStateMachine(reply.getIndex(), leaderTerm);
+ state.reloadStateMachine(reply.getIndex());
}
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
});
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 3ed045a..380bcf2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -372,8 +372,8 @@ public class ServerState implements Closeable {
return false;
}
- void reloadStateMachine(long lastIndexInSnapshot, long curTerm) {
- log.updateLastCommitted(lastIndexInSnapshot, curTerm);
+ void reloadStateMachine(long lastIndexInSnapshot) {
+ log.updateSnapshotIndex(lastIndexInSnapshot);
stateMachineUpdater.reloadStateMachine();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index 6739478..b882a60 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -71,6 +71,8 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
* in the latest snapshot file.
*/
private final RaftLogIndex commitIndex;
+ /** The last log index in snapshot */
+ private final RaftLogIndex snapshotIndex;
private final RaftLogIndex purgeIndex;
private final int purgeGap;
@@ -87,6 +89,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
this.name = memberId + "-" + getClass().getSimpleName();
this.memberId = memberId;
this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
+ this.snapshotIndex = new RaftLogIndex("snapshotIndex", commitIndex);
this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
@@ -98,6 +101,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
return commitIndex.get();
}
+ public long getSnapshotIndex() {
+ return snapshotIndex.get();
+ }
+
public void checkLogState() {
state.assertOpen();
}
@@ -130,6 +137,24 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
}
/**
+ * Update the last committed index and snapshotIndex with the last index in
+ * the snapshot.
+ * @param newSnapshotIndex the last index in the snapshot
+ */
+ public void updateSnapshotIndex(long newSnapshotIndex) {
+ try(AutoCloseableLock writeLock = writeLock()) {
+ final long oldSnapshotIndex = getSnapshotIndex();
+ if (oldSnapshotIndex < newSnapshotIndex) {
+ snapshotIndex.updateIncreasingly(newSnapshotIndex, infoIndexChange);
+ }
+ final long oldCommitIndex = getLastCommittedIndex();
+ if (oldCommitIndex < newSnapshotIndex) {
+ commitIndex.updateIncreasingly(newSnapshotIndex, traceIndexChange);
+ }
+ }
+ }
+
+ /**
* Does the log contains the given term and index? Used to check the
* consistency between the local log of a follower and the log entries sent
* by the leader.
@@ -311,13 +336,22 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
if (entry.hasMetadataEntry()) {
return;
}
+ long latestSnapshotIndex = getSnapshotIndex();
TermIndex lastTermIndex = getLastEntryTermIndex();
if (lastTermIndex != null) {
+ long lastIndex = lastTermIndex.getIndex() > latestSnapshotIndex ?
+ lastTermIndex.getIndex() : latestSnapshotIndex;
Preconditions.assertTrue(entry.getTerm() >= lastTermIndex.getTerm(),
"Entry term less than RaftLog's last term: %d, entry: %s", lastTermIndex.getTerm(), entry);
- Preconditions.assertTrue(entry.getIndex() == lastTermIndex.getIndex() + 1,
- "Difference between entry index and RaftLog's last index %d greater than 1, entry: %s",
- lastTermIndex.getIndex(), entry);
+ Preconditions.assertTrue(entry.getIndex() == lastIndex + 1,
+ "Difference between entry index and RaftLog's last index %d (or snapshot index %d) " +
+ "is greater than 1, entry: %s",
+ lastTermIndex.getIndex(), latestSnapshotIndex, entry);
+ } else {
+ Preconditions.assertTrue(entry.getIndex() == latestSnapshotIndex + 1,
+ "Difference between entry index and RaftLog's latest snapshot index %d is greater than 1 " +
+ "and in between log entries are not present, entry: %s",
+ latestSnapshotIndex, entry);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 804a904..9bf4d41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -248,7 +248,7 @@ public class SegmentedRaftLog extends RaftLog {
// entries to the state machine
boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
final Timer.Context loadSegmentContext = raftLogMetrics.getRaftLogLoadSegmentTimer().time();
- cache.loadSegment(pi, keepEntryInCache, logConsumer);
+ cache.loadSegment(pi, keepEntryInCache, logConsumer, lastIndexInSnapshot);
loadSegmentContext.stop();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 70b4dc8..b0ff789 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -342,10 +342,10 @@ class SegmentedRaftLogCache {
}
void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache,
- Consumer<LogEntryProto> logConsumer) throws IOException {
+ Consumer<LogEntryProto> logConsumer, long lastIndexInSnapshot) throws IOException {
LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(),
pi.getStartIndex(), pi.getEndIndex(), pi.isOpen(), keepEntryInCache, logConsumer, raftLogMetrics);
- if (logSegment != null) {
+ if (logSegment != null && logSegment.getEndIndex() > lastIndexInSnapshot) {
addSegment(logSegment);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index f20dc5b..6d85bd2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -274,7 +274,8 @@ public class TestSegmentedRaftLog extends BaseTest {
} catch (IllegalStateException e) {
ex = e;
}
- Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex() + " greater than 1"));
+ Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex()
+ + " (or snapshot index " + raftLog.getSnapshotIndex() + ") is greater than 1"));
}
}