You are viewing a plain-text version of this content. The canonical link for it (originally the hyperlink on "here") was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/10/24 22:54:53 UTC

incubator-ratis git commit: r371

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 2272086a5 -> ce5f48c41


r371


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ce5f48c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ce5f48c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ce5f48c4

Branch: refs/heads/master
Commit: ce5f48c41e4666f96dfd1444845f1f9a1e0cf0bc
Parents: 2272086
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Wed Oct 24 23:58:46 2018 +0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Wed Oct 24 23:58:46 2018 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/TestRestartRaftPeer.java   | 106 -----------
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |   9 +-
 .../ratis/grpc/TestServerRestartWithGrpc.java   |  25 +++
 .../ratis/netty/TestServerRestartWithNetty.java |  25 +++
 .../java/org/apache/ratis/RaftAsyncTests.java   |   6 +-
 .../java/org/apache/ratis/RaftTestUtil.java     |  18 +-
 .../java/org/apache/ratis/RetryCacheTests.java  |  19 +-
 .../org/apache/ratis/WatchRequestTests.java     | 188 ++++++++++---------
 .../apache/ratis/server/ServerRestartTests.java | 110 +++++++++++
 .../apache/ratis/server/TestRaftLogMetrics.java |  69 +++----
 .../impl/RaftReconfigurationBaseTest.java       |  13 +-
 .../ratis/server/impl/RaftServerTestUtil.java   |  11 +-
 .../impl/RaftStateMachineExceptionTests.java    |   9 +-
 .../server/impl/StateMachineShutdownTests.java  |   2 +-
 .../TestServerRestartWithSimulatedRpc.java      |  25 +++
 .../server/storage/RaftStorageTestUtils.java    |   9 +-
 .../statemachine/RaftSnapshotBaseTest.java      |  75 +++++---
 .../SimpleStateMachine4Testing.java             |  78 +++++---
 18 files changed, 472 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
Review note: this section deletes TestRestartRaftPeer (restartFollower) from
ratis-examples. According to the diffstat above, the same commit adds
ServerRestartTests plus TestServerRestartWith{Grpc,Netty,SimulatedRpc};
presumably those take over this restart scenario for each RPC implementation.
TODO confirm against ServerRestartTests.java, which is not shown here.
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
deleted file mode 100644
index ccbbda0..0000000
--- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.RaftTestUtil.SimpleMessage;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.examples.ParameterizedBaseTest;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Test restarting raft peers.
- */
-@RunWith(Parameterized.class)
-public class TestRestartRaftPeer extends BaseTest {
-  static {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() throws IOException {
-    RaftProperties prop = new RaftProperties();
-    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SimpleStateMachine4Testing.class, StateMachine.class);
-    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
-    return ParameterizedBaseTest.getMiniRaftClusters(prop, 3);
-  }
-
-  @Parameterized.Parameter
-  public MiniRaftCluster cluster;
-
-  @Test
-  public void restartFollower() throws Exception {
-    cluster.start();
-    RaftTestUtil.waitForLeader(cluster);
-    final RaftPeerId leaderId = cluster.getLeader().getId();
-    final RaftClient client = cluster.createClient(leaderId);
-
-    // write some messages
-    final byte[] content = new byte[1024];
-    Arrays.fill(content, (byte) 1);
-    final SimpleMessage message = new SimpleMessage(new String(content));
-    for (int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-
-    // restart a follower
-    RaftPeerId followerId = cluster.getFollowers().get(0).getId();
-    LOG.info("Restart follower {}", followerId);
-    cluster.restartServer(followerId, false);
-
-    // write some more messages
-    for (int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-    client.close();
-
-    // make sure the restarted follower can catchup
-    boolean catchup = false;
-    long lastAppliedIndex = 0;
-    for (int i = 0; i < 10 && !catchup; i++) {
-      Thread.sleep(500);
-      lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex();
-      catchup = lastAppliedIndex >= 20;
-    }
-    Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
-
-    // make sure the restarted peer's log segments is correct
-    cluster.restartServer(followerId, false);
-    Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
-        .getLastEntryTermIndex().getIndex() >= 20);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
Review note: the follower-log check used to call Arrays.equals(...) and ignore
the result, so it asserted nothing. It now uses Assert.assertArrayEquals with
leaderEntries as the expected argument, matching JUnit's (expecteds, actuals)
order so that failure messages read correctly. The match-index check is a
lower bound (>= index - 1) and now reports the observed value on failure;
nextIndex must equal matchIndex + 1. NOTE(review): the postimage blob id on
the index line was computed before these review edits; regenerate the patch.
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 7ae385d..d98be53 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -92,16 +92,17 @@ public class TestRaftWithGrpc
         Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index);
         if (!raftServer.isLeader()) {
           TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
-          Arrays.equals(serverEntries, leaderEntries);
+          Assert.assertArrayEquals(leaderEntries, serverEntries);
         }
       });
 
       // Wait for heartbeats from leader to be received by followers
