You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/07 05:05:25 UTC

[incubator-ratis] branch master updated: RATIS-1201. Enhance FileStore streaming tests (#320). Contributed by Rui Wang

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 365746f  RATIS-1201. Enhance FileStore streaming tests (#320).  Contributed by Rui Wang
365746f is described below

commit 365746f01368a8b3fe894f93831803dc9739c0e0
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Sun Dec 6 21:05:16 2020 -0800

    RATIS-1201. Enhance FileStore streaming tests (#320).  Contributed by Rui Wang
---
 .../examples/filestore/FileStoreAsyncBaseTest.java |  34 ++-
 .../examples/filestore/FileStoreBaseTest.java      | 193 ++------------
 .../filestore/FileStoreStreamingBaseTest.java      | 101 ++++---
 .../ratis/examples/filestore/FileStoreWriter.java  | 290 +++++++++++++++++++++
 4 files changed, 390 insertions(+), 228 deletions(-)

diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
index f99d927..d70cfa6 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
@@ -22,7 +22,6 @@ import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.examples.filestore.FileStoreBaseTest.Writer;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.SizeInBytes;
@@ -72,10 +71,15 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
       throws Exception {
     LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
 
-    new Writer(path, fileLength, executor, () -> client)
+    FileStoreWriter.newBuilder()
+        .setFileName(path)
+        .setFileSize(fileLength)
+        .setAsyncExecutor(executor)
+        .setFileStoreClientSupplier(() -> client)
+        .build()
         .writeAsync()
-        .thenCompose(Writer::verifyAsync)
-        .thenCompose(Writer::deleteAsync)
+        .thenCompose(FileStoreWriter::verifyAsync)
+        .thenCompose(FileStoreWriter::deleteAsync)
         .get();
   }
 
@@ -85,25 +89,31 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
     LOG.info("runTestMultipleFile with pathPrefix={}, numFile={}, fileLength={}",
         pathPrefix, numFile, fileLength);
 
-    final List<CompletableFuture<Writer>> writerFutures = new ArrayList<>();
+    final List<CompletableFuture<FileStoreWriter>> writerFutures = new ArrayList<>();
     for (int i = 0; i < numFile; i++) {
       final String path = String.format("%s%02d", pathPrefix, i);
-      final Callable<CompletableFuture<Writer>> callable = LogUtils.newCallable(LOG,
-          () -> new Writer(path, fileLength, executor, () -> client).writeAsync(),
+      final Callable<CompletableFuture<FileStoreWriter>> callable = LogUtils.newCallable(LOG,
+          () -> FileStoreWriter.newBuilder()
+              .setFileName(path)
+              .setFileSize(fileLength)
+              .setAsyncExecutor(executor)
+              .setFileStoreClientSupplier(() -> client)
+              .build()
+              .writeAsync(),
           () -> path + ":" + fileLength);
       writerFutures.add(callable.call());
     }
 
-    final List<Writer> writers = new ArrayList<>();
-    for(CompletableFuture<Writer> f : writerFutures) {
+    final List<FileStoreWriter> writers = new ArrayList<>();
+    for(CompletableFuture<FileStoreWriter> f : writerFutures) {
       writers.add(f.get());
     }
 
     writerFutures.clear();
-    for (Writer w : writers) {
-      writerFutures.add(w.verifyAsync().thenCompose(Writer::deleteAsync));
+    for (FileStoreWriter w : writers) {
+      writerFutures.add(w.verifyAsync().thenCompose(FileStoreWriter::deleteAsync));
     }
-    for(CompletableFuture<Writer> f : writerFutures) {
+    for(CompletableFuture<FileStoreWriter> f : writerFutures) {
       f.get();
     }
   }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index 198ea2b..cf01050 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -86,7 +86,12 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
       throws Exception {
     LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
 
-    try (final Writer w = new Writer(path, fileLength, null, newClient)) {
+    try (final FileStoreWriter w =
+             FileStoreWriter.newBuilder()
+                 .setFileName(path)
+                 .setFileSize(fileLength)
+                 .setFileStoreClientSupplier(newClient)
+                 .build()) {
       w.write().verify().delete();
     }
   }
@@ -99,196 +104,32 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
 
     final ExecutorService executor = Executors.newFixedThreadPool(20);
 
-    final List<Future<Writer>> writerFutures = new ArrayList<>();
+    final List<Future<FileStoreWriter>> writerFutures = new ArrayList<>();
     for (int i = 0; i < numFile; i++) {
       final String path = String.format("%s%02d", pathPrefix, i);
-      final Callable<Writer> callable = LogUtils.newCallable(LOG,
-          () -> new Writer(path, fileLength, null, newClient).write(),
+      final Callable<FileStoreWriter> callable = LogUtils.newCallable(LOG,
+          () -> FileStoreWriter.newBuilder()
+              .setFileName(path)
+              .setFileSize(fileLength)
+              .setFileStoreClientSupplier(newClient)
+              .build().write(),
           () -> path + ":" + fileLength);
       writerFutures.add(executor.submit(callable));
     }
 
-    final List<Writer> writers = new ArrayList<>();
-    for(Future<Writer> f : writerFutures) {
+    final List<FileStoreWriter> writers = new ArrayList<>();
+    for(Future<FileStoreWriter> f : writerFutures) {
       writers.add(f.get());
     }
 
     writerFutures.clear();
-    for (Writer w : writers) {
+    for (FileStoreWriter w : writers) {
       writerFutures.add(executor.submit(() -> w.verify().delete()));
     }
-    for(Future<Writer> f : writerFutures) {
+    for(Future<FileStoreWriter> f : writerFutures) {
       f.get().close();
     }
 
     executor.shutdown();
   }
-
-  static class Writer implements Closeable {
-    final long seed = ThreadLocalRandom.current().nextLong();
-    final byte[] buffer = new byte[4 << 10];
-
-    final String fileName;
-    final SizeInBytes fileSize;
-    final FileStoreClient client;
-    final Executor asyncExecutor;
-
-    Writer(String fileName, SizeInBytes fileSize, Executor asyncExecutor,
-        CheckedSupplier<FileStoreClient, IOException> clientSupplier)
-        throws IOException {
-      this.fileName = fileName;
-      this.fileSize = fileSize;
-      this.client = clientSupplier.get();
-      this.asyncExecutor = asyncExecutor;
-    }
-
-    ByteBuffer randomBytes(int length, Random random) {
-      Preconditions.assertTrue(length <= buffer.length);
-      random.nextBytes(buffer);
-      final ByteBuffer b = ByteBuffer.wrap(buffer);
-      b.limit(length);
-      return b;
-    }
-
-    Writer write() throws IOException {
-      final Random r = new Random(seed);
-      final int size = fileSize.getSizeInt();
-
-      for(int offset = 0; offset < size; ) {
-        final int remaining = size - offset;
-        final int length = Math.min(remaining, buffer.length);
-        final boolean close = length == remaining;
-
-        final ByteBuffer b = randomBytes(length, r);
-
-        LOG.trace("write {}, offset={}, length={}, close? {}",
-            fileName, offset, length, close);
-        final long written = client.write(fileName, offset, close, b);
-        Assert.assertEquals(length, written);
-        offset += written;
-      }
-      return this;
-    }
-
-    CompletableFuture<Writer> writeAsync() {
-      Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
-      final Random r = new Random(seed);
-      final int size = fileSize.getSizeInt();
-
-      final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
-      final AtomicInteger callCount = new AtomicInteger();
-      final AtomicInteger n = new AtomicInteger();
-      for(; n.get() < size; ) {
-        final int offset = n.get();
-        final int remaining = size - offset;
-        final int length = Math.min(remaining, buffer.length);
-        final boolean close = length == remaining;
-
-        final ByteBuffer b = randomBytes(length, r);
-
-        callCount.incrementAndGet();
-        n.addAndGet(length);
-
-        LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
-            fileName, offset, length, close);
-        client.writeAsync(fileName, offset, close, b)
-            .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
-            .thenRun(() -> {
-              final int count = callCount.decrementAndGet();
-              LOG.trace("writeAsync {}, offset={}, length={}, close? {}: n={}, callCount={}",
-                  fileName, offset, length, close, n.get(), count);
-              if (n.get() == size && count == 0) {
-                returnFuture.complete(this);
-              }
-            })
-            .exceptionally(e -> {
-              returnFuture.completeExceptionally(e);
-              return null;
-            });
-      }
-      return returnFuture;
-    }
-
-    Writer verify() throws IOException {
-      final Random r = new Random(seed);
-      final int size = fileSize.getSizeInt();
-
-      for(int offset = 0; offset < size; ) {
-        final int remaining = size - offset;
-        final int n = Math.min(remaining, buffer.length);
-        final ByteString read = client.read(fileName, offset, n);
-        final ByteBuffer expected = randomBytes(n, r);
-        verify(read, offset, n, expected);
-        offset += n;
-      }
-      return this;
-    }
-
-    CompletableFuture<Writer> verifyAsync() {
-      Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
-      final Random r = new Random(seed);
-      final int size = fileSize.getSizeInt();
-
-      final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
-      final AtomicInteger callCount = new AtomicInteger();
-      final AtomicInteger n = new AtomicInteger();
-      for(; n.get() < size; ) {
-        final int offset = n.get();
-        final int remaining = size - offset;
-        final int length = Math.min(remaining, buffer.length);
-
-        callCount.incrementAndGet();
-        n.addAndGet(length);
-        final ByteBuffer expected = ByteString.copyFrom(randomBytes(length, r)).asReadOnlyByteBuffer();
-
-        client.readAsync(fileName, offset, length)
-            .thenAcceptAsync(read -> verify(read, offset, length, expected), asyncExecutor)
-            .thenRun(() -> {
-              final int count = callCount.decrementAndGet();
-              LOG.trace("verifyAsync {}, offset={}, length={}: n={}, callCount={}",
-                  fileName, offset, length, n.get(), count);
-              if (n.get() == size && count == 0) {
-                returnFuture.complete(this);
-              }
-            })
-            .exceptionally(e -> {
-              returnFuture.completeExceptionally(e);
-              return null;
-            });
-      }
-      Assert.assertEquals(size, n.get());
-      return returnFuture;
-    }
-
-    void verify(ByteString read, int offset, int length, ByteBuffer expected) {
-      Assert.assertEquals(length, read.size());
-      assertBuffers(offset, length, expected, read.asReadOnlyByteBuffer());
-    }
-
-    CompletableFuture<Writer> deleteAsync() {
-      Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
-      return client.deleteAsync(fileName).thenApplyAsync(reply -> this, asyncExecutor);
-    }
-
-    Writer delete() throws IOException {
-      client.delete(fileName);
-      return this;
-    }
-
-    @Override
-    public void close() throws IOException {
-      client.close();
-    }
-  }
-
-  static void assertBuffers(int offset, int length, ByteBuffer expected, ByteBuffer computed) {
-    try {
-      Assert.assertEquals(expected, computed);
-    } catch(AssertionError e) {
-      LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length
-          + "\n  expected = " + StringUtils.bytes2HexString(expected)
-          + "\n  computed = " + StringUtils.bytes2HexString(computed), e);
-      throw e;
-    }
-  }
 }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index 2e6f40d..598dc0c 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -19,16 +19,13 @@ package org.apache.ratis.examples.filestore;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamTestUtils;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
-import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.junit.Assert;
@@ -38,11 +35,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -60,7 +59,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
   static final int NUM_PEERS = 3;
 
   @Test
-  public void testFileStoreStream() throws Exception {
+  public void testFileStoreStreamSingleFile() throws Exception {
     final CLUSTER cluster = newCluster(NUM_PEERS);
     cluster.start();
     RaftTestUtil.waitForLeader(cluster);
@@ -72,9 +71,30 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
 
     final CheckedSupplier<FileStoreClient, IOException> newClient =
         () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
-    // TODO: configurable buffer size
-    final int bufferSize = 10_000;
-    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient);
+
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient);
+    testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient);
+    testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient);
+
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testFileStoreStreamMultipleFiles() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+
+    testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient);
+    testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient);
 
     cluster.shutdown();
   }
@@ -84,36 +104,37 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
       IOException> newClient)
       throws Exception {
     LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
-    final int size = fileLength.getSizeInt();
-    try (FileStoreClient client = newClient.get()) {
-      final DataStreamOutput dataStreamOutput = client.getStreamOutput(path, size);
-      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-      final List<Integer> sizes = new ArrayList<>();
-
-      for(int offset = 0; offset < size; ) {
-        final int remaining = size - offset;
-        final int length = Math.min(remaining, bufferSize);
-        final boolean close = length == remaining;
-
-        LOG.trace("write {}, offset={}, length={}, close? {}",
-            path, offset, length, close);
-        final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
-        futures.add(dataStreamOutput.writeAsync(bf, close));
-        sizes.add(length);
-        offset += length;
-      }
-
-      DataStreamReply reply = dataStreamOutput.closeAsync().join();
-      Assert.assertTrue(reply.isSuccess());
-
-      // TODO: handle when any of the writeAsync has failed.
-      // check writeAsync requests
-      for (int i = 0; i < futures.size(); i++) {
-        reply = futures.get(i).join();
-        Assert.assertTrue(reply.isSuccess());
-        Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
-        Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : DataStreamPacketHeaderProto.Type.STREAM_DATA);
-      }
+    FileStoreWriter.newBuilder()
+        .setFileName(path)
+        .setFileSize(fileLength)
+        .setBufferSize(bufferSize)
+        .setFileStoreClientSupplier(newClient)
+        .build().streamWriteAndVerify();
+  }
+
+  private void testMultipleFiles(String pathBase, int numFile, SizeInBytes fileLength,
+      int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient) throws Exception {
+    final ExecutorService executor = Executors.newFixedThreadPool(numFile);
+
+    final List<Future<FileStoreWriter>> writerFutures = new ArrayList<>();
+    for (int i = 0; i < numFile; i++) {
+      String path =  pathBase + "-" + i;
+      final Callable<FileStoreWriter> callable = LogUtils.newCallable(LOG,
+          () -> FileStoreWriter.newBuilder()
+              .setFileName(path)
+              .setFileSize(fileLength)
+              .setBufferSize(bufferSize)
+              .setFileStoreClientSupplier(newClient)
+              .build().streamWriteAndVerify(),
+          () -> path);
+      writerFutures.add(executor.submit(callable));
+    }
+    for (Future<FileStoreWriter> future : writerFutures) {
+      future.get();
     }
   }
+
+  static class StreamWriter {
+
+  }
 }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
