You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/10/24 22:54:53 UTC
incubator-ratis git commit: r371
Repository: incubator-ratis
Updated Branches:
refs/heads/master 2272086a5 -> ce5f48c41
r371
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ce5f48c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ce5f48c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ce5f48c4
Branch: refs/heads/master
Commit: ce5f48c41e4666f96dfd1444845f1f9a1e0cf0bc
Parents: 2272086
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Wed Oct 24 23:58:46 2018 +0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Wed Oct 24 23:58:46 2018 +0800
----------------------------------------------------------------------
.../org/apache/ratis/TestRestartRaftPeer.java | 106 -----------
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 9 +-
.../ratis/grpc/TestServerRestartWithGrpc.java | 25 +++
.../ratis/netty/TestServerRestartWithNetty.java | 25 +++
.../java/org/apache/ratis/RaftAsyncTests.java | 6 +-
.../java/org/apache/ratis/RaftTestUtil.java | 18 +-
.../java/org/apache/ratis/RetryCacheTests.java | 19 +-
.../org/apache/ratis/WatchRequestTests.java | 188 ++++++++++---------
.../apache/ratis/server/ServerRestartTests.java | 110 +++++++++++
.../apache/ratis/server/TestRaftLogMetrics.java | 69 +++----
.../impl/RaftReconfigurationBaseTest.java | 13 +-
.../ratis/server/impl/RaftServerTestUtil.java | 11 +-
.../impl/RaftStateMachineExceptionTests.java | 9 +-
.../server/impl/StateMachineShutdownTests.java | 2 +-
.../TestServerRestartWithSimulatedRpc.java | 25 +++
.../server/storage/RaftStorageTestUtils.java | 9 +-
.../statemachine/RaftSnapshotBaseTest.java | 75 +++++---
.../SimpleStateMachine4Testing.java | 78 +++++---
18 files changed, 472 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
deleted file mode 100644
index ccbbda0..0000000
--- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.RaftTestUtil.SimpleMessage;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.examples.ParameterizedBaseTest;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Test restarting raft peers.
- */
-@RunWith(Parameterized.class)
-public class TestRestartRaftPeer extends BaseTest {
- static {
- LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> data() throws IOException {
- RaftProperties prop = new RaftProperties();
- prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
- SimpleStateMachine4Testing.class, StateMachine.class);
- RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
- return ParameterizedBaseTest.getMiniRaftClusters(prop, 3);
- }
-
- @Parameterized.Parameter
- public MiniRaftCluster cluster;
-
- @Test
- public void restartFollower() throws Exception {
- cluster.start();
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
- final RaftClient client = cluster.createClient(leaderId);
-
- // write some messages
- final byte[] content = new byte[1024];
- Arrays.fill(content, (byte) 1);
- final SimpleMessage message = new SimpleMessage(new String(content));
- for (int i = 0; i < 10; i++) {
- Assert.assertTrue(client.send(message).isSuccess());
- }
-
- // restart a follower
- RaftPeerId followerId = cluster.getFollowers().get(0).getId();
- LOG.info("Restart follower {}", followerId);
- cluster.restartServer(followerId, false);
-
- // write some more messages
- for (int i = 0; i < 10; i++) {
- Assert.assertTrue(client.send(message).isSuccess());
- }
- client.close();
-
- // make sure the restarted follower can catchup
- boolean catchup = false;
- long lastAppliedIndex = 0;
- for (int i = 0; i < 10 && !catchup; i++) {
- Thread.sleep(500);
- lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex();
- catchup = lastAppliedIndex >= 20;
- }
- Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
-
- // make sure the restarted peer's log segments is correct
- cluster.restartServer(followerId, false);
- Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
- .getLastEntryTermIndex().getIndex() >= 20);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 7ae385d..d98be53 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -92,16 +92,17 @@ public class TestRaftWithGrpc
Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index);
if (!raftServer.isLeader()) {
TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
- Arrays.equals(serverEntries, leaderEntries);
+ Assert.assertArrayEquals(serverEntries, leaderEntries);
}
});
// Wait for heartbeats from leader to be received by followers
- Thread.sleep(1000);
+ Thread.sleep(500);
RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> {
// FollowerInfo in the leader state should have updated next and match index.
- Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 1);
- Assert.assertEquals(logAppender.getFollower().getNextIndex(), index);
+ final long followerMatchIndex = logAppender.getFollower().getMatchIndex();
+ Assert.assertTrue(followerMatchIndex >= index - 1);
+ Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex());
});
}
cluster.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
new file mode 100644
index 0000000..682b3ba
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithGrpc
+ extends ServerRestartTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
new file mode 100644
index 0000000..15dc688
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithNetty
+ extends ServerRestartTests<MiniRaftClusterWithNetty>
+ implements MiniRaftClusterWithNetty.FactoryGet {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index f79eb6b..c14515c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -192,7 +192,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
// submit some messages
final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
for (int i = 0; i < numMesssages; i++) {
- final String s = "m" + i;
+ final String s = "" + i;
LOG.info("sendAsync " + s);
futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
}
@@ -218,12 +218,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
// test a failure case
testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
() -> client.sendStaleReadAsync(
- new RaftTestUtil.SimpleMessage("" + (numMesssages + 1)),
+ new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE),
followerCommitInfo.getCommitIndex(), follower),
StateMachineException.class, IndexOutOfBoundsException.class);
// test sendStaleReadAsync
- for (int i = 1; i < followerCommitInfo.getCommitIndex(); i++) {
+ for (int i = 0; i < numMesssages; i++) {
final int query = i;
LOG.info("sendStaleReadAsync, query=" + query);
final Message message = new RaftTestUtil.SimpleMessage("" + query);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 60629f9..5946a47 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -113,11 +113,14 @@ public interface RaftTestUtil {
return leader != null ? leader.getId().toString() : null;
}
- static boolean logEntriesContains(RaftLog log,
- SimpleMessage... expectedMessages) {
+ static boolean logEntriesContains(RaftLog log, SimpleMessage... expectedMessages) {
+ return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages);
+ }
+
+ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
int idxEntries = 0;
int idxExpected = 0;
- TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
+ TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
while (idxEntries < termIndices.length
&& idxExpected < expectedMessages.length) {
try {
@@ -376,4 +379,13 @@ public interface RaftTestUtil {
}
}).start();
}
+
+ static void assertSameLog(RaftLog expected, RaftLog computed) throws Exception {
+ Assert.assertEquals(expected.getLastEntryTermIndex(), computed.getLastEntryTermIndex());
+ final long lastIndex = expected.getNextIndex() - 1;
+ Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(), lastIndex);
+ for(long i = 0; i < lastIndex; i++) {
+ Assert.assertEquals(expected.get(i), computed.get(i));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 9fdb4f7..c962481 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -29,6 +29,8 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -36,6 +38,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.stream.LongStream;
import static java.util.Arrays.asList;
@@ -110,10 +113,21 @@ public abstract class RetryCacheTests extends BaseTest {
Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId));
// make sure there is only one log entry committed
- Assert.assertEquals(oldLastApplied + 1, server.getState().getLastAppliedIndex());
+ Assert.assertEquals(1, count(server.getState().getLog(), oldLastApplied + 1));
}
}
+ static int count(RaftLog log, long startIndex) throws RaftLogIOException {
+ final long nextIndex = log.getNextIndex();
+ int count = 0;
+ for(long i = startIndex; i < nextIndex; i++) {
+ if (log.get(i).hasStateMachineLogEntry()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
/**
* Test retry while the leader changes to another peer
*/
@@ -158,8 +172,7 @@ public abstract class RetryCacheTests extends BaseTest {
}
// check the new leader and make sure the retry did not get committed
- Assert.assertEquals(oldLastApplied + 3,
- cluster.getLeader().getState().getLastAppliedIndex());
+ Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
client.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 9ff27ad..d1cb7e0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -41,6 +41,8 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -71,7 +73,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
static class TestParameters {
- final long startLogIndex;
final int numMessages;
final RaftClient writeClient;
final RaftClient watchMajorityClient;
@@ -81,12 +82,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final MiniRaftCluster cluster;
final Logger log;
- TestParameters(
- long startLogIndex, int numMessages, RaftClient writeClient,
+ TestParameters(int numMessages, RaftClient writeClient,
RaftClient watchMajorityClient, RaftClient watchAllClient,
RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
MiniRaftCluster cluster, Logger log) {
- this.startLogIndex = startLogIndex;
this.numMessages = numMessages;
this.writeClient = writeClient;
this.watchMajorityClient = watchMajorityClient;
@@ -97,9 +96,31 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
this.log = log;
}
+ void sendRequests(List<CompletableFuture<RaftClientReply>> replies,
+ List<CompletableFuture<WatchReplies>> watches) {
+ for(int i = 0; i < numMessages; i++) {
+ final String message = "m" + i;
+ log.info("SEND_REQUEST {}: message={}", i, message);
+ final CompletableFuture<RaftClientReply> replyFuture = writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message));
+ replies.add(replyFuture);
+ final CompletableFuture<WatchReplies> watchFuture = new CompletableFuture<>();
+ watches.add(watchFuture);
+ replyFuture.thenAccept(reply -> {
+ final long logIndex = reply.getLogIndex();
+ log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
+ watchFuture.complete(new WatchReplies(logIndex,
+ watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
+ watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
+ watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
+ watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)
+ ));
+ });
+ }
+ }
+
@Override
public String toString() {
- return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages;
+ return "numMessages=" + numMessages;
}
}
@@ -119,10 +140,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
final int[] numMessages = {1, 10, 100};
for(int i = 0; i < 5; i++) {
- final long logIndex = getLogIndex(writeClient) + 1;
final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
final TestParameters p = new TestParameters(
- logIndex, n, writeClient, watchMajorityClient, watchAllClient,
+ n, writeClient, watchMajorityClient, watchAllClient,
watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG);
LOG.info("{}) {}, {}", i, p, cluster.printServers());
testCase.apply(p);
@@ -131,18 +151,29 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
}
- static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
- runTestWatchRequestAsync(p.startLogIndex, p.numMessages,
- p.writeClient, p.watchMajorityClient, p.watchAllClient,
- p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log);
- return null;
+ static class WatchReplies {
+ private final long logIndex;
+ private final CompletableFuture<RaftClientReply> majority;
+ private final CompletableFuture<RaftClientReply> all;
+ private final CompletableFuture<RaftClientReply> majorityCommitted;
+ private final CompletableFuture<RaftClientReply> allCommitted;
+
+ WatchReplies(long logIndex,
+ CompletableFuture<RaftClientReply> majority, CompletableFuture<RaftClientReply> all,
+ CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted) {
+ this.logIndex = logIndex;
+ this.majority = majority;
+ this.all = all;
+ this.majorityCommitted = majorityCommitted;
+ this.allCommitted = allCommitted;
+ }
}
- static void runTestWatchRequestAsync(
- long startLogIndex, int numMessages,
- RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient,
- RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
- MiniRaftCluster cluster, Logger LOG) throws Exception {
+ static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
+ final Logger LOG = p.log;
+ final MiniRaftCluster cluster = p.cluster;
+ final int numMessages = p.numMessages;
+
// blockStartTransaction of the leader so that no transaction can be committed MAJORITY
final RaftServerImpl leader = cluster.getLeader();
LOG.info("block leader {}", leader.getId());
@@ -156,52 +187,35 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
// send a message
final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>();
+ final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
- for(int i = 0; i < numMessages; i++) {
- final long logIndex = startLogIndex + i;
- final String message = "m" + logIndex;
- LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message);
- replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)));
- watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY));
- watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL));
- watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync(
- logIndex, ReplicationLevel.MAJORITY_COMMITTED));
- watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED));
- }
+ p.sendRequests(replies, watches);
Assert.assertEquals(numMessages, replies.size());
- Assert.assertEquals(numMessages, watchMajoritys.size());
- Assert.assertEquals(numMessages, watchAlls.size());
- Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
- Assert.assertEquals(numMessages, watchAllCommitteds.size());
+ Assert.assertEquals(numMessages, watches.size());
// since leader is blocked, nothing can be done.
TimeUnit.SECONDS.sleep(1);
assertNotDone(replies);
- assertNotDone(watchMajoritys);
- assertNotDone(watchAlls);
- assertNotDone(watchMajorityCommitteds);
- assertNotDone(watchAllCommitteds);
+ assertNotDone(watches);
// unblock leader so that the transaction can be committed.
SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
LOG.info("unblock leader {}", leader.getId());
for(int i = 0; i < numMessages; i++) {
- final long logIndex = startLogIndex + i;
- LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex);
final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final long logIndex = reply.getLogIndex();
+ LOG.info("{}: receive reply for logIndex={}", i, logIndex);
Assert.assertTrue(reply.isSuccess());
- Assert.assertEquals(logIndex, reply.getLogIndex());
- final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+
+ final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ Assert.assertEquals(logIndex, watchReplies.logIndex);
+ final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
- Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+ Assert.assertTrue(watchMajorityReply.isSuccess());
final RaftClientReply watchMajorityCommittedReply
- = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply);
Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
{ // check commit infos
@@ -219,22 +233,25 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
}
+ Assert.assertEquals(numMessages, watches.size());
+
// but not replicated/committed to all.
TimeUnit.SECONDS.sleep(1);
- assertNotDone(watchAlls);
- assertNotDone(watchAllCommitteds);
+ assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
+ assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));
// unblock follower so that the transaction can be replicated and committed to all.
LOG.info("unblock follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
for(int i = 0; i < numMessages; i++) {
- final long logIndex = startLogIndex + i;
+ final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final long logIndex = watchReplies.logIndex;
LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
- final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
Assert.assertTrue(watchAllReply.isSuccess());
- final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply);
Assert.assertTrue(watchAllCommittedReply.isSuccess());
{ // check commit infos
@@ -243,9 +260,14 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex()));
}
}
+ return null;
}
static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
+ assertNotDone(futures.stream());
+ }
+
+ static <T> void assertNotDone(Stream<CompletableFuture<T>> futures) {
futures.forEach(f -> {
if (f.isDone()) {
try {
@@ -267,65 +289,44 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception {
- runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages,
- p.writeClient, p.watchMajorityClient, p.watchAllClient,
- p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log);
- return null;
- }
+ final Logger LOG = p.log;
+ final MiniRaftCluster cluster = p.cluster;
+ final int numMessages = p.numMessages;
- static void runTestWatchRequestAsyncChangeLeader(
- long startLogIndex, int numMessages,
- RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient,
- RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
- MiniRaftCluster cluster, Logger LOG) throws Exception {
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
final List<RaftServerImpl> followers = cluster.getFollowers();
final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
- // send a message
final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>();
- final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>();
+ final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
- for(int i = 0; i < numMessages; i++) {
- final long logIndex = startLogIndex + i;
- final String message = "m" + logIndex;
- LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message);
- replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)));
- watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY));
- watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL));
- watchMajorityCommitteds.add(
- watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED));
- watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED));
- }
+ p.sendRequests(replies, watches);
Assert.assertEquals(numMessages, replies.size());
- Assert.assertEquals(numMessages, watchMajoritys.size());
- Assert.assertEquals(numMessages, watchAlls.size());
- Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
- Assert.assertEquals(numMessages, watchAllCommitteds.size());
+ Assert.assertEquals(numMessages, watches.size());
// since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED.
for(int i = 0; i < numMessages; i++) {
- final long logIndex = startLogIndex + i;
- LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex);
final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final long logIndex = reply.getLogIndex();
+ LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex);
Assert.assertTrue(reply.isSuccess());
- Assert.assertEquals(logIndex, reply.getLogIndex());
- final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+
+ final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ Assert.assertEquals(logIndex, watchReplies.logIndex);
+ final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
- Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+ Assert.assertTrue(watchMajorityReply.isSuccess());
final RaftClientReply watchMajorityCommittedReply
- = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply);
Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
{ // check commit infos
final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos();
+ LOG.info("commitInfos=" + commitInfos);
Assert.assertEquals(NUM_SERVERS, commitInfos.size());
// One follower has not committed, so min must be less than logIndex
@@ -339,8 +340,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
}
TimeUnit.SECONDS.sleep(1);
- assertNotDone(watchAlls);
- assertNotDone(watchAllCommitteds);
+ assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
+ assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));
// Now change leader
RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId());
@@ -349,13 +350,14 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
LOG.info("unblock follower {}", blockedFollower.getId());
for(int i = 0; i < numMessages; i++) {
- final long logIndex = startLogIndex + i;
+ final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final long logIndex = watchReplies.logIndex;
LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
- final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
Assert.assertTrue(watchAllReply.isSuccess());
- final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply);
Assert.assertTrue(watchAllCommittedReply.isSuccess());
{ // check commit infos
@@ -364,6 +366,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex()));
}
}
+ return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
new file mode 100644
index 0000000..5353caa
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test restarting raft peers.
+ */
+public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ static {
+ LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+ }
+
+ static final int NUM_SERVERS = 3;
+
+ @Before
+ public void setup() {
+ final RaftProperties prop = getProperties();
+ prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ SimpleStateMachine4Testing.class, StateMachine.class);
+ RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
+ }
+
+ @Test
+ public void testRestartFollower() throws Exception {
+ try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
+ runTestRestartFollower(cluster, LOG);
+ }
+ }
+
+ static void runTestRestartFollower(MiniRaftCluster cluster, Logger LOG) throws Exception {
+ cluster.start();
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+ final RaftClient client = cluster.createClient(leaderId);
+
+ // write some messages
+ final byte[] content = new byte[1024];
+ Arrays.fill(content, (byte)1);
+ final SimpleMessage message = new SimpleMessage(new String(content));
+ for(int i = 0; i < 10; i++) {
+ Assert.assertTrue(client.send(message).isSuccess());
+ }
+
+ // restart a follower
+ RaftPeerId followerId = cluster.getFollowers().get(0).getId();
+ LOG.info("Restart follower {}", followerId);
+ cluster.restartServer(followerId, false);
+
+ // write some more messages
+ for(int i = 0; i < 10; i++) {
+ Assert.assertTrue(client.send(message).isSuccess());
+ }
+ client.close();
+
+ final long leaderLastIndex = cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
+ // make sure the restarted follower can catchup
+ final ServerState followerState = cluster.getRaftServerImpl(followerId).getState();
+ JavaUtils.attempt(() -> followerState.getLastAppliedIndex() >= leaderLastIndex,
+ 10, 500, "follower catchup", LOG);
+
+ // make sure the restarted peer's log segments is correct
+ cluster.restartServer(followerId, false);
+ Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
+ .getLastEntryTermIndex().getIndex() >= leaderLastIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
index 978800d..9cc60a6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
@@ -20,61 +20,67 @@ package org.apache.ratis.server;
import com.codahale.metrics.Timer;
import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricsRegistry;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.LogUtils;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import javax.management.ObjectName;
-import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
-public class TestRaftLogMetrics {
+public class TestRaftLogMetrics extends BaseTest
+ implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
{
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
public static final int NUM_SERVERS = 3;
- protected static final RaftProperties properties = new RaftProperties();
-
- private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc
- .FACTORY.newCluster(NUM_SERVERS, getProperties());
-
- public RaftProperties getProperties() {
- return properties;
+ {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ MetricsStateMachine.class, StateMachine.class);
}
- @Before
- public void setup() throws IOException {
- Assert.assertNull(cluster.getLeader());
- cluster.start();
- }
+ static class MetricsStateMachine extends BaseStateMachine {
+ static MetricsStateMachine get(RaftServerImpl s) {
+ return (MetricsStateMachine)s.getStateMachine();
+ }
+
+ private final AtomicInteger flushCount = new AtomicInteger();
- @After
- public void tearDown() {
- if (cluster != null) {
- cluster.shutdown();
+ int getFlushCount() {
+ return flushCount.get();
}
- }
- private String getLogFlushTimeMetric(String serverId) {
- return new StringBuilder("org.apache.ratis.server.storage.RaftLogWorker.")
- .append(serverId).append(".flush-time").toString();
+ @Override
+ public CompletableFuture<Void> flushStateMachineData(long index) {
+ flushCount.incrementAndGet();
+ return super.flushStateMachineData(index);
+ }
}
@Test
public void testFlushMetric() throws Exception {
+ try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ runTestFlushMetric(cluster);
+ }
+ }
+
+ static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception {
int numMsg = 2;
final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg);
@@ -85,22 +91,21 @@ public class TestRaftLogMetrics {
}
for (RaftServerProxy rsp: cluster.getServers()) {
- String flushTimeMetric = getLogFlushTimeMetric(rsp.getId().toString());
+ final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId());
Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
Assert.assertNotNull(tm);
- // Number of log entries expected = numMsg + 1 entry for start-log-segment
- int numExpectedLogEntries = numMsg + 1;
+ final MetricsStateMachine stateMachine = MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId()));
+ final int expectedFlush = stateMachine.getFlushCount();
- Assert.assertEquals(numExpectedLogEntries, tm.getCount());
+ Assert.assertEquals(expectedFlush, tm.getCount());
Assert.assertTrue(tm.getMeanRate() > 0);
// Test jmx
ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric);
- Assert.assertEquals(numExpectedLogEntries,
+ Assert.assertEquals(expectedFlush,
((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
.intValue());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e9651d6..246a9a2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -61,8 +61,6 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
protected static final RaftProperties prop = new RaftProperties();
- private static final ClientId clientId = ClientId.randomId();
-
static final int STAGING_CATCHUP_GAP = 10;
@BeforeClass
public static void setup() {
@@ -416,17 +414,16 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
cluster.start();
RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
- final RaftClient client = cluster.createClient(leaderId);
+ final RaftServerImpl leader = cluster.getLeader();
+ final RaftClient client = cluster.createClient(leader.getId());
client.send(new SimpleMessage("m"));
- final long committedIndex = cluster.getLeader().getState().getLog()
- .getLastCommittedIndex();
+ final RaftLog leaderLog = leader.getState().getLog();
+ final long committedIndex = leaderLog.getLastCommittedIndex();
final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
// no real configuration change in the request
- RaftClientReply reply = client.setConfiguration(cluster.getPeers()
- .toArray(new RaftPeer[0]));
+ final RaftClientReply reply = client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(committedIndex, cluster.getLeader().getState()
.getLog().getLastCommittedIndex());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index bcfaf01..827117e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -41,8 +41,7 @@ public class RaftServerTestUtil {
3, sleepMs, "waitAndCheckNewConf", LOG);
}
private static void waitAndCheckNewConf(MiniRaftCluster cluster,
- RaftPeer[] peers, Collection<String> deadPeers)
- throws Exception {
+ RaftPeer[] peers, Collection<String> deadPeers) {
LOG.info(cluster.printServers());
Assert.assertNotNull(cluster.getLeader());
@@ -61,9 +60,11 @@ public class RaftServerTestUtil {
numIncluded++;
Assert.assertTrue(server.getRaftConf().isStable());
Assert.assertTrue(server.getRaftConf().hasNoChange(peers));
- } else {
- Assert.assertFalse(server.getId() + " is still running: " + server,
- server.isAlive());
+ } else if (server.isAlive()) {
+ // The server is successfully removed from the conf
+ // It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf).
+ Assert.assertTrue(server.getRaftConf().isStable());
+ Assert.assertFalse(server.getRaftConf().containsInConf(server.getId()));
}
}
Assert.assertEquals(peers.length, numIncluded + deadIncluded);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index ec635d0..cf3a490 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
@@ -107,8 +108,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
final long seqNum = 111;
- RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
+ final SimpleMessage message = new SimpleMessage("message");
+ final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, message);
RaftClientReply reply = rpc.sendRequest(r);
Assert.assertFalse(reply.isSuccess());
Assert.assertNotNull(reply.getStateMachineException());
@@ -131,8 +132,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
}
Assert.assertNotNull(
RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
- Assert.assertEquals(oldLastApplied + 1,
- server.getState().getLastAppliedIndex());
+ final RaftLog log = server.getState().getLog();
+ RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
}
client.close();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index a66cf70..e566700 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -97,7 +97,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
RaftClientReply watchReply = client.sendWatch(
logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
watchReply.getCommitInfos().forEach(
- val -> Assert.assertEquals(val.getCommitIndex(), logIndex));
+ val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
RaftServerImpl secondFollower = cluster.getFollowers().get(1);
// Second follower is blocked in apply transaction
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
new file mode 100644
index 0000000..306e5e7
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithSimulatedRpc
+ extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc>
+ implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index ad8308e..e681b66 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -18,6 +18,8 @@
package org.apache.ratis.server.storage;
import org.apache.log4j.Level;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.util.AutoCloseableLock;
@@ -30,6 +32,10 @@ public interface RaftStorageTestUtils {
LogUtils.setLogLevel(RaftLogWorker.LOG, level);
}
+ static String getLogFlushTimeMetric(RaftPeerId serverId) {
+ return RaftLogWorker.class.getName() + "." + serverId + ".flush-time";
+ }
+
static void printLog(RaftLog log, Consumer<String> println) {
if (log == null) {
println.accept("log == null");
@@ -50,8 +56,7 @@ public interface RaftStorageTestUtils {
b.append(i == committed? 'c': ' ');
b.append(String.format("%3d: ", i));
try {
- final RaftProtos.LogEntryProto entry = log.get(i);
- b.append(entry != null? entry.getLogEntryBodyCase(): null);
+ b.append(ServerProtoUtils.toLogEntryString(log.get(i)));
} catch (RaftLogIOException e) {
b.append(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 7a326a3..0a5e38d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -33,7 +33,9 @@ import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.junit.After;
import org.junit.Assert;
@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
public abstract class RaftSnapshotBaseTest extends BaseTest {
static {
@@ -56,25 +60,31 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
- static File getSnapshotFile(MiniRaftCluster cluster, int i) {
+ static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) {
final RaftServerImpl leader = cluster.getLeader();
- final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader);
- return sm.getStateMachineStorage().getSnapshotFile(
- leader.getState().getCurrentTerm(), i);
+ final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
+ final long term = leader.getState().getCurrentTerm();
+ return LongStream.range(startIndex, endIndex)
+ .mapToObj(i -> storage.getSnapshotFile(term, i))
+ .collect(Collectors.toList());
}
- static void assertLeaderContent(MiniRaftCluster cluster)
- throws InterruptedException {
+
+ static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
- Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2,
- leader.getState().getLog().getLastCommittedIndex());
- final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent();
+ final RaftLog leaderLog = leader.getState().getLog();
+ final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
+ final LogEntryProto e = leaderLog.get(lastIndex);
- for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- Assert.assertEquals(i+1, entries[i].getIndex());
- Assert.assertArrayEquals(
- new SimpleMessage("m" + i).getContent().toByteArray(),
- entries[i].getStateMachineLogEntry().getLogData().toByteArray());
+ final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent();
+ long message = 0;
+ for (int i = 0; i < entries.length; i++) {
+ LOG.info("{}) {} {}", i, message, entries[i]);
+ if (entries[i].hasStateMachineLogEntry()) {
+ final SimpleMessage m = new SimpleMessage("m" + message++);
+ Assert.assertArrayEquals(m.getContent().toByteArray(),
+ entries[i].getStateMachineLogEntry().getLogData().toByteArray());
+ }
}
}
@@ -118,15 +128,12 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
}
+ final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+ LOG.info("nextIndex = {}", nextIndex);
// wait for the snapshot to be done
- final File snapshotFile = getSnapshotFile(cluster, i);
-
- int retries = 0;
- do {
- Thread.sleep(1000);
- } while (!snapshotFile.exists() && retries++ < 10);
-
- Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
+ final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+ 10, 1000, "snapshotFile.exist", LOG);
// restart the peer and check if it can correctly load snapshot
cluster.restart(false);
@@ -138,6 +145,14 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
}
+ static boolean exists(File f) {
+ if (f.exists()) {
+ LOG.info("File exists: " + f);
+ return true;
+ }
+ return false;
+ }
+
/**
* Basic test for install snapshot: start a one node cluster and let it
* generate a snapshot. Then delete the log and restart the node, and add more
@@ -145,7 +160,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
*/
@Test
public void testBasicInstallSnapshot() throws Exception {
- List<LogPathAndIndex> logs;
+ final List<LogPathAndIndex> logs;
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
@@ -161,15 +176,13 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
// wait for the snapshot to be done
RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
.getStorage().getStorageDir();
- final File snapshotFile = getSnapshotFile(cluster, i);
- logs = storageDirectory.getLogSegmentFiles();
-
- int retries = 0;
- do {
- Thread.sleep(1000);
- } while (!snapshotFile.exists() && retries++ < 10);
- Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
+ final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+ LOG.info("nextIndex = {}", nextIndex);
+ final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+ 10, 1000, "snapshotFile.exist", LOG);
+ logs = storageDirectory.getLogSegmentFiles();
} finally {
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce5f48c4/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 9a7267b..313e713 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -20,13 +20,15 @@ package org.apache.ratis.statemachine;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -34,22 +36,29 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.LogInputStream;
import org.apache.ratis.server.storage.LogOutputStream;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.util.*;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* A {@link StateMachine} implementation example that simply stores all the log
@@ -68,8 +77,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
return (SimpleStateMachine4Testing)s.getStateMachine();
}
- private final List<LogEntryProto> list =
- Collections.synchronizedList(new ArrayList<>());
+ private final SortedMap<Long, LogEntryProto> indexMap = Collections.synchronizedSortedMap(new TreeMap<>());
+ private final SortedMap<String, LogEntryProto> dataMap = Collections.synchronizedSortedMap(new TreeMap<>());
private final Daemon checkpointer;
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
private final RaftProperties properties = new RaftProperties();
@@ -119,14 +128,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
public SimpleStateMachine4Testing() {
checkpointer = new Daemon(() -> {
while (running) {
- if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >=
- SNAPSHOT_THRESHOLD) {
- endIndexLastCkpt = takeSnapshot();
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
+ if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) {
+ endIndexLastCkpt = takeSnapshot();
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch(InterruptedException ignored) {
+ }
}
});
}
@@ -139,6 +148,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
return leaderElectionTimeoutInfo;
}
+ private void put(LogEntryProto entry) {
+ final LogEntryProto previous = indexMap.put(entry.getIndex(), entry);
+ Preconditions.assertNull(previous, "previous");
+ dataMap.put(entry.getStateMachineLogEntry().getLogData().toStringUtf8(), entry);
+ }
+
@Override
public synchronized void initialize(RaftServer server, RaftGroupId groupId,
RaftStorage raftStorage) throws IOException {
@@ -171,7 +186,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
- list.add(entry);
+ put(entry);
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
return CompletableFuture.completedFuture(
new SimpleMessage(entry.getIndex() + " OK"));
@@ -192,7 +207,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
termIndex.getIndex(), snapshotFile);
try (LogOutputStream out = new LogOutputStream(snapshotFile, false,
segmentMaxSize, preallocatedSize, bufferSize)) {
- for (final LogEntryProto entry : list) {
+ for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
break;
} else {
@@ -241,13 +256,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
- list.add(entry);
+ put(entry);
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
}
}
Preconditions.assertTrue(
- !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
- "endIndex=%s, list=%s", endIndex, list);
+ !indexMap.isEmpty() && endIndex == indexMap.lastKey(),
+ "endIndex=%s, indexMap=%s", endIndex, indexMap);
this.endIndexLastCkpt = endIndex;
setLastAppliedTermIndex(snapshot.getTermIndex());
this.storage.loadLatestSnapshot();
@@ -264,18 +279,21 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
*/
@Override
public CompletableFuture<Message> query(Message request) {
- final ByteString bytes = request.getContent();
+ final String string = request.getContent().toStringUtf8();
+ Exception exception;
try {
- final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
- : Long.parseLong(bytes.toStringUtf8());
- LOG.info("query log index " + index);
- final LogEntryProto entry = list.get(Math.toIntExact(index - 1));
- return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+ LOG.info("query " + string);
+ final LogEntryProto entry = dataMap.get(string);
+ if (entry != null) {
+ return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+ }
+ exception = new IndexOutOfBoundsException("Log entry not found for query " + string);
} catch (Exception e) {
LOG.warn("Failed request " + request, e);
- return JavaUtils.completeExceptionally(new StateMachineException(
- "Failed request " + request, e));
+ exception = e;
}
+ return JavaUtils.completeExceptionally(new StateMachineException(
+ "Failed request " + request, exception));
}
static final ByteString STATE_MACHINE_DATA = ByteString.copyFromUtf8("StateMachine Data");
@@ -314,7 +332,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
public LogEntryProto[] getContent() {
- return list.toArray(new LogEntryProto[list.size()]);
+ return indexMap.values().toArray(new LogEntryProto[0]);
}
public void blockStartTransaction() {