You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/08 14:48:42 UTC

[incubator-ratis] branch master updated: RATIS-1218. Add sync option when test filestore performance (#337)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new efca97e  RATIS-1218. Add sync option when test filestore performance (#337)
efca97e is described below

commit efca97e57ddf0f4b5980982140433f16fda7dfde
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 8 22:48:35 2020 +0800

    RATIS-1218. Add sync option when test filestore performance (#337)
---
 .../apache/ratis/examples/filestore/FileInfo.java  | 65 +++++-----------------
 .../apache/ratis/examples/filestore/FileStore.java |  8 +--
 .../ratis/examples/filestore/FileStoreClient.java  | 13 +++--
 .../examples/filestore/FileStoreStateMachine.java  |  3 +-
 .../ratis/examples/filestore/cli/Client.java       | 15 +++--
 .../ratis/examples/filestore/cli/DataStream.java   | 33 +++++++++--
 .../ratis/examples/filestore/cli/LoadGen.java      | 10 +++-
 .../examples/filestore/FileStoreAsyncBaseTest.java |  4 +-
 .../examples/filestore/FileStoreBaseTest.java      |  4 +-
 .../ratis/examples/filestore/FileStoreWriter.java  |  8 +--
 ratis-proto/src/main/proto/Examples.proto          |  1 +
 11 files changed, 80 insertions(+), 84 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 886a646..5d136e3 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -20,7 +20,6 @@ package org.apache.ratis.examples.filestore;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.Preconditions;
@@ -30,13 +29,10 @@ import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
 import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -78,7 +74,6 @@ abstract class FileInfo {
 
   ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
       throws IOException {
-    flush();
     if (readCommitted && offset + length > getCommittedSize()) {
       throw new IOException("Failed to read Committed: offset (=" + offset
           + " + length (=" + length + ") > size = " + getCommittedSize()
@@ -124,30 +119,6 @@ abstract class FileInfo {
     }
   }
 
-  static class FileOut implements Closeable {
-    private final OutputStream out;
-    private final WritableByteChannel channel;
-
-    FileOut(Path p) throws IOException {
-      this.out = FileUtils.createNewFile(p);
-      this.channel = Channels.newChannel(out);
-    }
-
-    int write(ByteBuffer data) throws IOException {
-      return channel.write(data);
-    }
-
-    void flush() throws IOException {
-      out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-      channel.close();
-      out.close();
-    }
-  }
-
   static class WriteInfo {
     /** Future to make sure that each commit is executed after the corresponding write. */
     private final CompletableFuture<Integer> writeFuture;
@@ -176,14 +147,12 @@ abstract class FileInfo {
   }
 
   static class UnderConstruction extends FileInfo {
-    private FileOut out;
+    private FileStore.FileStoreDataChannel out;
 
     /** The size written to a local file. */
     private volatile long writeSize;
     /** The size committed to client. */
     private volatile long committedSize;
-    /** The size at last flush. */
-    private volatile long flushSize;
 
     /** A queue to make sure that the writes are in order. */
     private final TaskQueue writeQueue = new TaskQueue("writeQueue");
@@ -211,26 +180,27 @@ abstract class FileInfo {
     }
 
     CompletableFuture<Integer> submitCreate(
-        CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close,
+        CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close, boolean sync,
         ExecutorService executor, RaftPeerId id, long index) {
       final Supplier<String> name = () -> "create(" + getRelativePath() + ", "
           + close + ") @" + id + ":" + index;
       final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
         if (out == null) {
-          out = new FileOut(resolver.apply(getRelativePath()));
+          out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(),
+              "rw"));
         }
-        return write(0L, data, close);
+        return write(0L, data, close, sync);
       }, name);
       return submitWrite(task, executor, id, index);
     }
 
     CompletableFuture<Integer> submitWrite(
-        long offset, ByteString data, boolean close, ExecutorService executor,
+        long offset, ByteString data, boolean close, boolean sync, ExecutorService executor,
         RaftPeerId id, long index) {
       final Supplier<String> name = () -> "write(" + getRelativePath() + ", "
           + offset + ", " + close + ") @" + id + ":" + index;
       final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG,
-          () -> write(offset, data, close), name);
+          () -> write(offset, data, close, sync), name);
       return submitWrite(task, executor, id, index);
     }
 
@@ -244,7 +214,7 @@ abstract class FileInfo {
       return f;
     }
 
-    private int write(long offset, ByteString data, boolean close) throws IOException {
+    private int write(long offset, ByteString data, boolean close, boolean sync) throws IOException {
       // If leader finish write data with offset = 4096 and writeSize become 8192,
       // and 2 follower has not written the data with offset = 4096,
       // then start a leader election. So client will retry send the data with offset = 4096,
@@ -273,6 +243,10 @@ abstract class FileInfo {
           }
         }
 
+        if (sync) {
+          out.force(false);
+        }
+
         if (close) {
           out.close();
         }
@@ -280,19 +254,6 @@ abstract class FileInfo {
       }
     }
 
-    void flush() throws IOException {
-      if (flushSize >= committedSize) {
-        return;
-      }
-      synchronized (out) {
-        if (flushSize >= committedSize) {
-          return;
-        }
-        out.flush();
-        flushSize = writeSize;
-      }
-    }
-
     CompletableFuture<Integer> submitCommit(
         long offset, int size, Function<UnderConstruction, ReadOnly> closeFunction,
         ExecutorService executor, RaftPeerId id, long index) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index 84fab1a..e910a99 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -195,7 +195,7 @@ public class FileStore implements Closeable {
   }
 
   CompletableFuture<Integer> write(
-      long index, String relative, boolean close, long offset, ByteString data) {
+      long index, String relative, boolean close, boolean sync, long offset, ByteString data) {
     final int size = data != null? data.size(): 0;
     LOG.trace("write {}, offset={}, size={}, close? {} @{}:{}",
         relative, offset, size, close, getId(), index);
@@ -214,8 +214,8 @@ public class FileStore implements Closeable {
     }
 
     return size == 0 && !close? CompletableFuture.completedFuture(0)
-        : createNew? uc.submitCreate(this::resolve, data, close, writer, getId(), index)
-        : uc.submitWrite(offset, data, close, writer, getId(), index);
+        : createNew? uc.submitCreate(this::resolve, data, close, sync, writer, getId(), index)
+        : uc.submitWrite(offset, data, close, sync, writer, getId(), index);
   }
 
   @Override
@@ -266,7 +266,7 @@ public class FileStore implements Closeable {
     }, writer);
   }
 
