You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/01/24 22:00:24 UTC

incubator-ratis git commit: RATIS-169. Add TestWithLoad for async api. Contributed by Lokesh Jain

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 0f7169db5 -> c16cb1c65


RATIS-169. Add TestWithLoad for async api.  Contributed by Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c16cb1c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c16cb1c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c16cb1c6

Branch: refs/heads/master
Commit: c16cb1c6598855c47737dff46d4b07a580a37a32
Parents: 0f7169d
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Jan 24 13:58:47 2018 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Jan 24 13:58:47 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/RaftAsyncTests.java   | 11 ++++
 .../java/org/apache/ratis/RaftBasicTests.java   | 54 +++++++++++++++-----
 2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c16cb1c6/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index e5f41b7..f72ea5a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -147,4 +147,15 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG);
     cluster.shutdown();
   }
+
+  @Test
+  public void testWithLoadAsync() throws Exception {
+    LOG.info("Running testWithLoadAsync");
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
+    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    cluster.start();
+    waitForLeader(cluster);
+    RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c16cb1c6/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 99c14b1..f4156d4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -203,7 +203,8 @@ public abstract class RaftBasicTests extends BaseTest {
             .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
   }
 
-  class Client4TestWithLoad extends Thread {
+  static class Client4TestWithLoad extends Thread {
+    boolean useAsync;
     final int index;
     final SimpleMessage[] messages;
 
@@ -211,10 +212,17 @@ public abstract class RaftBasicTests extends BaseTest {
     final AtomicInteger step = new AtomicInteger();
     final AtomicReference<Throwable> exceptionInClientThread = new AtomicReference<>();
 
-    Client4TestWithLoad(int index, int numMessages) {
+    final MiniRaftCluster cluster;
+    final Logger LOG;
+
+    Client4TestWithLoad(int index, int numMessages, boolean useAsync,
+        MiniRaftCluster cluster, Logger LOG) {
       super("client-" + index);
       this.index = index;
       this.messages = SimpleMessage.create(numMessages, index + "-");
+      this.useAsync = useAsync;
+      this.cluster = cluster;
+      this.LOG = LOG;
     }
 
     boolean isRunning() {
@@ -223,10 +231,31 @@ public abstract class RaftBasicTests extends BaseTest {
 
     @Override
     public void run() {
-      try(RaftClient client = getCluster().createClient()) {
-        for (; step.get() < messages.length; ) {
-          final RaftClientReply reply = client.send(messages[step.getAndIncrement()]);
-          assertTrue(reply.isSuccess());
+      try (RaftClient client = cluster.createClient()) {
+        final CompletableFuture f = new CompletableFuture();
+        for (int i = 0; i < messages.length; i++) {
+          if (!useAsync) {
+            final RaftClientReply reply =
+                client.send(messages[step.getAndIncrement()]);
+            Assert.assertTrue(reply.isSuccess());
+          } else {
+            final CompletableFuture<RaftClientReply> replyFuture =
+                client.sendAsync(messages[i]);
+            replyFuture.thenAcceptAsync(r -> {
+              if (!r.isSuccess()) {
+                f.completeExceptionally(
+                    new AssertionError("Failed with reply: " + r));
+              }
+              if (step.incrementAndGet() == messages.length) {
+                f.complete(null);
+              }
+              Assert.assertTrue(r.isSuccess());
+            });
+          }
+        }
+        if (useAsync) {
+          f.join();
+          Assert.assertTrue(step.get() == messages.length);
         }
       } catch(Throwable t) {
         if (exceptionInClientThread.compareAndSet(null, t)) {
@@ -253,20 +282,19 @@ public abstract class RaftBasicTests extends BaseTest {
 
   @Test
   public void testWithLoad() throws Exception {
-    testWithLoad(10, 500);
+    testWithLoad(10, 500, false, getCluster(), LOG);
   }
 
-  private void testWithLoad(final int numClients, final int numMessages)
-      throws Exception {
+  public static void testWithLoad(final int numClients, final int numMessages,
+      boolean useAsync, MiniRaftCluster cluster, Logger LOG) throws Exception {
     LOG.info("Running testWithLoad: numClients=" + numClients
-        + ", numMessages=" + numMessages);
+        + ", numMessages=" + numMessages + ", async=" + useAsync);
 
-    final MiniRaftCluster cluster = getCluster();
-    LOG.info(cluster.printServers());
+    waitForLeader(cluster);
 
     final List<Client4TestWithLoad> clients
         = Stream.iterate(0, i -> i+1).limit(numClients)
-        .map(i -> new Client4TestWithLoad(i, numMessages))
+        .map(i -> new Client4TestWithLoad(i, numMessages, useAsync, cluster, LOG))
         .collect(Collectors.toList());
     final AtomicInteger lastStep = new AtomicInteger();