-      Thread.sleep(1000);
+      Thread.sleep(500);
       RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> {
         // FollowerInfo in the leader state should have updated next and match index.
-        Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 1);
-        Assert.assertEquals(logAppender.getFollower().getNextIndex(), index);
+        final long followerMatchIndex = logAppender.getFollower().getMatchIndex();
+        Assert.assertTrue("followerMatchIndex=" + followerMatchIndex, followerMatchIndex >= index - 1);
+        Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex());
       });
     }
     cluster.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
----------------------------------------------------------------------
Review note: new file that runs the shared ServerRestartTests suite with
MiniRaftClusterWithGrpc; the class body is empty and only implements
MiniRaftClusterWithGrpc.FactoryGet. This copy of the patch ends the file with
a newline (the original patch marked it as having none). NOTE(review): the
postimage blob id 682b3ba on the index line was computed for the original
content; regenerate the patch.
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
new file mode 100644
index 0000000..682b3ba
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithGrpc
+    extends ServerRestartTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
----------------------------------------------------------------------
Review note: new file that runs the shared ServerRestartTests suite with
MiniRaftClusterWithNetty; the class body is empty and only implements
MiniRaftClusterWithNetty.FactoryGet. This copy of the patch ends the file with
a newline (the original patch marked it as having none). NOTE(review): the
postimage blob id 15dc688 on the index line was computed for the original
content; regenerate the patch.
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
new file mode 100644
index 0000000..15dc688
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithNetty
+    extends ServerRestartTests<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
Review note: the asynchronously sent messages are now the bare decimal strings
"0" .. "(numMesssages - 1)" instead of "m0" and so on. The stale-read loop now
queries exactly those values (previously 1 .. commitIndex - 1), and the
failure case queries Long.MAX_VALUE instead of numMesssages + 1. Presumably
SimpleStateMachine4Testing (also changed in this commit) answers a query by
message content; TODO confirm there. NOTE(review): numMesssages is a
pre-existing misspelling.
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index f79eb6b..c14515c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -192,7 +192,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       // submit some messages
       final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
       for (int i = 0; i < numMesssages; i++) {
-        final String s = "m" + i;
+        final String s = "" + i;
         LOG.info("sendAsync " + s);
         futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
       }
@@ -218,12 +218,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       // test a failure case
       testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
           () -> client.sendStaleReadAsync(
-              new RaftTestUtil.SimpleMessage("" + (numMesssages + 1)),
+              new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE),
               followerCommitInfo.getCommitIndex(), follower),
           StateMachineException.class, IndexOutOfBoundsException.class);
 
       // test sendStaleReadAsync
-      for (int i = 1; i < followerCommitInfo.getCommitIndex(); i++) {
+      for (int i = 0; i < numMesssages; i++) {
         final int query = i;
         LOG.info("sendStaleReadAsync, query=" + query);
         final Message message = new RaftTestUtil.SimpleMessage("" + query);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
Review note: logEntriesContains gains an overload that passes startIndex and
endIndex straight to RaftLog.getEntries. The old signature delegates with
(0L, Long.MAX_VALUE), which preserves the previous range. The new
assertSameLog checks that both logs have the same last TermIndex and then
compares entries from index 0 through lastIndex inclusive. The submitted loop
used i < lastIndex and so never compared the last entry's content.
NOTE(review): assertSameLog calls expected.getLastEntryTermIndex().getIndex(),
which would throw NullPointerException if that returns null (possibly for an
empty log); confirm RaftLog's contract. The postimage blob id on the index
line predates this edit; regenerate the patch.
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 60629f9..5946a47 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -113,11 +113,14 @@ public interface RaftTestUtil {
     return leader != null ? leader.getId().toString() : null;
   }
 
-  static boolean logEntriesContains(RaftLog log,
-      SimpleMessage... expectedMessages) {
+  static boolean logEntriesContains(RaftLog log, SimpleMessage... expectedMessages) {
+    return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages);
+  }
+
+  static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
     int idxEntries = 0;
     int idxExpected = 0;
-    TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
+    TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
     while (idxEntries < termIndices.length
         && idxExpected < expectedMessages.length) {
       try {
@@ -376,4 +379,13 @@ public interface RaftTestUtil {
       }
     }).start();
   }
+
+  static void assertSameLog(RaftLog expected, RaftLog computed) throws Exception {
+    Assert.assertEquals(expected.getLastEntryTermIndex(), computed.getLastEntryTermIndex());
+    final long lastIndex = expected.getNextIndex() - 1;
+    Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(), lastIndex);
+    for(long i = 0; i <= lastIndex; i++) {
+      Assert.assertEquals(expected.get(i), computed.get(i));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
Review note: both retry assertions now count the log entries for which
hasStateMachineLogEntry() is true, from oldLastApplied + 1 up to (excluding)
the log's getNextIndex(), instead of comparing lastAppliedIndex. Entries
without a state machine log entry therefore no longer affect the expected
value. NOTE(review): java.util.stream.LongStream is newly imported here, but
nothing added to this file uses it; drop the import in a follow-up.
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 9fdb4f7..c962481 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -29,6 +29,8 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,6 +38,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.stream.LongStream;
 
 import static java.util.Arrays.asList;
 
@@ -110,10 +113,21 @@ public abstract class RetryCacheTests extends BaseTest {
       Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
       Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId));
       // make sure there is only one log entry committed
-      Assert.assertEquals(oldLastApplied + 1, server.getState().getLastAppliedIndex());
+      Assert.assertEquals(1, count(server.getState().getLog(), oldLastApplied + 1));
     }
   }
 