new file mode 100644
index 0000000..343b0b1
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class FileStoreWriter implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreWriter.class);
+
+  final long seed = ThreadLocalRandom.current().nextLong();
+  final byte[] buffer = new byte[4 << 10];
+
+  final String fileName;
+  final SizeInBytes fileSize;
+  final FileStoreClient client;
+  final Executor asyncExecutor;
+  final int bufferSize;
+
+  static Builder newBuilder() {
+    return new Builder();
+  }
+
+  static class Builder {
+    private String fileName;
+    private SizeInBytes fileSize;
+    private CheckedSupplier<FileStoreClient, IOException> clientSupplier;
+    private Executor asyncExecutor;
+    private int bufferSize;
+
+    public Builder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public Builder setFileSize(SizeInBytes size) {
+      this.fileSize = size;
+      return this;
+    }
+
+    public Builder setFileStoreClientSupplier(CheckedSupplier<FileStoreClient, IOException> supplier) {
+      this.clientSupplier = supplier;
+      return this;
+    }
+
+    public Builder setAsyncExecutor(Executor asyncExecutor) {
+      this.asyncExecutor = asyncExecutor;
+      return this;
+    }
+
+    public Builder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public FileStoreWriter build() throws IOException {
+      return new FileStoreWriter(fileName, fileSize, asyncExecutor, clientSupplier, bufferSize);
+    }
+  }
+
+  private FileStoreWriter(String fileName, SizeInBytes fileSize, Executor asyncExecutor,
+         CheckedSupplier<FileStoreClient, IOException> clientSupplier, int bufferSize)
+      throws IOException {
+    this.fileName = fileName;
+    this.fileSize = fileSize;
+    this.client = clientSupplier.get();
+    this.asyncExecutor = asyncExecutor;
+    this.bufferSize = bufferSize;
+  }
+
+  ByteBuffer randomBytes(int length, Random random) {
+    Preconditions.assertTrue(length <= buffer.length);
+    random.nextBytes(buffer);
+    final ByteBuffer b = ByteBuffer.wrap(buffer);
+    b.limit(length);
+    return b;
+  }
+
+  FileStoreWriter write() throws IOException {
+    final Random r = new Random(seed);
+    final int size = fileSize.getSizeInt();
+
+    for(int offset = 0; offset < size; ) {
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, buffer.length);
+      final boolean close = length == remaining;
+
+      final ByteBuffer b = randomBytes(length, r);
+
+      LOG.trace("write {}, offset={}, length={}, close? {}",
+          fileName, offset, length, close);
+      final long written = client.write(fileName, offset, close, b);
+      Assert.assertEquals(length, written);
+      offset += written;
+    }
+    return this;
+  }
+
+  public FileStoreWriter streamWriteAndVerify() throws IOException {
+    final int size = fileSize.getSizeInt();
+    final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName, size);
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final List<Integer> sizes = new ArrayList<>();
+
+    for(int offset = 0; offset < size; ) {
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, bufferSize);
+      final boolean close = length == remaining;
+
+      LOG.trace("write {}, offset={}, length={}, close? {}",
+          fileName, offset, length, close);
+      final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
+      futures.add(dataStreamOutput.writeAsync(bf, close));
+      sizes.add(length);
+      offset += length;
+    }
+
+    DataStreamReply reply = dataStreamOutput.closeAsync().join();
+    Assert.assertTrue(reply.isSuccess());
+
+    // TODO: handle when any of the writeAsync has failed.
+    // check writeAsync requests
+    for (int i = 0; i < futures.size(); i++) {
+      reply = futures.get(i).join();
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
+      Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA);
+    }
+
+    return this;
+  }
+
+  CompletableFuture<FileStoreWriter> writeAsync() {
+    Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+    final Random r = new Random(seed);
+    final int size = fileSize.getSizeInt();
+
+    final CompletableFuture<FileStoreWriter> returnFuture = new CompletableFuture<>();
+    final AtomicInteger callCount = new AtomicInteger();
+    final AtomicInteger n = new AtomicInteger();
+    for(; n.get() < size; ) {
+      final int offset = n.get();
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, buffer.length);
+      final boolean close = length == remaining;
+
+      final ByteBuffer b = randomBytes(length, r);
+
+      callCount.incrementAndGet();
+      n.addAndGet(length);
+
+      LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
+          fileName, offset, length, close);
+      client.writeAsync(fileName, offset, close, b)
+          .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
+          .thenRun(() -> {
+            final int count = callCount.decrementAndGet();
+            LOG.trace("writeAsync {}, offset={}, length={}, close? {}: n={}, callCount={}",
+                fileName, offset, length, close, n.get(), count);
+            if (n.get() == size && count == 0) {
+              returnFuture.complete(this);
+            }
+          })
+          .exceptionally(e -> {
+            returnFuture.completeExceptionally(e);
+            return null;
+          });
+    }
+    return returnFuture;
+  }
+
+  FileStoreWriter verify() throws IOException {
+    final Random r = new Random(seed);
+    final int size = fileSize.getSizeInt();
+
+    for(int offset = 0; offset < size; ) {
+      final int remaining = size - offset;
+      final int n = Math.min(remaining, buffer.length);
+      final ByteString read = client.read(fileName, offset, n);
+      final ByteBuffer expected = randomBytes(n, r);
+      verify(read, offset, n, expected);
+      offset += n;
+    }
+    return this;
+  }
+
+  CompletableFuture<FileStoreWriter> verifyAsync() {
+    Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+    final Random r = new Random(seed);
+    final int size = fileSize.getSizeInt();
+
+    final CompletableFuture<FileStoreWriter> returnFuture = new CompletableFuture<>();
+    final AtomicInteger callCount = new AtomicInteger();
+    final AtomicInteger n = new AtomicInteger();
+    for(; n.get() < size; ) {
+      final int offset = n.get();
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, buffer.length);
+
+      callCount.incrementAndGet();
+      n.addAndGet(length);
+      final ByteBuffer expected = ByteString.copyFrom(randomBytes(length, r)).asReadOnlyByteBuffer();
+
+      client.readAsync(fileName, offset, length)
+          .thenAcceptAsync(read -> verify(read, offset, length, expected), asyncExecutor)
+          .thenRun(() -> {
+            final int count = callCount.decrementAndGet();
+            LOG.trace("verifyAsync {}, offset={}, length={}: n={}, callCount={}",
+                fileName, offset, length, n.get(), count);
+            if (n.get() == size && count == 0) {
+              returnFuture.complete(this);
+            }
+          })
+          .exceptionally(e -> {
+            returnFuture.completeExceptionally(e);
+            return null;
+          });
+    }
+    Assert.assertEquals(size, n.get());
+    return returnFuture;
+  }
+
+  void verify(ByteString read, int offset, int length, ByteBuffer expected) {
+    Assert.assertEquals(length, read.size());
+    assertBuffers(offset, length, expected, read.asReadOnlyByteBuffer());
+  }
+
+  CompletableFuture<FileStoreWriter> deleteAsync() {
+    Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+    return client.deleteAsync(fileName).thenApplyAsync(reply -> this, asyncExecutor);
+  }
+
+  FileStoreWriter delete() throws IOException {
+    client.delete(fileName);
+    return this;
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
+
+  static void assertBuffers(int offset, int length, ByteBuffer expected, ByteBuffer computed) {
+    try {
+      Assert.assertEquals(expected, computed);
+    } catch(AssertionError e) {
+      LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length
+          + "\n  expected = " + StringUtils.bytes2HexString(expected)
+          + "\n  computed = " + StringUtils.bytes2HexString(computed), e);
+      throw e;
+    }
+  }
+}