You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/12/01 20:50:19 UTC
incubator-ratis git commit: RATIS-438. RaftBasicTests.testWithLoad may fail.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 959d493c0 -> 609773e03
RATIS-438. RaftBasicTests.testWithLoad may fail.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/609773e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/609773e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/609773e0
Branch: refs/heads/master
Commit: 609773e03c96f733829a93cc5bbcc3febd25408d
Parents: 959d493
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Sat Dec 1 12:50:04 2018 -0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Sat Dec 1 12:50:04 2018 -0800
----------------------------------------------------------------------
.../java/org/apache/ratis/util/JavaUtils.java | 19 +-
.../java/org/apache/ratis/RaftAsyncTests.java | 186 +++++++++----------
.../java/org/apache/ratis/RaftBasicTests.java | 92 +++++----
.../java/org/apache/ratis/RaftTestUtil.java | 10 +-
.../SimpleStateMachine4Testing.java | 6 +-
.../ratis/grpc/TestRaftAsyncWithGrpc.java | 2 +-
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 42 ++---
7 files changed, 187 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index f3b0a0d..769e12f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -72,6 +72,21 @@ public interface JavaUtils {
return trace[3];
}
+ static <T extends Throwable> void runAsUnchecked(CheckedRunnable<T> runnable) {
+ runAsUnchecked(runnable::run, RuntimeException::new);
+ }
+
+ static <THROWABLE extends Throwable> void runAsUnchecked(
+ CheckedRunnable<THROWABLE> runnable, Function<THROWABLE, ? extends RuntimeException> converter) {
+ try {
+ runnable.run();
+ } catch(RuntimeException | Error e) {
+ throw e;
+ } catch(Throwable t) {
+ throw converter.apply(cast(t));
+ }
+ }
+
/**
* Invoke {@link Callable#call()} and, if there any,
* wrap the checked exception by {@link RuntimeException}.
@@ -88,9 +103,7 @@ public interface JavaUtils {
} catch(RuntimeException | Error e) {
throw e;
} catch(Throwable t) {
- @SuppressWarnings("unchecked")
- final THROWABLE casted = (THROWABLE)t;
- throw converter.apply(casted);
+ throw converter.apply(cast(t));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index e459e7c..0719976 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -18,26 +18,31 @@
package org.apache.ratis;
import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -45,22 +50,23 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
- static {
+ {
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
public static final int NUM_SERVERS = 3;
- @Before
- public void setup() {
+ {
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
}
@@ -91,42 +97,43 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
@Test
public void testRequestAsyncWithRetryPolicy() throws Exception {
- LOG.info("Running testWatchRequestsWithRetryPolicy");
- try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
- int maxRetries = 3;
- final RetryPolicy retryPolicy = RetryPolicies
- .retryUpToMaximumCountWithFixedSleep(maxRetries, TimeDuration.valueOf(1, TimeUnit.SECONDS));
- cluster.start();
- final RaftClient writeClient =
- cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), retryPolicy);
+ runWithNewCluster(NUM_SERVERS, this::runTestRequestAsyncWithRetryPolicy);
+ }
+
+ void runTestRequestAsyncWithRetryPolicy(CLUSTER cluster) throws Exception {
+ final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ 3, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+ try(final RaftClient writeClient = cluster.createClient(leader.getId(), retryPolicy)) {
// blockStartTransaction of the leader so that no transaction can be committed MAJORITY
- final RaftServerImpl leader = cluster.getLeader();
LOG.info("block leader {}", leader.getId());
SimpleStateMachine4Testing.get(leader).blockStartTransaction();
- RaftClientReply reply =
- writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get();
+ final SimpleMessage[] messages = SimpleMessage.create(2);
+ final RaftClientReply reply = writeClient.sendAsync(messages[0]).get();
RaftRetryFailureException rfe = reply.getRetryFailureException();
- Assert.assertTrue(rfe != null);
+ Assert.assertNotNull(rfe);
Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString()));
+
// unblock leader so that the next transaction can be committed.
SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
// make sure the the next request succeeds. This will ensure the first
// request completed
- writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get();
- }
+ writeClient.sendAsync(messages[1]).get();
}
+ }
@Test
public void testAsyncRequestSemaphore() throws Exception {
- LOG.info("Running testAsyncRequestSemaphore");
- final CLUSTER cluster = newCluster(NUM_SERVERS);
- Assert.assertNull(cluster.getLeader());
- cluster.start();
+ runWithNewCluster(NUM_SERVERS, this::runTestAsyncRequestSemaphore);
+ }
+
+ void runTestAsyncRequestSemaphore(CLUSTER cluster) throws Exception {
waitForLeader(cluster);
int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties());
CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
- final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
+ final SimpleMessage[] messages = SimpleMessage.create(numMessages);
final RaftClient client = cluster.createClient();
//Set blockTransaction flag so that transaction blocks
cluster.getServers().stream()
@@ -141,11 +148,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
futures[i] = client.sendAsync(messages[i]);
blockedRequestsCount.decrementAndGet();
}
- Assert.assertTrue(blockedRequestsCount.get() == 0);
+ Assert.assertEquals(0, blockedRequestsCount.get());
futures[numMessages] = CompletableFuture.supplyAsync(() -> {
blockedRequestsCount.incrementAndGet();
- client.sendAsync(new RaftTestUtil.SimpleMessage("n1"));
+ client.sendAsync(new SimpleMessage("n1"));
blockedRequestsCount.decrementAndGet();
return null;
});
@@ -154,7 +161,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
while (blockedRequestsCount.get() != 1) {
Thread.sleep(1000);
}
- Assert.assertTrue(blockedRequestsCount.get() == 1);
+ Assert.assertEquals(1, blockedRequestsCount.get());
//Since all semaphore permits are acquired the last message sent is in queue
RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
@@ -167,19 +174,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
for(int i=0; i<=numMessages; i++){
futures[i].join();
}
- Assert.assertTrue(blockedRequestsCount.get() == 0);
- cluster.shutdown();
+ Assert.assertEquals(0, blockedRequestsCount.get());
}
void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception {
- final CLUSTER cluster = newCluster(killLeader? 5: 3);
- try {
- cluster.start();
- waitForLeader(cluster);
- RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster, LOG);
- } finally {
- cluster.shutdown();
- }
+ runWithNewCluster(killLeader? 5: 3,
+ cluster -> RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster, LOG));
}
@Test
@@ -194,21 +194,18 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
@Test
public void testWithLoadAsync() throws Exception {
- LOG.info("Running testWithLoadAsync");
- final CLUSTER cluster = newCluster(NUM_SERVERS);
- cluster.start();
- waitForLeader(cluster);
- RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
- cluster.shutdown();
+ runWithNewCluster(NUM_SERVERS,
+ cluster -> RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG));
}
@Test
public void testStaleReadAsync() throws Exception {
- final int numMesssages = 10;
- final CLUSTER cluster = newCluster(NUM_SERVERS);
+ runWithNewCluster(NUM_SERVERS, this::runTestStaleReadAsync);
+ }
+ void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
+ final int numMesssages = 10;
try (RaftClient client = cluster.createClient()) {
- cluster.start();
RaftTestUtil.waitForLeader(cluster);
// submit some messages
@@ -216,17 +213,19 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
for (int i = 0; i < numMesssages; i++) {
final String s = "" + i;
LOG.info("sendAsync " + s);
- futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
+ futures.add(client.sendAsync(new SimpleMessage(s)));
}
Assert.assertEquals(numMesssages, futures.size());
- RaftClientReply lastWriteReply = null;
+ final List<RaftClientReply> replies = new ArrayList<>();
for (CompletableFuture<RaftClientReply> f : futures) {
- lastWriteReply = f.join();
- Assert.assertTrue(lastWriteReply.isSuccess());
+ final RaftClientReply r = f.join();
+ Assert.assertTrue(r.isSuccess());
+ replies.add(r);
}
futures.clear();
// Use a follower with the max commit index
+ final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
final RaftPeerId leader = lastWriteReply.getServerId();
LOG.info("leader = " + leader);
final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
@@ -235,70 +234,72 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
.filter(info -> !RaftPeerId.valueOf(info.getServer().getId()).equals(leader))
.max(Comparator.comparing(CommitInfoProto::getCommitIndex)).get();
final RaftPeerId follower = RaftPeerId.valueOf(followerCommitInfo.getServer().getId());
- LOG.info("max follower = " + follower);
+ final long followerCommitIndex = followerCommitInfo.getCommitIndex();
+ LOG.info("max follower = {}, commitIndex = {}", follower, followerCommitIndex);
// test a failure case
testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
() -> client.sendStaleReadAsync(
- new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE),
+ new SimpleMessage("" + Long.MAX_VALUE),
followerCommitInfo.getCommitIndex(), follower),
StateMachineException.class, IndexOutOfBoundsException.class);
// test sendStaleReadAsync
for (int i = 0; i < numMesssages; i++) {
- final int query = i;
- LOG.info("sendStaleReadAsync, query=" + query);
- final Message message = new RaftTestUtil.SimpleMessage("" + query);
+ final RaftClientReply reply = replies.get(i);
+ final String query = "" + i;
+ LOG.info("query=" + query + ", reply=" + reply);
+ final Message message = new SimpleMessage(query);
final CompletableFuture<RaftClientReply> readFuture = client.sendReadOnlyAsync(message);
- final CompletableFuture<RaftClientReply> staleReadFuture = client.sendStaleReadAsync(
- message, followerCommitInfo.getCommitIndex(), follower);
-
- futures.add(readFuture.thenApply(r -> getMessageContent(r))
- .thenCombine(staleReadFuture.thenApply(r -> getMessageContent(r)), (expected, computed) -> {
- try {
- LOG.info("query " + query + " returns "
- + LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8());
- } catch (InvalidProtocolBufferException e) {
- throw new CompletionException(e);
- }
-
- Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
- return null;
- })
- );
+
+ futures.add(readFuture.thenCompose(r -> {
+ if (reply.getLogIndex() <= followerCommitIndex) {
+ LOG.info("sendStaleReadAsync, query=" + query);
+ return client.sendStaleReadAsync(message, followerCommitIndex, follower);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenApply(staleReadReply -> {
+ if (staleReadReply == null) {
+ return null;
+ }
+
+ final ByteString expected = readFuture.join().getMessage().getContent();
+ final ByteString computed = staleReadReply.getMessage().getContent();
+ try {
+ LOG.info("query " + query + " returns "
+ + LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8());
+ } catch (InvalidProtocolBufferException e) {
+ throw new CompletionException(e);
+ }
+
+ Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
+ return null;
+ }));
}
JavaUtils.allOf(futures).join();
- } finally {
- cluster.shutdown();
}
}
- static ByteString getMessageContent(RaftClientReply reply) {
- Assert.assertTrue(reply.isSuccess());
- return reply.getMessage().getContent();
- }
-
@Test
public void testRequestTimeout() throws Exception {
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
- final CLUSTER cluster = newCluster(NUM_SERVERS);
- cluster.start();
- RaftBasicTests.testRequestTimeout(true, cluster, LOG);
- cluster.shutdown();
+ runWithNewCluster(NUM_SERVERS, cluster -> RaftBasicTests.testRequestTimeout(true, cluster, LOG));
//reset for the other tests
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
}
@Test
- public void testAppendEntriesTimeout()
- throws IOException, InterruptedException, ExecutionException {
+ public void testAppendEntriesTimeout() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestAppendEntriesTimeout);
+ }
+
+ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
LOG.info("Running testAppendEntriesTimeout");
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
- final CLUSTER cluster = newCluster(NUM_SERVERS);
- cluster.start();
waitForLeader(cluster);
long time = System.currentTimeMillis();
long waitTime = 5000;
@@ -309,7 +310,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
- CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc"));
+ CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new SimpleMessage("abc"));
Thread.sleep(waitTime);
// replyFuture should not be completed until append request is unblocked.
Assert.assertTrue(!replyFuture.isDone());
@@ -322,7 +323,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
replyFuture.get();
Assert.assertTrue(System.currentTimeMillis() - time > waitTime);
}
- cluster.shutdown();
//reset for the other tests
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f37fc21..90cc627 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -35,7 +35,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
@@ -54,10 +53,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.ratis.RaftTestUtil.logEntriesContains;
-import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
-import static org.junit.Assert.assertTrue;
public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -67,38 +63,39 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration
- .valueOf(5, TimeUnit.SECONDS));
+ RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
}
public static final int NUM_SERVERS = 5;
@Test
public void testBasicAppendEntries() throws Exception {
- try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
- cluster.start();
- runTestBasicAppendEntries(false, false, 10, cluster, LOG);
- }
+ runWithNewCluster(NUM_SERVERS, cluster ->
+ runTestBasicAppendEntries(false, false, 10, cluster, LOG));
}
@Test
public void testBasicAppendEntriesKillLeader() throws Exception {
- try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
- cluster.start();
- runTestBasicAppendEntries(false, true, 10, cluster, LOG);
- }
+ runWithNewCluster(NUM_SERVERS, cluster ->
+ runTestBasicAppendEntries(false, true, 10, cluster, LOG));
}
- static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
- try {
- Thread.sleep(killSleepMs);
- cluster.killServer(id);
- Thread.sleep(restartSleepMs);
- LOG.info("restart server: " + id);
- cluster.restartServer(id, false);
- } catch (Exception e) {
- ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
- }
+ static CompletableFuture<Void> killAndRestartServer(
+ RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(killSleepMs);
+ cluster.killServer(id);
+ Thread.sleep(restartSleepMs);
+ LOG.info("restart server: " + id);
+ cluster.restartServer(id, false);
+ future.complete(null);
+ } catch (Exception e) {
+ ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
+ }
+ }).start();
+ return future;
}
static void runTestBasicAppendEntries(
@@ -112,10 +109,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
RaftServerImpl leader = waitForLeader(cluster);
final long term = leader.getState().getCurrentTerm();
- new Thread(() -> killAndRestartServer(cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG)).start();
+ final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(
+ cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG);
+ final CompletableFuture<Void> killAndRestartLeader;
if (killLeader) {
LOG.info("killAndRestart leader " + leader.getId());
- new Thread(() -> killAndRestartServer(leader.getId(), 2000, 4000, cluster, LOG)).start();
+ killAndRestartLeader = killAndRestartServer(leader.getId(), 2000, 4000, cluster, LOG);
+ } else {
+ killAndRestartLeader = CompletableFuture.completedFuture(null);
}
LOG.info(cluster.printServers());
@@ -138,7 +139,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
});
} else {
final RaftClientReply reply = client.send(message);
- Preconditions.assertTrue(reply.isSuccess());
+ Assert.assertTrue(reply.isSuccess());
}
}
if (async) {
@@ -148,6 +149,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
}
Thread.sleep(cluster.getTimeoutMax().toInt(TimeUnit.MILLISECONDS) + 100);
LOG.info(cluster.printAllLogs());
+ killAndRestartFollower.join();
+ killAndRestartLeader.join();
for(RaftServerProxy server : cluster.getServers()) {
final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
@@ -161,9 +164,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
@Test
public void testOldLeaderCommit() throws Exception {
- LOG.info("Running testOldLeaderCommit");
- final CLUSTER cluster = newCluster(NUM_SERVERS);
- cluster.start();
+ runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
+ }
+
+ void runTestOldLeaderCommit(CLUSTER cluster) throws Exception {
final RaftServerImpl leader = waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
final long term = leader.getState().getCurrentTerm();
@@ -180,7 +184,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
Thread.sleep(cluster.getMaxTimeout() + 100);
RaftLog followerLog = followerToSendLog.getState().getLog();
- assertTrue(logEntriesContains(followerLog, messages));
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
LOG.info(String.format("killing old leader: %s", leaderId.toString()));
cluster.killServer(leaderId);
@@ -198,15 +202,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
cluster.getServerAliveStream().map(s -> s.getState().getLog())
.forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
- LOG.info("terminating testOldLeaderCommit test");
- cluster.shutdown();
}
@Test
public void testOldLeaderNotCommit() throws Exception {
- LOG.info("Running testOldLeaderNotCommit");
- final CLUSTER cluster = newCluster(NUM_SERVERS);
- cluster.start();
+ runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderNotCommit);
+ }
+
+ void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = waitForLeader(cluster).getId();
List<RaftServerImpl> followers = cluster.getFollowers();
@@ -217,10 +220,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
}
SimpleMessage[] messages = SimpleMessage.create(1);
- sendMessageInNewThread(cluster, messages);
+ RaftTestUtil.sendMessageInNewThread(cluster, messages);
Thread.sleep(cluster.getMaxTimeout() + 100);
- logEntriesContains(followerToCommit.getState().getLog(), messages);
+ RaftTestUtil.logEntriesContains(followerToCommit.getState().getLog(), messages);
cluster.killServer(leaderId);
cluster.killServer(followerToCommit.getId());
@@ -236,7 +239,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
cluster.getServerAliveStream()
.map(s -> s.getState().getLog())
.forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
- cluster.shutdown();
}
static class Client4TestWithLoad extends Thread {
@@ -318,13 +320,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
@Test
public void testWithLoad() throws Exception {
- try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
- cluster.start();
- testWithLoad(10, 500, false, cluster, LOG);
- }
+ runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
}
- public static void testWithLoad(final int numClients, final int numMessages,
+ static void testWithLoad(final int numClients, final int numMessages,
boolean useAsync, MiniRaftCluster cluster, Logger LOG) throws Exception {
LOG.info("Running testWithLoad: numClients=" + numClients
+ ", numMessages=" + numMessages + ", async=" + useAsync);
@@ -371,12 +370,12 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
int count = 0;
for(;; ) {
- if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) {
+ if (clients.stream().noneMatch(Client4TestWithLoad::isRunning)) {
break;
}
final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
- assertTrue(n >= lastStep.get());
+ Assert.assertTrue(n >= lastStep.get());
if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 steps.
Thread.sleep(10);
@@ -406,7 +405,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
}
public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
- LOG.info("Running testRequestTimeout");
waitForLeader(cluster);
long time = System.currentTimeMillis();
try (final RaftClient client = cluster.createClient()) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 19422bc..a96b917 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -156,11 +156,17 @@ public interface RaftTestUtil {
}
}
- static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage... expectedMessages) {
+ static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage[] expectedMessages) {
+ for(SimpleMessage m : expectedMessages) {
+ assertLogEntries(cluster, m);
+ }
+ }
+
+ static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage expectedMessage) {
final int size = cluster.getServers().size();
final long count = cluster.getServerAliveStream()
.map(s -> s.getState().getLog())
- .filter(log -> logEntriesContains(log, expectedMessages))
+ .filter(log -> logEntriesContains(log, expectedMessage))
.count();
if (2*count <= size) {
throw new AssertionError("Not in majority: size=" + size
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index d5fdf53..d4c4021 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -154,7 +154,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
Preconditions.assertNull(previous, "previous");
final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
dataMap.put(s, entry);
- LOG.info("put {}, {} -> {}", entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
+ LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
}
@Override
@@ -290,7 +290,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
if (entry != null) {
return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
}
- exception = new IndexOutOfBoundsException("Log entry not found for query " + string);
+ exception = new IndexOutOfBoundsException(getId() + ": LogEntry not found for query " + string);
} catch (Exception e) {
LOG.warn("Failed request " + request, e);
exception = e;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
index 614787e..a12c52f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d98be53..50b2d7f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,11 +27,13 @@ import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -53,19 +55,16 @@ public class TestRaftWithGrpc
@Test
public void testRequestTimeout() throws Exception {
- try(MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS)) {
- cluster.start();
- testRequestTimeout(false, cluster, LOG);
- }
+ runWithNewCluster(NUM_SERVERS, cluster -> testRequestTimeout(false, cluster, LOG));
}
@Test
public void testUpdateViaHeartbeat() throws Exception {
- LOG.info("Running testUpdateViaHeartbeat");
- final MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS);
- cluster.start();
+ runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
+ }
+
+ void runTestUpdateViaHeartbeat(MiniRaftClusterWithGrpc cluster) throws Exception {
waitForLeader(cluster);
- long waitTime = 5000;
try (final RaftClient client = cluster.createClient()) {
// block append requests
cluster.getServerAliveStream()
@@ -75,7 +74,7 @@ public class TestRaftWithGrpc
CompletableFuture<RaftClientReply>
replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc"));
- Thread.sleep(waitTime);
+ TimeDuration.valueOf(5 , TimeUnit.SECONDS).sleep();
// replyFuture should not be completed until append request is unblocked.
Assert.assertTrue(!replyFuture.isDone());
// unblock append request.
@@ -84,27 +83,28 @@ public class TestRaftWithGrpc
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
- long index = cluster.getLeader().getState().getLog().getNextIndex();
+ final long index = cluster.getLeader().getState().getLog().getNextIndex();
TermIndex[] leaderEntries = cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE);
// The entries have been appended in the followers
// although the append entry timed out at the leader
- cluster.getServerAliveStream().forEach(raftServer -> {
+
+ final TimeDuration sleepTime = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ cluster.getServerAliveStream().filter(impl -> !impl.isLeader()).forEach(raftServer ->
+ JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index);
- if (!raftServer.isLeader()) {
- TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
- Assert.assertArrayEquals(serverEntries, leaderEntries);
- }
- });
+ TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
+ Assert.assertArrayEquals(serverEntries, leaderEntries);
+ }, 10, sleepTime, "assertRaftLog-" + raftServer.getId(), LOG)));
// Wait for heartbeats from leader to be received by followers
Thread.sleep(500);
- RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> {
+ RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender ->
+ JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
// FollowerInfo in the leader state should have updated next and match index.
final long followerMatchIndex = logAppender.getFollower().getMatchIndex();
Assert.assertTrue(followerMatchIndex >= index - 1);
Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex());
- });
+ }, 10, sleepTime, "assertRaftLog-" + logAppender.getFollower(), LOG)));
}
- cluster.shutdown();
}
}