+  static int count(RaftLog log, long startIndex) throws RaftLogIOException {
+    final long nextIndex = log.getNextIndex();
+    int count = 0;
+    for(long i = startIndex; i < nextIndex; i++) {
+      if (log.get(i).hasStateMachineLogEntry()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
   /**
    * Test retry while the leader changes to another peer
    */
@@ -158,8 +172,7 @@ public abstract class RetryCacheTests extends BaseTest {
     }
 
     // check the new leader and make sure the retry did not get committed
-    Assert.assertEquals(oldLastApplied + 3,
-        cluster.getLeader().getState().getLastAppliedIndex());
+    Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
     client.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
Review note: TestParameters no longer precomputes a startLogIndex.
sendRequests sends each write. When its reply arrives, it sends the MAJORITY,
ALL, MAJORITY_COMMITTED and ALL_COMMITTED watches for reply.getLogIndex() and
publishes them through a CompletableFuture<WatchReplies>. That future is
completed only from thenAccept, so it never completes if the write fails. The
test reads the write reply first, so such a failure surfaces there.
This copy also fixes a pre-existing log call: the format string
"watchMajorityCommittedReply({}) = " had one {} for two arguments. With {}-style
parameterized logging, the surplus reply argument was silently dropped; the
call now matches the watchMajorityReply line. NOTE(review): the diffstat and
the postimage blob id were generated before this extra change; regenerate the
patch.
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 9ff27ad..d1cb7e0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -41,6 +41,8 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -71,7 +73,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
   }
 
   static class TestParameters {
-    final long startLogIndex;
     final int numMessages;
     final RaftClient writeClient;
     final RaftClient watchMajorityClient;
@@ -81,12 +82,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     final MiniRaftCluster cluster;
     final Logger log;
 
-    TestParameters(
-        long startLogIndex, int numMessages, RaftClient writeClient,
+    TestParameters(int numMessages, RaftClient writeClient,
         RaftClient watchMajorityClient, RaftClient watchAllClient,
         RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
         MiniRaftCluster cluster, Logger log) {
-      this.startLogIndex = startLogIndex;
       this.numMessages = numMessages;
       this.writeClient = writeClient;
       this.watchMajorityClient = watchMajorityClient;
@@ -97,9 +96,31 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
       this.log = log;
     }
 
+    void sendRequests(List<CompletableFuture<RaftClientReply>> replies,
+        List<CompletableFuture<WatchReplies>> watches) {
+      for(int i = 0; i < numMessages; i++) {
+        final String message = "m" + i;
+        log.info("SEND_REQUEST {}: message={}", i, message);
+        final CompletableFuture<RaftClientReply> replyFuture = writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message));
+        replies.add(replyFuture);
+        final CompletableFuture<WatchReplies> watchFuture = new CompletableFuture<>();
+        watches.add(watchFuture);
+        replyFuture.thenAccept(reply -> {
+          final long logIndex = reply.getLogIndex();
+          log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
+          watchFuture.complete(new WatchReplies(logIndex,
+              watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
+              watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
+              watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
+              watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)
+          ));
+        });
+      }
+    }
+
     @Override
     public String toString() {
-      return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages;
+      return "numMessages=" + numMessages;
     }
   }
 
@@ -119,10 +140,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
         final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
       final int[] numMessages = {1, 10, 100};
       for(int i = 0; i < 5; i++) {
-        final long logIndex = getLogIndex(writeClient) + 1;
         final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
         final TestParameters p = new TestParameters(
-            logIndex, n, writeClient, watchMajorityClient, watchAllClient,
+            n, writeClient, watchMajorityClient, watchAllClient,
             watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG);
         LOG.info("{}) {}, {}", i, p, cluster.printServers());
         testCase.apply(p);
@@ -131,18 +151,29 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
-    runTestWatchRequestAsync(p.startLogIndex, p.numMessages,
-        p.writeClient, p.watchMajorityClient, p.watchAllClient,
-        p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log);
-    return null;
+  static class WatchReplies {
+    private final long logIndex;
+    private final CompletableFuture<RaftClientReply> majority;
+    private final CompletableFuture<RaftClientReply> all;
+    private final CompletableFuture<RaftClientReply> majorityCommitted;
+    private final CompletableFuture<RaftClientReply> allCommitted;
+
+    WatchReplies(long logIndex,
+        CompletableFuture<RaftClientReply> majority, CompletableFuture<RaftClientReply> all,
+        CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted) {
+      this.logIndex = logIndex;
+      this.majority = majority;
+      this.all = all;
+      this.majorityCommitted = majorityCommitted;
+      this.allCommitted = allCommitted;
+    }
   }
 
-  static void runTestWatchRequestAsync(
-      long startLogIndex, int numMessages,
-      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient,
-      RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
-      MiniRaftCluster cluster, Logger LOG) throws Exception {
+  static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
+    final Logger LOG = p.log;
+    final MiniRaftCluster cluster = p.cluster;
+    final int numMessages = p.numMessages;
+
     // blockStartTransaction of the leader so that no transaction can be committed MAJORITY
     final RaftServerImpl leader = cluster.getLeader();
     LOG.info("block leader {}", leader.getId());
@@ -156,52 +187,35 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
 
     // send a message
     final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>();
+    final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
 
-    for(int i = 0; i < numMessages; i++) {
-      final long logIndex = startLogIndex + i;
-      final String message = "m" + logIndex;
-      LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message);
-      replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)));
-      watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY));
-      watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL));
-      watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync(
-          logIndex, ReplicationLevel.MAJORITY_COMMITTED));
-      watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED));
-    }
+    p.sendRequests(replies, watches);
 
     Assert.assertEquals(numMessages, replies.size());
