You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/02/12 02:07:34 UTC

incubator-ratis git commit: RATIS-170. Add async support to the FileStore example.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master bb5c258c3 -> 3e69b13a2


RATIS-170. Add async support to the FileStore example.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3e69b13a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3e69b13a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3e69b13a

Branch: refs/heads/master
Commit: 3e69b13a2bfd55c56e0c5f2d310601dcfc46731e
Parents: bb5c258
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Feb 12 10:07:03 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Feb 12 10:07:03 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/JavaUtils.java   |  11 ++
 .../java/org/apache/ratis/util/LogUtils.java    |   2 +-
 .../ratis/examples/filestore/FileInfo.java      |  80 +++++++++---
 .../examples/filestore/FileStoreClient.java     |  80 +++++++++---
 .../filestore/FileStoreAsyncBaseTest.java       | 110 ++++++++++++++++
 .../examples/filestore/FileStoreBaseTest.java   | 126 +++++++++++++++----
 .../filestore/TestFileStoreAsyncWithGrpc.java   |  25 ++++
 7 files changed, 379 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 8322209..926bab8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -200,4 +200,15 @@ public interface JavaUtils {
   static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) {
     return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]));
   }
+
+  static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndWrapAsCompletionException(
+      CheckedSupplier<OUTPUT, THROWABLE> supplier) {
+    try {
+      return supplier.get();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Throwable t) {
+      throw new CompletionException(t);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index e75b89b..376fed5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -80,7 +80,7 @@ public interface LogUtils {
     }
 
     if (log.isTraceEnabled()) {
-      log.trace("Successfully supplied " + name.get());
+      log.trace("Successfully supplied " + name.get() + ": " + output);
     }
     return output;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index d8b3eb4..636234f 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -35,8 +35,10 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -123,6 +125,33 @@ abstract class FileInfo {
     }
   }
 
+  static class WriteInfo {
+    /** Future to make sure that each commit is executed after the corresponding write. */
+    private final CompletableFuture<Integer> writeFuture;
+    /** Future to make sure that each commit is executed after the previous commit. */
+    private final CompletableFuture<Integer> commitFuture;
+    /** Previous commit index. */
+    private final long previousIndex;
+
+    WriteInfo(CompletableFuture<Integer> writeFuture, long previousIndex) {
+      this.writeFuture = writeFuture;
+      this.commitFuture = new CompletableFuture<>();
+      this.previousIndex = previousIndex;
+    }
+
+    CompletableFuture<Integer> getCommitFuture() {
+      return commitFuture;
+    }
+
+    CompletableFuture<Integer> getWriteFuture() {
+      return writeFuture;
+    }
+
+    long getPreviousIndex() {
+      return previousIndex;
+    }
+  }
+
   static class UnderConstruction extends FileInfo {
     private FileOut out;
 
@@ -135,10 +164,9 @@ abstract class FileInfo {
 
     /** A queue to make sure that the writes are in order. */
     private final TaskQueue writeQueue = new TaskQueue("writeQueue");
-    /** A queue to make sure that the commits are in order. */
-    private final TaskQueue commitQueue = new TaskQueue("commitQueue");
-    /** Futures to make sure that each commit is executed the corresponding write. */
-    private final Map<Long, CompletableFuture<Integer>> writeFutures = new ConcurrentHashMap<>();
+    private final Map<Long, WriteInfo> writeInfos = new ConcurrentHashMap<>();
+
+    private final AtomicLong lastWriteIndex = new AtomicLong(-1L);
 
     UnderConstruction(Path relativePath) {
       super(relativePath);
@@ -179,11 +207,12 @@ abstract class FileInfo {
     }
 
     private CompletableFuture<Integer> submitWrite(
-        CheckedSupplier<Integer, IOException> task, ExecutorService executor,
-      RaftPeerId id, long index) {
+        CheckedSupplier<Integer, IOException> task,
+        ExecutorService executor, RaftPeerId id, long index) {
       final CompletableFuture<Integer> f = writeQueue.submit(task, executor,
           e -> new IOException("Failed " + task, e));
-      CollectionUtils.putNew(index, f, writeFutures, () ->  id + ":writeFutures");
+      final WriteInfo info = new WriteInfo(f, lastWriteIndex.getAndSet(index));
+      CollectionUtils.putNew(index, info, writeInfos, () ->  id + ":writeInfos");
       return f;
     }
 
@@ -230,10 +259,18 @@ abstract class FileInfo {
     }
 
     CompletableFuture<Integer> submitCommit(
-        long offset, int size, Function<UnderConstruction, ReadOnly> converter,
+        long offset, int size, Function<UnderConstruction, ReadOnly> closeFunction,
         ExecutorService executor, RaftPeerId id, long index) {
+      final boolean close = closeFunction != null;
       final Supplier<String> name = () -> "commit(" + getRelativePath() + ", "
-          + offset + ", " + size + ") @" + id + ":" + index;
+          + offset + ", " + size + ", close? " + close + ") @" + id + ":" + index;
+
+      final WriteInfo info = writeInfos.get(index);
+      if (info == null) {
+        return JavaUtils.completeExceptionally(
+            new IOException(name.get() + " is already committed."));
+      }
+
       final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
         if (offset != committedSize) {
           throw new IOException("Offset/size mismatched: offset = "
@@ -245,21 +282,26 @@ abstract class FileInfo {
         }
         committedSize += size;
 
-        if (converter != null) {
-          converter.apply(this);
+        if (close) {
+          closeFunction.apply(this);
+          writeInfos.remove(index);
         }
+        info.getCommitFuture().complete(size);
         return size;
       }, name);
 
-      final CompletableFuture<Integer> write = writeFutures.remove(index);
-      if (write == null) {
-        return JavaUtils.completeExceptionally(
-            new IOException(name.get() + " is already committed."));
-      }
-      return write.thenComposeAsync(writeSize -> {
+      // Remove previous info, if there is any.
+      final WriteInfo previous = writeInfos.remove(info.getPreviousIndex());
+      final CompletableFuture<Integer> previousCommit = previous != null?
+          previous.getCommitFuture(): CompletableFuture.completedFuture(0);
+      // Commit after both current write and previous commit completed.
+      return info.getWriteFuture().thenCombineAsync(previousCommit, (writeSize, previousCommitSize) -> {
         Preconditions.assertTrue(size == writeSize);
-        return commitQueue.submit(task, executor,
-            e -> new IOException("Failed " + task, e));
+        try {
+          return task.get();
+        } catch (IOException e) {
+          throw new CompletionException("Failed " + task, e);
+        }
       }, executor);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index de794c2..a8f141c 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.ExamplesProtos.*;
 import org.apache.ratis.util.CheckedFunction;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
@@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
 
 /** A standalone server using raft with a configurable state machine. */
 public class FileStoreClient implements Closeable {
@@ -62,10 +66,24 @@ public class FileStoreClient implements Closeable {
     if (sme != null) {
       throw new IOException("Failed to send request " + request, sme);
     }
-    Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply);
+    Preconditions.assertTrue(reply.isSuccess(), () -> "Failed " + request + ", reply=" + reply);
     return reply.getMessage().getContent();
   }
 
+  static CompletableFuture<ByteString> sendAsync(
+      ByteString request, Function<Message, CompletableFuture<RaftClientReply>> sendFunction)
+      {
+    return sendFunction.apply(() -> request
+    ).thenApply(reply -> {
+      final StateMachineException sme = reply.getStateMachineException();
+      if (sme != null) {
+        throw new CompletionException("Failed to send request " + request, sme);
+      }
+      Preconditions.assertTrue(reply.isSuccess(), () -> "Failed " + request + ", reply=" + reply);
+      return reply.getMessage().getContent();
+    });
+  }
+
   private ByteString send(ByteString request) throws IOException {
     return send(request, client::send);
   }
@@ -74,30 +92,55 @@ public class FileStoreClient implements Closeable {
     return send(request, client::sendReadOnly);
   }
 
+  private CompletableFuture<ByteString> sendAsync(ByteString request) {
+    return sendAsync(request, client::sendAsync);
+  }
+
+  private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) {
+    return sendAsync(request, client::sendReadOnlyAsync);
+  }
+
   public ByteString read(String path, long offset, long length) throws IOException {
-    return readImpl(path, offset, length).getData();
+    final ByteString reply = readImpl(this::sendReadOnly, path, offset, length);
+    return ReadReplyProto.parseFrom(reply).getData();
   }
 
-  private ReadReplyProto readImpl(String path, long offset, long length) throws IOException {
+  public CompletableFuture<ByteString> readAsync(String path, long offset, long length) {
+    return readImpl(this::sendReadOnlyAsync, path, offset, length
+    ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
+        () -> ReadReplyProto.parseFrom(reply).getData()));
+  }
+
+  private static <OUTPUT, THROWABLE extends Throwable> OUTPUT readImpl(
+      CheckedFunction<ByteString, OUTPUT, THROWABLE> sendReadOnlyFunction,
+      String path, long offset, long length) throws THROWABLE {
     final ReadRequestProto read = ReadRequestProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setOffset(offset)
         .setLength(length)
         .build();
 
-    return ReadReplyProto.parseFrom(sendReadOnly(read.toByteString()));
+    return sendReadOnlyFunction.apply(read.toByteString());
   }
 
   public long write(String path, long offset, boolean close, ByteBuffer buffer)
       throws IOException {
     final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
     buffer.limit(chunkSize);
-    final WriteReplyProto proto = writeImpl(path, offset, close, ByteString.copyFrom(buffer));
-    return proto.getLength();
+    final ByteString reply = writeImpl(this::send, path, offset, close, buffer);
+    return WriteReplyProto.parseFrom(reply).getLength();
   }
 
-  private WriteReplyProto writeImpl(String path, long offset, boolean close, ByteString data)
-      throws IOException {
+  public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer) {
+    return writeImpl(this::sendAsync, path, offset, close, buffer
+    ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
+        () -> WriteReplyProto.parseFrom(reply).getLength()));
+  }
+
+  private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
+      CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
+      String path, long offset, boolean close, ByteBuffer data)
+      throws THROWABLE {
     final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setOffset(offset)
@@ -105,20 +148,29 @@ public class FileStoreClient implements Closeable {
 
     final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
         .setHeader(header)
-        .setData(data);
+        .setData(ByteString.copyFrom(data));
 
     final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build();
-    return WriteReplyProto.parseFrom(send(request.toByteString()));
+    return sendFunction.apply(request.toByteString());
   }
 
-  private DeleteReplyProto deleteImpl(String path) throws IOException {
+  private static <OUTPUT, THROWABLE extends Throwable> OUTPUT deleteImpl(
+      CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction, String path)
+      throws THROWABLE {
     final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path));
     final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setDelete(delete).build();
-    return DeleteReplyProto.parseFrom(send(request.toByteString()));
+    return sendFunction.apply(request.toByteString());
+  }
+
+  public String delete(String path) throws IOException {
+    final ByteString reply = deleteImpl(this::send, path);
+    return DeleteReplyProto.parseFrom(reply).getResolvedPath().toStringUtf8();
   }
 
-  public void delete(String path) throws IOException {
-    deleteImpl(path);
+  public CompletableFuture<String> deleteAsync(String path) {
+    return deleteImpl(this::sendAsync, path
+    ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
+        () -> DeleteReplyProto.parseFrom(reply).getResolvedPath().toStringUtf8()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
new file mode 100644
index 0000000..7cf8e3a
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.filestore.FileStoreBaseTest.Writer;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreAsyncBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStoreAsync() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final FileStoreClient client = new FileStoreClient(cluster.getGroup(), getProperties());
+    final ExecutorService executor = Executors.newFixedThreadPool(20);
+
+    testSingleFile("foo", SizeInBytes.valueOf("10M"), executor, client);
+    testMultipleFiles("file", 100, SizeInBytes.valueOf("1M"), executor, client);
+
+    executor.shutdown();
+    client.close();
+    cluster.shutdown();
+  }
+
+  private static void testSingleFile(
+      String path, SizeInBytes fileLength, Executor executor, FileStoreClient client)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+
+    new Writer(path, fileLength, executor, () -> client)
+        .writeAsync()
+        .thenCompose(Writer::verifyAsync)
+        .thenCompose(Writer::deleteAsync)
+        .get();
+  }
+
+  private static void testMultipleFiles(
+      String pathPrefix, int numFile, SizeInBytes fileLength, Executor executor,
+      FileStoreClient client) throws Exception {
+    LOG.info("runTestMultipleFile with pathPrefix={}, numFile={}, fileLength={}",
+        pathPrefix, numFile, fileLength);
+
+    final List<CompletableFuture<Writer>> writerFutures = new ArrayList<>();
+    for (int i = 0; i < numFile; i++) {
+      final String path = String.format("%s%02d", pathPrefix, i);
+      final Callable<CompletableFuture<Writer>> callable = LogUtils.newCallable(LOG,
+          () -> new Writer(path, fileLength, executor, () -> client).writeAsync(),
+          () -> path + ":" + fileLength);
+      writerFutures.add(callable.call());
+    }
+
+    final List<Writer> writers = new ArrayList<>();
+    for(CompletableFuture<Writer> f : writerFutures) {
+      writers.add(f.get());
+    }
+
+    writerFutures.clear();
+    for (Writer w : writers) {
+      writerFutures.add(w.verifyAsync().thenCompose(Writer::deleteAsync));
+    }
+    for(CompletableFuture<Writer> f : writerFutures) {
+      f.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index 66aeb6a..7e9c57e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -37,11 +37,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -78,7 +77,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
       throws Exception {
     LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
 
-    try (final Writer w = new Writer(path, fileLength, newClient)) {
+    try (final Writer w = new Writer(path, fileLength, null, newClient)) {
       w.write().verify().delete();
     }
   }
@@ -95,7 +94,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
     for (int i = 0; i < numFile; i++) {
       final String path = String.format("%s%02d", pathPrefix, i);
       final Callable<Writer> callable = LogUtils.newCallable(LOG,
-          () -> new Writer(path, fileLength, newClient).write(),
+          () -> new Writer(path, fileLength, null, newClient).write(),
           () -> path + ":" + fileLength);
       writerFutures.add(executor.submit(callable));
     }
@@ -123,12 +122,15 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
     final String fileName;
     final SizeInBytes fileSize;
     final FileStoreClient client;
+    final Executor asyncExecutor;
 
-    Writer(String fileName, SizeInBytes fileSize, CheckedSupplier<FileStoreClient, IOException> newClient)
+    Writer(String fileName, SizeInBytes fileSize, Executor asyncExecutor,
+        CheckedSupplier<FileStoreClient, IOException> clientSupplier)
         throws IOException {
       this.fileName = fileName;
       this.fileSize = fileSize;
-      this.client = newClient.get();
+      this.client = clientSupplier.get();
+      this.asyncExecutor = asyncExecutor;
     }
 
     ByteBuffer randomBytes(int length, Random random) {
@@ -145,19 +147,59 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
 
       for(int offset = 0; offset < size; ) {
         final int remaining = size - offset;
-        final int n = Math.min(remaining, buffer.length);
-        final boolean close = n == remaining;
+        final int length = Math.min(remaining, buffer.length);
+        final boolean close = length == remaining;
 
-        final ByteBuffer b = randomBytes(n, r);
+        final ByteBuffer b = randomBytes(length, r);
 
-        LOG.trace("client write {}, offset={}", fileName, offset);
+        LOG.trace("write {}, offset={}, length={}, close? {}",
+            fileName, offset, length, close);
         final long written = client.write(fileName, offset, close, b);
-        Assert.assertEquals(n, written);
+        Assert.assertEquals(length, written);
         offset += written;
       }
       return this;
     }
 
+    CompletableFuture<Writer> writeAsync() {
+      Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+      final Random r = new Random(seed);
+      final int size = fileSize.getSizeInt();
+
+      final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
+      final AtomicInteger callCount = new AtomicInteger();
+      final AtomicInteger n = new AtomicInteger();
+      for(; n.get() < size; ) {
+        final int offset = n.get();
+        final int remaining = size - offset;
+        final int length = Math.min(remaining, buffer.length);
+        final boolean close = length == remaining;
+
+        final ByteBuffer b = randomBytes(length, r);
+
+        callCount.incrementAndGet();
+        n.addAndGet(length);
+
+        LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
+            fileName, offset, length, close);
+        client.writeAsync(fileName, offset, close, b)
+            .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
+            .thenRun(() -> {
+              final int count = callCount.decrementAndGet();
+              LOG.trace("writeAsync {}, offset={}, length={}, close? {}: n={}, callCount={}",
+                  fileName, offset, length, close, n.get(), count);
+              if (n.get() == size && count == 0) {
+                returnFuture.complete(this);
+              }
+            })
+            .exceptionally(e -> {
+              returnFuture.completeExceptionally(e);
+              return null;
+            });
+      }
+      return returnFuture;
+    }
+
     Writer verify() throws IOException {
       final Random r = new Random(seed);
       final int size = fileSize.getSizeInt();
@@ -165,18 +207,60 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
       for(int offset = 0; offset < size; ) {
         final int remaining = size - offset;
         final int n = Math.min(remaining, buffer.length);
-
         final ByteString read = client.read(fileName, offset, n);
-        Assert.assertEquals(n, read.size());
-
-        final ByteBuffer b = randomBytes(n, r);
-
-        assertBuffers(offset, n, b, read.asReadOnlyByteBuffer());
+        final ByteBuffer expected = randomBytes(n, r);
+        verify(read, offset, n, expected);
         offset += n;
       }
       return this;
     }
 
+    CompletableFuture<Writer> verifyAsync() {
+      Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+      final Random r = new Random(seed);
+      final int size = fileSize.getSizeInt();
+
+      final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
+      final AtomicInteger callCount = new AtomicInteger();
+      final AtomicInteger n = new AtomicInteger();
+      for(; n.get() < size; ) {
+        final int offset = n.get();
+        final int remaining = size - offset;
+        final int length = Math.min(remaining, buffer.length);
+
+        callCount.incrementAndGet();
+        n.addAndGet(length);
+        final ByteBuffer expected = ByteString.copyFrom(randomBytes(length, r)).asReadOnlyByteBuffer();
+
+        client.readAsync(fileName, offset, length)
+            .thenAcceptAsync(read -> verify(read, offset, length, expected), asyncExecutor)
+            .thenRun(() -> {
+              final int count = callCount.decrementAndGet();
+              LOG.trace("verifyAsync {}, offset={}, length={}: n={}, callCount={}",
+                  fileName, offset, length, n.get(), count);
+              if (n.get() == size && count == 0) {
+                returnFuture.complete(this);
+              }
+            })
+            .exceptionally(e -> {
+              returnFuture.completeExceptionally(e);
+              return null;
+            });
+      }
+      Assert.assertEquals(size, n.get());
+      return returnFuture;
+    }
+
+    void verify(ByteString read, int offset, int length, ByteBuffer expected) {
+      Assert.assertEquals(length, read.size());
+      assertBuffers(offset, length, expected, read.asReadOnlyByteBuffer());
+    }
+
+    CompletableFuture<Writer> deleteAsync() {
+      Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+      return client.deleteAsync(fileName).thenApplyAsync(reply -> this, asyncExecutor);
+    }
+
     Writer delete() throws IOException {
       client.delete(fileName);
       return this;
@@ -193,8 +277,8 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
       Assert.assertEquals(expected, computed);
     } catch(AssertionError e) {
       LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length
-          + "expected = " + StringUtils.bytes2HexString(expected) + "\n"
-          + "computed = " + StringUtils.bytes2HexString(computed) + "\n", e);
+          + "\n  expected = " + StringUtils.bytes2HexString(expected)
+          + "\n  computed = " + StringUtils.bytes2HexString(computed), e);
       throw e;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
new file mode 100644
index 0000000..02bd2b0
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+
+public class TestFileStoreAsyncWithGrpc
+    extends FileStoreAsyncBaseTest<MiniRaftClusterWithGRpc>
+    implements MiniRaftClusterWithGRpc.FactoryGet {
+}