-  class FileStoreDataChannel implements StateMachine.DataChannel {
+  static class FileStoreDataChannel implements StateMachine.DataChannel {
     private final RandomAccessFile randomAccessFile;
 
     FileStoreDataChannel(RandomAccessFile file) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 51dc100..509b144 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -145,11 +145,11 @@ public class FileStoreClient implements Closeable {
     return sendReadOnlyFunction.apply(read.toByteString());
   }
 
-  public long write(String path, long offset, boolean close, ByteBuffer buffer)
+  public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
       throws IOException {
     final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
     buffer.limit(chunkSize);
-    final ByteString reply = writeImpl(this::send, path, offset, close, buffer);
+    final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync);
     return WriteReplyProto.parseFrom(reply).getLength();
   }
 
@@ -162,21 +162,22 @@ public class FileStoreClient implements Closeable {
     return client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer());
   }
 
-  public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer) {
-    return writeImpl(this::sendAsync, path, offset, close, buffer
+  public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) {
+    return writeImpl(this::sendAsync, path, offset, close, buffer, sync
     ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
         () -> WriteReplyProto.parseFrom(reply).getLength()));
   }
 
   private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
       CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
-      String path, long offset, boolean close, ByteBuffer data)
+      String path, long offset, boolean close, ByteBuffer data, boolean sync)
       throws THROWABLE {
     final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setOffset(offset)
         .setLength(data.remaining())
-        .setClose(close);
+        .setClose(close)
+        .setSync(sync);
 
     final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
         .setHeader(header)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index aa99240..0dc1561 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -127,7 +127,8 @@ public class FileStoreStateMachine extends BaseStateMachine {
 
     final WriteRequestHeaderProto h = proto.getWriteHeader();
     final CompletableFuture<Integer> f = files.write(entry.getIndex(),
-        h.getPath().toStringUtf8(), h.getClose(), h.getOffset(), smLog.getStateMachineEntry().getStateMachineData());
+        h.getPath().toStringUtf8(), h.getClose(),  h.getSync(), h.getOffset(),
+        smLog.getStateMachineEntry().getStateMachineData());
     // sync only if closing the file
     return h.getClose()? f: null;
   }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index fb12f45..8b45453 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -107,19 +108,25 @@ public abstract class Client extends SubCommandBase {
     operation(client);
   }
 
