You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ar...@apache.org on 2019/03/19 18:34:37 UTC
[incubator-ratis] branch master updated: RATIS-502. Commit Index
less than the snapshot's commit indexes need to be ignored on restart.
Contributed by Tsz Wo Nicholas Sze.
This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c779f7a RATIS-502. Commit Index less than the snapshot's commit indexes need to be ignored on restart. Contributed by Tsz Wo Nicholas Sze.
c779f7a is described below
commit c779f7a585227f5d57e2d21e130627d540b91d69
Author: Arpit Agarwal <ar...@apache.org>
AuthorDate: Tue Mar 19 11:34:24 2019 -0700
RATIS-502. Commit Index less than the snapshot's commit indexes need to be ignored on restart. Contributed by Tsz Wo Nicholas Sze.
---
.../org/apache/ratis/server/storage/RaftLog.java | 2 +-
.../apache/ratis/server/storage/RaftLogIndex.java | 6 ++-
.../ratis/statemachine/impl/BaseStateMachine.java | 2 +-
.../test/java/org/apache/ratis/RaftTestUtil.java | 10 +++--
.../apache/ratis/server/ServerRestartTests.java | 47 +++++++++++++++++-----
5 files changed, 50 insertions(+), 17 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index b1971e5..749d371 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -242,7 +242,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
}
});
Optional.ofNullable(lastMetadataEntry).ifPresent(
- e -> commitIndex.updateIncreasingly(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
+ e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
state.open();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
index a7dd1a7..67e7e14 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
@@ -65,8 +65,10 @@ public class RaftLogIndex {
public boolean updateToMax(long newIndex, Consumer<Object> log) {
final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex));
- log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateToMax " + old + " -> " + newIndex));
- return old != newIndex;
+ final boolean updated = old < newIndex;
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+ return updated;
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 8a09bd4..f038b86 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -103,7 +103,7 @@ public class BaseStateMachine implements StateMachine {
return lastAppliedTermIndex.get();
}
- protected void setLastAppliedTermIndex(TermIndex newTI) {
+ public void setLastAppliedTermIndex(TermIndex newTI) {
lastAppliedTermIndex.set(newTI);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5f782a0..93af79d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -200,9 +200,9 @@ public interface RaftTestUtil {
});
}
- static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
- final List<LogEntryProto> entries = new ArrayList<>(expectedMessages.length);
- for(LogEntryProto e : getLogEntryProtos(log)) {
+ static List<LogEntryProto> getStateMachineLogEntries(RaftLog log) {
+ final List<LogEntryProto> entries = new ArrayList<>();
+ for (LogEntryProto e : getLogEntryProtos(log)) {
final String s = ServerProtoUtils.toString(e);
if (e.hasStateMachineLogEntry()) {
LOG.info(s + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", "));
@@ -215,7 +215,11 @@ public interface RaftTestUtil {
throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + s);
}
}
+ return entries;
+ }
+ static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
+ final List<LogEntryProto> entries = getStateMachineLogEntries(log);
try {
assertLogEntries(entries, expectedTerm, expectedMessages);
} catch(Throwable t) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index f8f1a4c..ced1aba 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -32,6 +32,7 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.server.storage.SegmentedRaftLogFormat;
@@ -49,10 +50,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import java.io.File;
+import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -219,24 +222,39 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
}
void runTestRestartCommitIndex(MiniRaftCluster cluster) throws Exception {
- final TimeDuration sleepTime = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
- final SimpleMessage[] messages = SimpleMessage.create(10);
- try (final RaftClient client = cluster.createClient()) {
- for(SimpleMessage m : messages) {
- Assert.assertTrue(client.send(m).isSuccess());
+ final SimpleMessage[] messages = SimpleMessage.create(100);
+ final List<CompletableFuture<Void>> futures = new ArrayList<>(messages.length);
+ for(int i = 0; i < messages.length; i++) {
+ final CompletableFuture<Void> f = new CompletableFuture<>();
+ futures.add(f);
+
+ final SimpleMessage m = messages[i];
+ try (final RaftClient client = cluster.createClient()) {
+ new Thread(() -> {
+ try {
+ Assert.assertTrue(client.send(m).isSuccess());
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to send " + m, e);
+ }
+ f.complete(null);
+ }).start();
}
}
+ JavaUtils.allOf(futures).get();
final List<RaftPeerId> ids = new ArrayList<>();
- final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+ final RaftServerImpl leader = cluster.getLeader();
+ final RaftLog leaderLog = leader.getState().getLog();
final RaftPeerId leaderId = leaderLog.getSelfId();
ids.add(leaderId);
+ RaftTestUtil.getStateMachineLogEntries(leaderLog);
+
// check that the last logged commit index is equal to the index of the last committed StateMachineLogEntry
final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
LOG.info("{}: leader lastIndex={}", leaderId, lastIndex);
JavaUtils.attempt(() -> leaderLog.getLastCommittedIndex() == lastIndex,
- 10, sleepTime, "leader(commitIndex == lastIndex)", LOG);
+ 10, HUNDRED_MILLIS, "leader(commitIndex == lastIndex)", LOG);
final LogEntryProto lastEntry = leaderLog.get(lastIndex);
LOG.info("{}: leader lastEntry entry[{}] = {}", leaderId, lastIndex, ServerProtoUtils.toLogEntryString(lastEntry));
@@ -245,13 +263,18 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
for(long i = lastIndex - 1; i > loggedCommitIndex; i--) {
final LogEntryProto entry = leaderLog.get(i);
LOG.info("{}: leader entry[{}] = {}", leaderId, i, ServerProtoUtils.toLogEntryString(entry));
- Assert.assertFalse(entry.hasStateMachineLogEntry());
}
final LogEntryProto lastCommittedEntry = leaderLog.get(loggedCommitIndex);
LOG.info("{}: leader lastCommittedEntry = entry[{}] = {}",
leaderId, loggedCommitIndex, ServerProtoUtils.toLogEntryString(lastCommittedEntry));
Assert.assertTrue(lastCommittedEntry.hasStateMachineLogEntry());
+ final SimpleStateMachine4Testing leaderStateMachine = SimpleStateMachine4Testing.get(leader);
+ final TermIndex lastAppliedTermIndex = leaderStateMachine.getLastAppliedTermIndex();
+ LOG.info("{}: leader lastAppliedTermIndex = {}", leaderId, lastAppliedTermIndex);
+ Assert.assertEquals(lastCommittedEntry.getTerm(), lastAppliedTermIndex.getTerm());
+ Assert.assertEquals(lastCommittedEntry.getIndex(), lastAppliedTermIndex.getIndex());
+
// check follower logs
for(RaftServerImpl s : cluster.iterateServerImpls()) {
if (!s.getId().equals(leaderId)) {
@@ -260,6 +283,10 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
}
}
+ // take snapshot and truncate last (metadata) entry
+ leaderStateMachine.takeSnapshot();
+ leaderLog.truncate(lastIndex);
+
// kill all servers
ids.forEach(cluster::killServer);
@@ -269,9 +296,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
final RaftServerImpl server = cluster.getRaftServerImpl(id);
final RaftLog raftLog = server.getState().getLog();
JavaUtils.attempt(() -> raftLog.getLastCommittedIndex() >= loggedCommitIndex,
- 10, sleepTime, id + "(commitIndex >= loggedCommitIndex)", LOG);
+ 10, HUNDRED_MILLIS, id + "(commitIndex >= loggedCommitIndex)", LOG);
JavaUtils.attempt(() -> server.getState().getLastAppliedIndex() >= loggedCommitIndex,
- 10, sleepTime, id + "(lastAppliedIndex >= loggedCommitIndex)", LOG);
+ 10, HUNDRED_MILLIS, id + "(lastAppliedIndex >= loggedCommitIndex)", LOG);
LOG.info("{}: commitIndex={}, lastAppliedIndex={}",
id, raftLog.getLastCommittedIndex(), server.getState().getLastAppliedIndex());
cluster.killServer(id);