You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ar...@apache.org on 2019/03/19 18:34:37 UTC

[incubator-ratis] branch master updated: RATIS-502. Commit Index less than the snapshot's commit indexes need to be ignored on restart. Contributed by Tsz Wo Nicholas Sze.

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c779f7a  RATIS-502. Commit Index less than the snapshot's commit indexes need to be ignored on restart. Contributed by Tsz Wo Nicholas Sze.
c779f7a is described below

commit c779f7a585227f5d57e2d21e130627d540b91d69
Author: Arpit Agarwal <ar...@apache.org>
AuthorDate: Tue Mar 19 11:34:24 2019 -0700

    RATIS-502. Commit Index less than the snapshot's commit indexes need to be ignored on restart. Contributed by Tsz Wo Nicholas Sze.
---
 .../org/apache/ratis/server/storage/RaftLog.java   |  2 +-
 .../apache/ratis/server/storage/RaftLogIndex.java  |  6 ++-
 .../ratis/statemachine/impl/BaseStateMachine.java  |  2 +-
 .../test/java/org/apache/ratis/RaftTestUtil.java   | 10 +++--
 .../apache/ratis/server/ServerRestartTests.java    | 47 +++++++++++++++++-----
 5 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index b1971e5..749d371 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -242,7 +242,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
       }
     });
     Optional.ofNullable(lastMetadataEntry).ifPresent(
-        e -> commitIndex.updateIncreasingly(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
+        e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
     state.open();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
index a7dd1a7..67e7e14 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
@@ -65,8 +65,10 @@ public class RaftLogIndex {
 
   public boolean updateToMax(long newIndex, Consumer<Object> log) {
     final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex));
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateToMax " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old < newIndex;
+    log.accept(StringUtils.stringSupplierAsObject(
+        () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+    return updated;
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 8a09bd4..f038b86 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -103,7 +103,7 @@ public class BaseStateMachine implements StateMachine {
     return lastAppliedTermIndex.get();
   }
 
-  protected void setLastAppliedTermIndex(TermIndex newTI) {
+  public void setLastAppliedTermIndex(TermIndex newTI) {
     lastAppliedTermIndex.set(newTI);
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5f782a0..93af79d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -200,9 +200,9 @@ public interface RaftTestUtil {
     });
   }
 
-  static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
-    final List<LogEntryProto> entries = new ArrayList<>(expectedMessages.length);
-    for(LogEntryProto e : getLogEntryProtos(log)) {
+  static List<LogEntryProto> getStateMachineLogEntries(RaftLog log) {
+    final List<LogEntryProto> entries = new ArrayList<>();
+    for (LogEntryProto e : getLogEntryProtos(log)) {
       final String s = ServerProtoUtils.toString(e);
       if (e.hasStateMachineLogEntry()) {
         LOG.info(s + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", "));
@@ -215,7 +215,11 @@ public interface RaftTestUtil {
         throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + s);
       }
     }
+    return entries;
+  }
 
+  static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
+    final List<LogEntryProto> entries = getStateMachineLogEntries(log);
     try {
       assertLogEntries(entries, expectedTerm, expectedMessages);
     } catch(Throwable t) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index f8f1a4c..ced1aba 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -32,6 +32,7 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.server.storage.SegmentedRaftLogFormat;
@@ -49,10 +50,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -219,24 +222,39 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
   }
 
   void runTestRestartCommitIndex(MiniRaftCluster cluster) throws Exception {
-    final TimeDuration sleepTime = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
-    final SimpleMessage[] messages = SimpleMessage.create(10);
-    try (final RaftClient client = cluster.createClient()) {
-      for(SimpleMessage m : messages) {
-        Assert.assertTrue(client.send(m).isSuccess());
+    final SimpleMessage[] messages = SimpleMessage.create(100);
+    final List<CompletableFuture<Void>> futures = new ArrayList<>(messages.length);
+    for(int i = 0; i < messages.length; i++) {
+      final CompletableFuture<Void> f = new CompletableFuture<>();
+      futures.add(f);
+
+      final SimpleMessage m = messages[i];
+      try (final RaftClient client = cluster.createClient()) {
+        new Thread(() -> {
+          try {
+            Assert.assertTrue(client.send(m).isSuccess());
+          } catch (IOException e) {
+            throw new IllegalStateException("Failed to send " + m, e);
+          }
+          f.complete(null);
+        }).start();
       }
     }
+    JavaUtils.allOf(futures).get();
 
     final List<RaftPeerId> ids = new ArrayList<>();
-    final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+    final RaftServerImpl leader = cluster.getLeader();
+    final RaftLog leaderLog = leader.getState().getLog();
     final RaftPeerId leaderId = leaderLog.getSelfId();
     ids.add(leaderId);
 
+    RaftTestUtil.getStateMachineLogEntries(leaderLog);
+
     // check that the last logged commit index is equal to the index of the last committed StateMachineLogEntry
     final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
     LOG.info("{}: leader lastIndex={}", leaderId, lastIndex);
     JavaUtils.attempt(() -> leaderLog.getLastCommittedIndex() == lastIndex,
-        10, sleepTime, "leader(commitIndex == lastIndex)", LOG);
+        10, HUNDRED_MILLIS, "leader(commitIndex == lastIndex)", LOG);
 
     final LogEntryProto lastEntry = leaderLog.get(lastIndex);
     LOG.info("{}: leader lastEntry entry[{}] = {}", leaderId, lastIndex, ServerProtoUtils.toLogEntryString(lastEntry));
@@ -245,13 +263,18 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     for(long i = lastIndex - 1; i > loggedCommitIndex; i--) {
       final LogEntryProto entry = leaderLog.get(i);
       LOG.info("{}: leader entry[{}] =  {}", leaderId, i, ServerProtoUtils.toLogEntryString(entry));
-      Assert.assertFalse(entry.hasStateMachineLogEntry());
     }
     final LogEntryProto lastCommittedEntry = leaderLog.get(loggedCommitIndex);
     LOG.info("{}: leader lastCommittedEntry = entry[{}] = {}",
         leaderId, loggedCommitIndex, ServerProtoUtils.toLogEntryString(lastCommittedEntry));
     Assert.assertTrue(lastCommittedEntry.hasStateMachineLogEntry());
 
+    final SimpleStateMachine4Testing leaderStateMachine = SimpleStateMachine4Testing.get(leader);
+    final TermIndex lastAppliedTermIndex = leaderStateMachine.getLastAppliedTermIndex();
+    LOG.info("{}: leader lastAppliedTermIndex = {}", leaderId, lastAppliedTermIndex);
+    Assert.assertEquals(lastCommittedEntry.getTerm(), lastAppliedTermIndex.getTerm());
+    Assert.assertEquals(lastCommittedEntry.getIndex(), lastAppliedTermIndex.getIndex());
+
     // check follower logs
     for(RaftServerImpl s : cluster.iterateServerImpls()) {
       if (!s.getId().equals(leaderId)) {
@@ -260,6 +283,10 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
       }
     }
 
+    // take snapshot and truncate last (metadata) entry
+    leaderStateMachine.takeSnapshot();
+    leaderLog.truncate(lastIndex);
+
     // kill all servers
     ids.forEach(cluster::killServer);
 
@@ -269,9 +296,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
       final RaftServerImpl server = cluster.getRaftServerImpl(id);
       final RaftLog raftLog = server.getState().getLog();
       JavaUtils.attempt(() -> raftLog.getLastCommittedIndex() >= loggedCommitIndex,
-          10, sleepTime, id + "(commitIndex >= loggedCommitIndex)", LOG);
+          10, HUNDRED_MILLIS, id + "(commitIndex >= loggedCommitIndex)", LOG);
       JavaUtils.attempt(() -> server.getState().getLastAppliedIndex() >= loggedCommitIndex,
-          10, sleepTime, id + "(lastAppliedIndex >= loggedCommitIndex)", LOG);
+          10, HUNDRED_MILLIS, id + "(lastAppliedIndex >= loggedCommitIndex)", LOG);
       LOG.info("{}: commitIndex={}, lastAppliedIndex={}",
           id, raftLog.getLastCommittedIndex(), server.getState().getLastAppliedIndex());
       cluster.killServer(id);