-  public List<String> generateFiles() throws IOException {
+
+  protected void stop(RaftClient client) throws IOException {
+    client.close();
+    System.exit(0);
+  }
+
+  protected List<String> generateFiles() throws IOException {
     String entropy = RandomStringUtils.randomAlphanumeric(numFiles);
     List<String> paths = new ArrayList<>();
     for (int i = 0; i < numFiles; i ++) {
       String path = "file-" + entropy + "-" + i;
       paths.add(path);
-      writeFile(path, fileSizeInBytes, bufferSizeInBytes);
+      writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1);
     }
 
     return paths;
   }
 
-  public void writeFile(String path, int fileSize, int bufferSize) throws IOException {
+  protected void writeFile(String path, int fileSize, int bufferSize, int random) throws IOException {
     RandomAccessFile raf = null;
     try {
       raf = new RandomAccessFile(path, "rw");
@@ -129,7 +136,7 @@ public abstract class Client extends SubCommandBase {
         final int chunkSize = Math.min(remaining, bufferSize);
         byte[] buffer = new byte[chunkSize];
         for (int i = 0; i < chunkSize; i ++) {
-          buffer[i]= (byte) ('A' + i % 23);
+          buffer[i]= (byte) (i % random);
         }
         raf.write(buffer);
         offset += chunkSize;
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index dd653be..8dab01a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -49,8 +49,32 @@ public class DataStream extends Client {
   @Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
   private String dataStreamType = "NettyFileRegion";
 
+  @Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero," +
+      " -1 means no sync", required = true)
+  private int syncSize = -1;
+
+  private boolean checkParam() {
+    if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
+      System.err.println("Error: syncSize % bufferSize should be zero");
+      return false;
+    }
+
+    if (!dataStreamType.equals("DirectByteBuffer") &&
+        !dataStreamType.equals("MappedByteBuffer") &&
+        !dataStreamType.equals("NettyFileRegion")) {
+      System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, NettyFileRegion");
+      return false;
+    }
+
+    return true;
+  }
+
   @Override
   protected void operation(RaftClient client) throws IOException {
+    if (!checkParam()) {
+      stop(client);
+    }
+
     List<String> paths = generateFiles();
     FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting DataStream write now ");
@@ -66,8 +90,7 @@ public class DataStream extends Client {
     System.out.println("Total data written: " + totalWrittenBytes + " bytes");
     System.out.println("Total time taken: " + (endTime - startTime) + " millis");
 
-    client.close();
-    System.exit(0);
+    stop(client);
   }
 
   private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
@@ -87,8 +110,6 @@ public class DataStream extends Client {
         fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
       } else if (dataStreamType.equals("NettyFileRegion")) {
         fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
-      } else {
-        System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
       }
 
       dataStreamOutput.closeAsync();
@@ -131,8 +152,8 @@ public class DataStream extends Client {
             + " byte(s). The channel has reached end-of-stream at " + offset);
       } else if (bytesRead > 0) {
         offset += bytesRead;
-
-        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer());
+        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(),
+            syncSize > 0 && (offset == fileSize || offset % syncSize == 0));
         f.thenRun(buf::release);
         futures.add(f);
       }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index 5a14e65..45ee600 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.examples.filestore.cli;
 
+import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.examples.filestore.FileStoreClient;
@@ -37,6 +38,9 @@ import java.util.concurrent.CompletableFuture;
 @Parameters(commandDescription = "Load Generator for FileStore")
 public class LoadGen extends Client {
 
+  @Parameter(names = {"--sync"}, description = "Whether to sync every bufferSize, 1 means sync", required = true)
+  private int sync = 0;
+
   @Override
   protected void operation(RaftClient client) throws IOException {
     List<String> paths = generateFiles();
@@ -54,8 +58,7 @@ public class LoadGen extends Client {
     System.out.println("Total data written: " + totalWrittenBytes + " bytes");
     System.out.println("Total time taken: " + (endTime - startTime) + " millis");
 
-    client.close();
-    System.exit(0);
+    stop(client);
   }
 
   private Map<String, List<CompletableFuture<Long>>> writeByHeapByteBuffer(
@@ -76,7 +79,8 @@ public class LoadGen extends Client {
       long offset = 0L;
       while(fis.read(buffer, 0, bytesToRead) > 0) {
         ByteBuffer b = ByteBuffer.wrap(buffer);
-        futures.add(fileStoreClient.writeAsync(path, offset, offset + bytesToRead == getFileSizeInBytes(), b));
+        futures.add(fileStoreClient.writeAsync(path, offset, offset + bytesToRead == getFileSizeInBytes(), b,
+            sync == 1));
         offset += bytesToRead;
         bytesToRead = (int)Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
         if (bytesToRead > 0) {
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
index d70cfa6..eb51e64 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
@@ -77,7 +77,7 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
         .setAsyncExecutor(executor)
         .setFileStoreClientSupplier(() -> client)
         .build()
-        .writeAsync()
+        .writeAsync(false)
         .thenCompose(FileStoreWriter::verifyAsync)
         .thenCompose(FileStoreWriter::deleteAsync)
         .get();
@@ -99,7 +99,7 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
               .setAsyncExecutor(executor)
               .setFileStoreClientSupplier(() -> client)
               .build()
-              .writeAsync(),
+              .writeAsync(false),
           () -> path + ":" + fileLength);
       writerFutures.add(callable.call());
     }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index cf01050..c455aeb 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -92,7 +92,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
                  .setFileSize(fileLength)
                  .setFileStoreClientSupplier(newClient)
                  .build()) {
-      w.write().verify().delete();
+      w.write(false).verify().delete();
     }
   }
 
@@ -112,7 +112,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
               .setFileName(path)
               .setFileSize(fileLength)
               .setFileStoreClientSupplier(newClient)
-              .build().write(),
+              .build().write(false),
           () -> path + ":" + fileLength);
       writerFutures.add(executor.submit(callable));
     }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 343b0b1..1caa59f 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -113,7 +113,7 @@ class FileStoreWriter implements Closeable {
     return b;
   }
 
-  FileStoreWriter write() throws IOException {
+  FileStoreWriter write(boolean sync) throws IOException {
     final Random r = new Random(seed);
     final int size = fileSize.getSizeInt();
 
@@ -126,7 +126,7 @@ class FileStoreWriter implements Closeable {
 
       LOG.trace("write {}, offset={}, length={}, close? {}",
           fileName, offset, length, close);
-      final long written = client.write(fileName, offset, close, b);
+      final long written = client.write(fileName, offset, close, b, sync);
       Assert.assertEquals(length, written);
       offset += written;
     }
@@ -167,7 +167,7 @@ class FileStoreWriter implements Closeable {
     return this;
   }
 
-  CompletableFuture<FileStoreWriter> writeAsync() {
+  CompletableFuture<FileStoreWriter> writeAsync(boolean sync) {
     Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
     final Random r = new Random(seed);
     final int size = fileSize.getSizeInt();
@@ -188,7 +188,7 @@ class FileStoreWriter implements Closeable {
 
       LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
           fileName, offset, length, close);
-      client.writeAsync(fileName, offset, close, b)
+      client.writeAsync(fileName, offset, close, b, sync)
           .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
           .thenRun(() -> {
             final int count = callCount.decrementAndGet();
diff --git a/ratis-proto/src/main/proto/Examples.proto b/ratis-proto/src/main/proto/Examples.proto
index 42e47de..ecf750d 100644
--- a/ratis-proto/src/main/proto/Examples.proto
+++ b/ratis-proto/src/main/proto/Examples.proto
@@ -41,6 +41,7 @@ message WriteRequestHeaderProto {
   bool close = 2; // close the file after write?
   uint64 offset = 3;
   uint64 length = 4;
+  bool sync = 5;
 }
 
 message StreamWriteRequestProto {