-    Assert.assertEquals(numMessages, watchMajoritys.size());
-    Assert.assertEquals(numMessages, watchAlls.size());
-    Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
-    Assert.assertEquals(numMessages, watchAllCommitteds.size());
+    Assert.assertEquals(numMessages, watches.size());
 
     // since leader is blocked, nothing can be done.
     TimeUnit.SECONDS.sleep(1);
     assertNotDone(replies);
-    assertNotDone(watchMajoritys);
-    assertNotDone(watchAlls);
-    assertNotDone(watchMajorityCommitteds);
-    assertNotDone(watchAllCommitteds);
+    assertNotDone(watches);
 
     // unblock leader so that the transaction can be committed.
     SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
     LOG.info("unblock leader {}", leader.getId());
     for(int i = 0; i < numMessages; i++) {
-      final long logIndex = startLogIndex + i;
-      LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex);
       final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final long logIndex = reply.getLogIndex();
+      LOG.info("{}: receive reply for logIndex={}", i, logIndex);
       Assert.assertTrue(reply.isSuccess());
-      Assert.assertEquals(logIndex, reply.getLogIndex());
-      final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+
+      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      Assert.assertEquals(logIndex, watchReplies.logIndex);
+      final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
-      Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+      Assert.assertTrue(watchMajorityReply.isSuccess());
 
       final RaftClientReply watchMajorityCommittedReply
-          = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+          = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply);
+      LOG.info("watchMajorityCommittedReply({}) = {}", logIndex, watchMajorityCommittedReply);
       Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
@@ -219,22 +233,25 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
       }
     }
 
+    Assert.assertEquals(numMessages, watches.size());
+
     // but not replicated/committed to all.
     TimeUnit.SECONDS.sleep(1);
-    assertNotDone(watchAlls);
-    assertNotDone(watchAllCommitteds);
+    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
+    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));
 
     // unblock follower so that the transaction can be replicated and committed to all.
     LOG.info("unblock follower {}", blockedFollower.getId());
     SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     for(int i = 0; i < numMessages; i++) {
-      final long logIndex = startLogIndex + i;
+      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final long logIndex = watchReplies.logIndex;
       LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
-      final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
       Assert.assertTrue(watchAllReply.isSuccess());
 
-      final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply);
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
@@ -243,9 +260,14 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
         commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex()));
       }
     }
+    return null;
   }
 
   static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
+    assertNotDone(futures.stream());
+  }
+
+  static <T> void assertNotDone(Stream<CompletableFuture<T>> futures) {
     futures.forEach(f -> {
       if (f.isDone()) {
         try {
@@ -267,65 +289,44 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
   }
 
   static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception {
-    runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages,
-        p.writeClient, p.watchMajorityClient, p.watchAllClient,
-        p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log);
-    return null;
-  }
+    final Logger LOG = p.log;
+    final MiniRaftCluster cluster = p.cluster;
+    final int numMessages = p.numMessages;
 
-  static void runTestWatchRequestAsyncChangeLeader(
-      long startLogIndex, int numMessages,
-      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient,
-      RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
-      MiniRaftCluster cluster, Logger LOG) throws Exception {
     // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
     final List<RaftServerImpl> followers = cluster.getFollowers();
     final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
     LOG.info("block follower {}", blockedFollower.getId());
     SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
 
-    // send a message
     final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>();
-    final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>();
+    final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
 
-    for(int i = 0; i < numMessages; i++) {
-      final long logIndex = startLogIndex + i;
-      final String message = "m" + logIndex;
-      LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message);
-      replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)));
-      watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY));
-      watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL));
-      watchMajorityCommitteds.add(
-          watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED));
-      watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED));
-    }
+    p.sendRequests(replies, watches); // expected to add one entry per message to each list
 
     Assert.assertEquals(numMessages, replies.size());
