You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/01/24 22:00:24 UTC
incubator-ratis git commit: RATIS-169. Add TestWithLoad for async
api. Contributed by Lokesh Jain
Repository: incubator-ratis
Updated Branches:
refs/heads/master 0f7169db5 -> c16cb1c65
RATIS-169. Add TestWithLoad for async api. Contributed by Lokesh Jain
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c16cb1c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c16cb1c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c16cb1c6
Branch: refs/heads/master
Commit: c16cb1c6598855c47737dff46d4b07a580a37a32
Parents: 0f7169d
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Jan 24 13:58:47 2018 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Jan 24 13:58:47 2018 -0800
----------------------------------------------------------------------
.../java/org/apache/ratis/RaftAsyncTests.java | 11 ++++
.../java/org/apache/ratis/RaftBasicTests.java | 54 +++++++++++++++-----
2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c16cb1c6/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index e5f41b7..f72ea5a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -147,4 +147,15 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG);
cluster.shutdown();
}
+
+ @Test
+ public void testWithLoadAsync() throws Exception {
+ LOG.info("Running testWithLoadAsync");
+ RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
+ final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ cluster.start();
+ waitForLeader(cluster);
+ RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c16cb1c6/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 99c14b1..f4156d4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -203,7 +203,8 @@ public abstract class RaftBasicTests extends BaseTest {
.forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
}
- class Client4TestWithLoad extends Thread {
+ static class Client4TestWithLoad extends Thread {
+ boolean useAsync;
final int index;
final SimpleMessage[] messages;
@@ -211,10 +212,17 @@ public abstract class RaftBasicTests extends BaseTest {
final AtomicInteger step = new AtomicInteger();
final AtomicReference<Throwable> exceptionInClientThread = new AtomicReference<>();
- Client4TestWithLoad(int index, int numMessages) {
+ final MiniRaftCluster cluster;
+ final Logger LOG;
+
+ Client4TestWithLoad(int index, int numMessages, boolean useAsync,
+ MiniRaftCluster cluster, Logger LOG) {
super("client-" + index);
this.index = index;
this.messages = SimpleMessage.create(numMessages, index + "-");
+ this.useAsync = useAsync;
+ this.cluster = cluster;
+ this.LOG = LOG;
}
boolean isRunning() {
@@ -223,10 +231,31 @@ public abstract class RaftBasicTests extends BaseTest {
@Override
public void run() {
- try(RaftClient client = getCluster().createClient()) {
- for (; step.get() < messages.length; ) {
- final RaftClientReply reply = client.send(messages[step.getAndIncrement()]);
- assertTrue(reply.isSuccess());
+ try (RaftClient client = cluster.createClient()) {
+ final CompletableFuture f = new CompletableFuture();
+ for (int i = 0; i < messages.length; i++) {
+ if (!useAsync) {
+ final RaftClientReply reply =
+ client.send(messages[step.getAndIncrement()]);
+ Assert.assertTrue(reply.isSuccess());
+ } else {
+ final CompletableFuture<RaftClientReply> replyFuture =
+ client.sendAsync(messages[i]);
+ replyFuture.thenAcceptAsync(r -> {
+ if (!r.isSuccess()) {
+ f.completeExceptionally(
+ new AssertionError("Failed with reply: " + r));
+ }
+ if (step.incrementAndGet() == messages.length) {
+ f.complete(null);
+ }
+ Assert.assertTrue(r.isSuccess());
+ });
+ }
+ }
+ if (useAsync) {
+ f.join();
+ Assert.assertTrue(step.get() == messages.length);
}
} catch(Throwable t) {
if (exceptionInClientThread.compareAndSet(null, t)) {
@@ -253,20 +282,19 @@ public abstract class RaftBasicTests extends BaseTest {
@Test
public void testWithLoad() throws Exception {
- testWithLoad(10, 500);
+ testWithLoad(10, 500, false, getCluster(), LOG);
}
- private void testWithLoad(final int numClients, final int numMessages)
- throws Exception {
+ public static void testWithLoad(final int numClients, final int numMessages,
+ boolean useAsync, MiniRaftCluster cluster, Logger LOG) throws Exception {
LOG.info("Running testWithLoad: numClients=" + numClients
- + ", numMessages=" + numMessages);
+ + ", numMessages=" + numMessages + ", async=" + useAsync);
- final MiniRaftCluster cluster = getCluster();
- LOG.info(cluster.printServers());
+ waitForLeader(cluster);
final List<Client4TestWithLoad> clients
= Stream.iterate(0, i -> i+1).limit(numClients)
- .map(i -> new Client4TestWithLoad(i, numMessages))
+ .map(i -> new Client4TestWithLoad(i, numMessages, useAsync, cluster, LOG))
.collect(Collectors.toList());
final AtomicInteger lastStep = new AtomicInteger();