-    Assert.assertEquals(numMessages, watchMajoritys.size());
-    Assert.assertEquals(numMessages, watchAlls.size());
-    Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
-    Assert.assertEquals(numMessages, watchAllCommitteds.size());
+    Assert.assertEquals(numMessages, watches.size());
 
     // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED.
     for(int i = 0; i < numMessages; i++) {
-      final long logIndex = startLogIndex + i;
-      LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex);
       final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final long logIndex = reply.getLogIndex();
+      LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex);
       Assert.assertTrue(reply.isSuccess());
-      Assert.assertEquals(logIndex, reply.getLogIndex());
-      final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      // the watches of this request must refer to the same log index as its write reply
+      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      Assert.assertEquals(logIndex, watchReplies.logIndex);
+      final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
-      Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+      Assert.assertTrue(watchMajorityReply.isSuccess());
 
       final RaftClientReply watchMajorityCommittedReply
-          = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+          = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply);
+      LOG.info("watchMajorityCommittedReply({}) = {}", logIndex, watchMajorityCommittedReply);
       Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos();
+        LOG.info("commitInfos={}", commitInfos);
         Assert.assertEquals(NUM_SERVERS, commitInfos.size());
 
         // One follower has not committed, so min must be less than logIndex
@@ -339,8 +340,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
       }
     }
     TimeUnit.SECONDS.sleep(1);
-    assertNotDone(watchAlls);
-    assertNotDone(watchAllCommitteds);
+    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
+    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));
 
     // Now change leader
     RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId());
@@ -349,13 +350,14 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     LOG.info("unblock follower {}", blockedFollower.getId());
     for(int i = 0; i < numMessages; i++) {
-      final long logIndex = startLogIndex + i;
+      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final long logIndex = watchReplies.logIndex;
       LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
-      final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
       Assert.assertTrue(watchAllReply.isSuccess());
 
-      final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply);
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
@@ -364,6 +366,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
         commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex()));
       }
     }
+    return null;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
new file mode 100644
index 0000000..5353caa
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test restarting raft peers.
+ */
+public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+  }
+
+  static final int NUM_SERVERS = 3;
+
+  @Before
+  public void setup() {
+    final RaftProperties prop = getProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
+  }
+
+  @Test
+  public void testRestartFollower() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
+      runTestRestartFollower(cluster, LOG);
+    }
+  }
+
+  /** Restart a follower between two batches of writes; check that it catches up and keeps its log. */
+  static void runTestRestartFollower(MiniRaftCluster cluster, Logger LOG) throws Exception {
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+
+    final byte[] content = new byte[1024];
+    Arrays.fill(content, (byte)1);
+    final SimpleMessage message = new SimpleMessage(new String(content));
+    final RaftPeerId followerId;
+    try(final RaftClient client = cluster.createClient(leaderId)) { // closed even if an assertion fails
+      for(int i = 0; i < 10; i++) { // write some messages
+        Assert.assertTrue(client.send(message).isSuccess());
+      }
+      // restart a follower
+      followerId = cluster.getFollowers().get(0).getId();
+      LOG.info("Restart follower {}", followerId);
+      cluster.restartServer(followerId, false);
+
+      for(int i = 0; i < 10; i++) { // write some more messages
+        Assert.assertTrue(client.send(message).isSuccess());
+      }
+    }
+
+    final long leaderLastIndex = cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
+    // make sure the restarted follower can catchup.  The check must throw rather than return false:
+    // JavaUtils.attempt retries only when an attempt throws; a returned false would be ignored.
+    final ServerState followerState = cluster.getRaftServerImpl(followerId).getState();
+    JavaUtils.attempt(() -> Assert.assertTrue("follower " + followerId + " has not caught up",
+        followerState.getLastAppliedIndex() >= leaderLastIndex), 10, 500, "follower catchup", LOG);
+
+    // make sure the restarted peer's log segments are correct
+    cluster.restartServer(followerId, false);
+    Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
+        .getLastEntryTermIndex().getIndex() >= leaderLastIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
index 978800d..9cc60a6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
@@ -20,61 +20,67 @@ package org.apache.ratis.server;
 
 import com.codahale.metrics.Timer;
 import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.LogUtils;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import javax.management.ObjectName;
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class TestRaftLogMetrics {
+public class TestRaftLogMetrics extends BaseTest // checks each server's flush-time timer via the registry and JMX
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
 
   {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
   public static final int NUM_SERVERS = 3;
 
-  protected static final RaftProperties properties = new RaftProperties();
-
-  private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc
-      .FACTORY.newCluster(NUM_SERVERS, getProperties());
-
-  public RaftProperties getProperties() {
-    return properties;
+  { // use MetricsStateMachine so that the flushStateMachineData calls are counted
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        MetricsStateMachine.class, StateMachine.class);
   }
 
-  @Before
-  public void setup() throws IOException {
-    Assert.assertNull(cluster.getLeader());
-    cluster.start();
-  }
+  static class MetricsStateMachine extends BaseStateMachine { // counts flushStateMachineData(..) calls
+    static MetricsStateMachine get(RaftServerImpl s) {
+      return (MetricsStateMachine)s.getStateMachine();
+    }
+    /** The number of {@link #flushStateMachineData(long)} calls so far. */
+    private final AtomicInteger flushCount = new AtomicInteger();
 
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
+    int getFlushCount() {
+      return flushCount.get();
     }
-  }
 
-  private String getLogFlushTimeMetric(String serverId) {
-    return new StringBuilder("org.apache.ratis.server.storage.RaftLogWorker.")
-        .append(serverId).append(".flush-time").toString();
+    @Override
+    public CompletableFuture<Void> flushStateMachineData(long index) {
+      flushCount.incrementAndGet(); // count the call, then delegate to BaseStateMachine
+      return super.flushStateMachineData(index);
+    }
   }
 
   @Test
   public void testFlushMetric() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) { // the cluster is closed on exit
+      cluster.start();
+      runTestFlushMetric(cluster);
+    }
+  }
+
+  static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception { // the visible caller starts cluster first
     int numMsg = 2;
     final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg);
 
@@ -85,22 +91,21 @@ public class TestRaftLogMetrics {
     }
 
     for (RaftServerProxy rsp: cluster.getServers()) {
-      String flushTimeMetric = getLogFlushTimeMetric(rsp.getId().toString());
+      final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId());
       Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
       Assert.assertNotNull(tm);
 
-      // Number of log entries expected = numMsg + 1 entry for start-log-segment
-      int numExpectedLogEntries = numMsg + 1;
+      final MetricsStateMachine stateMachine = MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId()));
+      final int expectedFlush = stateMachine.getFlushCount(); // timer count must equal the state machine's count
 
-      Assert.assertEquals(numExpectedLogEntries, tm.getCount());
+      Assert.assertEquals(expectedFlush, tm.getCount());
       Assert.assertTrue(tm.getMeanRate() > 0);
 
       // Test jmx
       ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric);
-      Assert.assertEquals(numExpectedLogEntries,
+      Assert.assertEquals(expectedFlush,
           ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
               .intValue());
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e9651d6..246a9a2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -61,8 +61,6 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
 
   protected static final RaftProperties prop = new RaftProperties();
   
-  private static final ClientId clientId = ClientId.randomId();
-
   static final int STAGING_CATCHUP_GAP = 10;
   @BeforeClass
   public static void setup() {
@@ -416,17 +414,16 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
       cluster.start();
       RaftTestUtil.waitForLeader(cluster);
 
-      final RaftPeerId leaderId = cluster.getLeader().getId();
-      final RaftClient client = cluster.createClient(leaderId);
+      final RaftServerImpl leader = cluster.getLeader();
+      final RaftClient client = cluster.createClient(leader.getId());
       client.send(new SimpleMessage("m"));
 
-      final long committedIndex = cluster.getLeader().getState().getLog()
-          .getLastCommittedIndex();
+      final RaftLog leaderLog = leader.getState().getLog();
+      final long committedIndex = leaderLog.getLastCommittedIndex();
       final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
 
       // no real configuration change in the request
-      RaftClientReply reply = client.setConfiguration(cluster.getPeers()
-          .toArray(new RaftPeer[0]));
+      final RaftClientReply reply = client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(committedIndex, cluster.getLeader().getState()
           .getLog().getLastCommittedIndex());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index bcfaf01..827117e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -41,8 +41,7 @@ public class RaftServerTestUtil {
         3, sleepMs, "waitAndCheckNewConf", LOG);
   }
   private static void waitAndCheckNewConf(MiniRaftCluster cluster,
-      RaftPeer[] peers, Collection<String> deadPeers)
-      throws Exception {
+      RaftPeer[] peers, Collection<String> deadPeers) {
     LOG.info(cluster.printServers());
     Assert.assertNotNull(cluster.getLeader());
 
@@ -61,9 +60,11 @@ public class RaftServerTestUtil {
         numIncluded++;
         Assert.assertTrue(server.getRaftConf().isStable());
         Assert.assertTrue(server.getRaftConf().hasNoChange(peers));
-      } else {
-        Assert.assertFalse(server.getId() + " is still running: " + server,
-            server.isAlive());
+      } else if (server.isAlive()) {
+        // The server is successfully removed from the conf
+        // It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf).
+        Assert.assertTrue(server.getRaftConf().isStable());
+        Assert.assertFalse(server.getRaftConf().containsInConf(server.getId()));
       }
     }
     Assert.assertEquals(peers.length, numIncluded + deadIncluded);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index ec635d0..cf3a490 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
@@ -107,8 +108,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     final long seqNum = 111;
-    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
+    final SimpleMessage message = new SimpleMessage("message");
+    final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, message);
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertFalse(reply.isSuccess());
     Assert.assertNotNull(reply.getStateMachineException());
@@ -131,8 +132,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
       }
       Assert.assertNotNull(
           RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
-      Assert.assertEquals(oldLastApplied + 1,
-          server.getState().getLastAppliedIndex());
+      final RaftLog log = server.getState().getLog();
+      RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
     }
 
     client.close();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index a66cf70..e566700 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -97,7 +97,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
     RaftClientReply watchReply = client.sendWatch(
         logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
     watchReply.getCommitInfos().forEach(
-        val -> Assert.assertEquals(val.getCommitIndex(), logIndex));
+        val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
 
     RaftServerImpl secondFollower = cluster.getFollowers().get(1);
     // Second follower is blocked in apply transaction

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
new file mode 100644
index 0000000..306e5e7
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithSimulatedRpc // runs ServerRestartTests on MiniRaftClusterWithSimulatedRpc
+    extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index ad8308e..e681b66 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -18,6 +18,8 @@
 package org.apache.ratis.server.storage;
 
 import org.apache.log4j.Level;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -30,6 +32,10 @@ public interface RaftStorageTestUtils {
     LogUtils.setLogLevel(RaftLogWorker.LOG, level);
   }
 
+  static String getLogFlushTimeMetric(RaftPeerId serverId) { // "<RaftLogWorker class name>.<serverId>.flush-time"
+    return String.join(".", RaftLogWorker.class.getName(), String.valueOf(serverId), "flush-time");
+  }
+
   static void printLog(RaftLog log, Consumer<String> println) {
     if (log == null) {
       println.accept("log == null");
@@ -50,8 +56,7 @@ public interface RaftStorageTestUtils {
       b.append(i == committed? 'c': ' ');
       b.append(String.format("%3d: ", i));
       try {
-        final RaftProtos.LogEntryProto entry = log.get(i);
-        b.append(entry != null? entry.getLogEntryBodyCase(): null);
+        b.append(ServerProtoUtils.toLogEntryString(log.get(i)));
       } catch (RaftLogIOException e) {
         b.append(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 7a326a3..0a5e38d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -33,7 +33,9 @@ import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 public abstract class RaftSnapshotBaseTest extends BaseTest {
   static {
@@ -56,25 +60,31 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
   static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
   private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
 
-  static File getSnapshotFile(MiniRaftCluster cluster, int i) {
+  static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) {
     final RaftServerImpl leader = cluster.getLeader();
-    final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader);
-    return sm.getStateMachineStorage().getSnapshotFile(
-        leader.getState().getCurrentTerm(), i);
+    final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
+    final long term = leader.getState().getCurrentTerm(); // all candidate files use the leader's current term
+    return LongStream.range(startIndex, endIndex) // one file per index; endIndex is exclusive
+        .mapToObj(i -> storage.getSnapshotFile(term, i))
+        .collect(Collectors.toList());
   }
 
-  static void assertLeaderContent(MiniRaftCluster cluster)
-      throws InterruptedException {
+  /** Assert that the last leader log entry is readable and the leader's content holds m0, m1, ... in order. */
+  static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
     final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-    Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2,
-        leader.getState().getLog().getLastCommittedIndex());
-    final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent();
+    final RaftLog leaderLog = leader.getState().getLog();
+    final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
+    Assert.assertNotNull("Leader log entry " + lastIndex + " not found", leaderLog.get(lastIndex));
 
-    for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
-      Assert.assertEquals(i+1, entries[i].getIndex());
-      Assert.assertArrayEquals(
-          new SimpleMessage("m" + i).getContent().toByteArray(),
-          entries[i].getStateMachineLogEntry().getLogData().toByteArray());
+    final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent();
+    long message = 0;
+    for (int i = 0; i < entries.length; i++) {
+      LOG.info("{}) {} {}", i, message, entries[i]);
+      if (entries[i].hasStateMachineLogEntry()) {
+        final SimpleMessage m = new SimpleMessage("m" + message++);
+        Assert.assertArrayEquals(m.getContent().toByteArray(),
+            entries[i].getStateMachineLogEntry().getLogData().toByteArray());
+      }
     }
   }
 
@@ -118,15 +128,12 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
       }
     }
 
+    final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+    LOG.info("nextIndex = {}", nextIndex);
     // wait for the snapshot to be done
-    final File snapshotFile = getSnapshotFile(cluster, i);
-
-    int retries = 0;
-    do {
-      Thread.sleep(1000);
-    } while (!snapshotFile.exists() && retries++ < 10);
-
-    Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
+    final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+    JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+        10, 1000, "snapshotFile.exist", LOG);
 
     // restart the peer and check if it can correctly load snapshot
     cluster.restart(false);
@@ -138,6 +145,14 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
     }
   }
 
+  static boolean exists(File f) { // true iff f exists; the path is logged when it does
+    final boolean found = f.exists();
+    if (found) {
+      LOG.info("File exists: {}", f);
+    }
+    return found;
+  }
+
   /**
    * Basic test for install snapshot: start a one node cluster and let it
    * generate a snapshot. Then delete the log and restart the node, and add more
@@ -145,7 +160,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
    */
   @Test
   public void testBasicInstallSnapshot() throws Exception {
-    List<LogPathAndIndex> logs;
+    final List<LogPathAndIndex> logs;
     try {
       RaftTestUtil.waitForLeader(cluster);
       final RaftPeerId leaderId = cluster.getLeader().getId();
@@ -161,15 +176,13 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
       // wait for the snapshot to be done
       RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
           .getStorage().getStorageDir();
-      final File snapshotFile = getSnapshotFile(cluster, i);
-      logs = storageDirectory.getLogSegmentFiles();
-
-      int retries = 0;
-      do {
-        Thread.sleep(1000);
-      } while (!snapshotFile.exists() && retries++ < 10);
 
-      Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
+      final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+      final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+      JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+          10, 1000, "snapshotFile.exist", LOG);
+      logs = storageDirectory.getLogSegmentFiles();
     } finally {
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 9a7267b..313e713 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -20,13 +20,15 @@ package org.apache.ratis.statemachine;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -34,22 +36,29 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogInputStream;
 import org.apache.ratis.server.storage.LogOutputStream;
 import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.util.*;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A {@link StateMachine} implementation example that simply stores all the log
@@ -68,8 +77,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     return (SimpleStateMachine4Testing)s.getStateMachine();
   }
 
-  private final List<LogEntryProto> list =
-      Collections.synchronizedList(new ArrayList<>());
+  private final SortedMap<Long, LogEntryProto> indexMap = Collections.synchronizedSortedMap(new TreeMap<>());
+  private final SortedMap<String, LogEntryProto> dataMap = Collections.synchronizedSortedMap(new TreeMap<>());
   private final Daemon checkpointer;
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
   private final RaftProperties properties = new RaftProperties();
@@ -119,14 +128,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   public SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
       while (running) {
-          if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >=
-              SNAPSHOT_THRESHOLD) {
-            endIndexLastCkpt = takeSnapshot();
-          }
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ignored) {
-          }
+        // lastKey() throws NoSuchElementException on an empty map, which would end this daemon's loop.
+        if (!indexMap.isEmpty() && indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) {
+          endIndexLastCkpt = takeSnapshot();
+        }
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException ignored) { // keep polling; the loop exits only when running is cleared
+        }
       }
     });
   }
@@ -139,6 +148,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     return leaderElectionTimeoutInfo;
   }
 
+  private void put(LogEntryProto entry) { // record entry by log index (index must not be present yet) and by log data
+    final String data = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
+    Preconditions.assertNull(indexMap.put(entry.getIndex(), entry), "previous");
+    dataMap.put(data, entry); // a later entry with identical data replaces the earlier one here
+  }
+
   @Override
   public synchronized void initialize(RaftServer server, RaftGroupId groupId,
       RaftStorage raftStorage) throws IOException {
@@ -171,7 +186,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
-    list.add(entry);
+    put(entry);
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
     return CompletableFuture.completedFuture(
         new SimpleMessage(entry.getIndex() + " OK"));
@@ -192,7 +207,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
         termIndex.getIndex(), snapshotFile);
     try (LogOutputStream out = new LogOutputStream(snapshotFile, false,
         segmentMaxSize, preallocatedSize, bufferSize)) {
-      for (final LogEntryProto entry : list) {
+      for (final LogEntryProto entry : indexMap.values()) {
         if (entry.getIndex() > endIndex) {
           break;
         } else {
@@ -241,13 +256,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
-          list.add(entry);
+          put(entry);
           updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
         }
       }
       Preconditions.assertTrue(
-          !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
-          "endIndex=%s, list=%s", endIndex, list);
+          !indexMap.isEmpty() && endIndex == indexMap.lastKey(),
+          "endIndex=%s, indexMap=%s", endIndex, indexMap);
       this.endIndexLastCkpt = endIndex;
       setLastAppliedTermIndex(snapshot.getTermIndex());
       this.storage.loadLatestSnapshot();
@@ -264,18 +279,21 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
    */
   @Override
   public CompletableFuture<Message> query(Message request) {
-    final ByteString bytes = request.getContent();
+    final String string = request.getContent().toStringUtf8();
+    Exception exception; // assigned on every path that does not return a result below
     try {
-      final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
-          : Long.parseLong(bytes.toStringUtf8());
-      LOG.info("query log index " + index);
-      final LogEntryProto entry = list.get(Math.toIntExact(index - 1));
-      return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+      LOG.info("query {}", string);
+      final LogEntryProto entry = dataMap.get(string); // exact match on the entry's UTF-8 log data
+      if (entry != null) {
+        return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+      }
+      exception = new IndexOutOfBoundsException("Log entry not found for query " + string);
     } catch (Exception e) {
       LOG.warn("Failed request " + request, e);
-      return JavaUtils.completeExceptionally(new StateMachineException(
-          "Failed request " + request, e));
+      exception = e;
     }
+    return JavaUtils.completeExceptionally(new StateMachineException(
+        "Failed request " + request, exception));
   }
 
   static final ByteString STATE_MACHINE_DATA = ByteString.copyFromUtf8("StateMachine Data");
@@ -314,7 +332,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   }
 
   public LogEntryProto[] getContent() {
-    return list.toArray(new LogEntryProto[list.size()]);
+    return indexMap.values().toArray(new LogEntryProto[0]); // entries recorded via put(), in ascending log-index order
   }
 
   public void blockStartTransaction() {