You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/16 16:28:55 UTC

[ratis] branch branch-2 updated: RATIS-1644. Provide a safe async flush. (#699)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2ecb09d75 RATIS-1644. Provide a safe async flush. (#699)
2ecb09d75 is described below

commit 2ecb09d7521c51eacb68d2cca68b84838f63f26f
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed Aug 17 00:21:13 2022 +0800

    RATIS-1644. Provide a safe async flush. (#699)
    
    (cherry picked from commit fafca0a287b28fd27eced7371afd5bfad5bd06fe)
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 11 ++++
 .../apache/ratis/server/impl/ServerProtoUtils.java | 32 +++++++++-
 .../raftlog/segmented/BufferedWriteChannel.java    | 16 ++++-
 .../segmented/SegmentedRaftLogOutputStream.java    |  5 +-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 43 ++++++++++---
 .../ratis/server/storage/SnapshotManager.java      |  3 +-
 .../statemachine/SimpleStateMachine4Testing.java   |  2 +-
 .../segmented/TestBufferedWriteChannel.java        |  2 +-
 .../server/raftlog/segmented/TestLogSegment.java   | 10 +--
 .../raftlog/segmented/TestRaftLogReadWrite.java    | 12 ++--
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 73 ++++++++++++++++++++--
 11 files changed, 178 insertions(+), 31 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index ad05536f2..6d0c5d41b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -369,6 +369,17 @@ public interface RaftServerConfigKeys {
       setBoolean(properties::setBoolean, UNSAFE_FLUSH_ENABLED_KEY, unsafeFlush);
     }
 
+    /** Async-flush advances the flush index only after the actual flush has completed. */
+    String ASYNC_FLUSH_ENABLED_KEY = PREFIX + ".async-flush.enabled"; // cannot be enabled together with unsafe-flush
+    boolean ASYNC_FLUSH_ENABLED_DEFAULT = false;
+    static boolean asyncFlushEnabled(RaftProperties properties) { // reads the key, falling back to the default
+      return getBoolean(properties::getBoolean,
+          ASYNC_FLUSH_ENABLED_KEY, ASYNC_FLUSH_ENABLED_DEFAULT, getDefaultLog());
+    }
+    static void setAsyncFlushEnabled(RaftProperties properties, boolean asyncFlush) { // writes the key
+      setBoolean(properties::setBoolean, ASYNC_FLUSH_ENABLED_KEY, asyncFlush);
+    }
+
     /** The policy to handle corrupted raft log. */
     enum CorruptionPolicy {
       /** Rethrow the exception. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index deae754c3..eccebb6fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Optional;
 
 /** Server proto utilities for internal use. */
-final class ServerProtoUtils {
+public final class ServerProtoUtils {
   private ServerProtoUtils() {}
 
   private static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
@@ -171,4 +171,34 @@ final class ServerProtoUtils {
         return false;
     }
   }
+
+  public static String convertToString(InstallSnapshotRequestProto request) { // multi-line, human-readable form for logging
+    final StringBuilder s = new StringBuilder();
+    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk =
+        request.getSnapshotChunk();
+    s.append(" { " + request.getServerRequest() + "leaderTerm: " + request.getLeaderTerm() + "\n"); // NOTE(review): no separator before "leaderTerm: "
+    if (request.hasSnapshotChunk()) { // chunk request: print chunk metadata and every file chunk
+      s.append("snapshotChunk: {\n requestId: " + snapshotChunk.getRequestId() + "\n")
+          .append(" requestIndex: "  + snapshotChunk.getRequestIndex() + "\n")
+          .append(" raftConfiguration: " + snapshotChunk.getRaftConfiguration() + "\n")
+          .append(" termIndex: {\n  term: " + snapshotChunk.getTermIndex().getTerm() + "\n  index: " +
+              snapshotChunk.getTermIndex().getIndex() + "\n }\n");
+      for (FileChunkProto chunk : snapshotChunk.getFileChunksList()) { // metadata fields only; the chunk data is not printed
+        s.append(" fileChunks: {\n  filename: " + chunk.getFilename() + "\n")
+            .append("  totalSize: " + chunk.getTotalSize() + "\n")
+            .append("  fileDigest: " + chunk.getFileDigest() + "\n")
+            .append("  chunkIndex: " + chunk.getChunkIndex() + "\n")
+            .append("  offset: " + chunk.getOffset() + "\n")
+            .append("  done: " + chunk.getDone() + "\n }\n");
+
+      }
+      s.append(" totalSize: " + snapshotChunk.getTotalSize() + "\n")
+          .append(" done: " + snapshotChunk.getDone()).append("\n}\n");
+    } else if (request.hasNotification()) { // notification request: print the notification as-is
+      s.append(" notification: " + request.getNotification() + "\n");
+    }
+
+    s.append(request.getLastRaftConfigurationLogEntryProto()); // appended in every case, without a label
+    return s.toString();
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index ebc4852a2..9bc26596f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.raftlog.segmented;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.function.CheckedBiFunction;
 
@@ -26,9 +27,11 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 /**
  * Provides a buffering layer in front of a FileChannel for writing.
@@ -36,7 +39,8 @@ import java.util.concurrent.ExecutorService;
  * This class is NOT threadsafe.
  */
 class BufferedWriteChannel implements Closeable {
-  static BufferedWriteChannel open(File file, boolean append, ByteBuffer buffer) throws IOException {
+  static BufferedWriteChannel open(File file, boolean append, ByteBuffer buffer,
+      Supplier<CompletableFuture<Void>> flushFuture) throws IOException {
     final RandomAccessFile raf = new RandomAccessFile(file, "rw");
     final FileChannel fc = raf.getChannel();
     if (append) {
@@ -45,16 +49,19 @@ class BufferedWriteChannel implements Closeable {
       fc.truncate(0);
     }
     Preconditions.assertSame(fc.size(), fc.position(), "fc.position");
-    return new BufferedWriteChannel(fc, buffer);
+    return new BufferedWriteChannel(fc, buffer, flushFuture);
   }
 
   private final FileChannel fileChannel;
   private final ByteBuffer writeBuffer;
   private boolean forced = true;
+  private final Supplier<CompletableFuture<Void>> flushFuture;
 
-  BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer) {
+  BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer,
+      Supplier<CompletableFuture<Void>> flushFuture) {
     this.fileChannel = fileChannel;
     this.writeBuffer = byteBuffer;
+    this.flushFuture = flushFuture;
   }
 
   void write(byte[] b) throws IOException {
@@ -105,6 +112,7 @@ class BufferedWriteChannel implements Closeable {
     try {
       fileChannel.force(false);
     } catch (IOException e) {
+      LogSegment.LOG.error("Failed to flush channel", e);
       throw new CompletionException(e);
     }
     return null;
@@ -133,12 +141,14 @@ class BufferedWriteChannel implements Closeable {
   }
 
   @Override
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
   public void close() throws IOException {
     if (!isOpen()) {
       return;
     }
 
     try {
+      Optional.ofNullable(flushFuture).ifPresent(f -> f.get());
       fileChannel.truncate(fileChannel.position());
     } finally {
       fileChannel.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 22eebac93..b96acdc61 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import java.util.zip.Checksum;
 
 public class SegmentedRaftLogOutputStream implements Closeable {
@@ -57,13 +58,13 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   private final long preallocatedSize;
 
   public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
-      long preallocatedSize, ByteBuffer byteBuffer)
+      long preallocatedSize, ByteBuffer byteBuffer, Supplier<CompletableFuture<Void>> flushFuture)
       throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
     this.segmentMaxSize = segmentMaxSize;
     this.preallocatedSize = preallocatedSize;
-    this.out = BufferedWriteChannel.open(file, append, byteBuffer);
+    this.out = BufferedWriteChannel.open(file, append, byteBuffer, flushFuture);
 
     if (!append) {
       // write header
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index e19584eff..599772f12 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,6 +51,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -175,8 +176,11 @@ class SegmentedRaftLogWorker {
   private final RaftServer.Division server;
   private int flushBatchSize;
 
+  private final boolean asyncFlush;
   private final boolean unsafeFlush;
   private final ExecutorService flushExecutor;
+  private final AtomicReference<CompletableFuture<Void>> flushFuture
+      = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
@@ -217,7 +221,12 @@ class SegmentedRaftLogWorker {
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
     this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
-    this.flushExecutor = !unsafeFlush? null
+    this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
+    if (asyncFlush && unsafeFlush) {
+      throw new IllegalStateException("Cannot enable both " +  RaftServerConfigKeys.Log.UNSAFE_FLUSH_ENABLED_KEY +
+          " and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY);
+    }
+    this.flushExecutor = (!asyncFlush && !unsafeFlush)? null
         : Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush"));
   }
 
@@ -375,16 +384,19 @@ class SegmentedRaftLogWorker {
         if (unsafeFlush) {
           // unsafe-flush: call updateFlushedIndexIncreasingly() without waiting the underlying FileChannel.force(..).
           unsafeFlushOutStream();
+          updateFlushedIndexIncreasingly();
+        } else if (asyncFlush) {
+          asyncFlushOutStream(f);
         } else {
           flushOutStream();
           if (!stateMachineDataPolicy.isSync()) {
             IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
           }
+          updateFlushedIndexIncreasingly();
         }
       } finally {
         timerContext.stop();
       }
-      updateFlushedIndexIncreasingly();
     }
   }
 
@@ -393,6 +405,17 @@ class SegmentedRaftLogWorker {
     out.asyncFlush(flushExecutor).whenComplete((v, e) -> logSyncTimerContext.stop());
   }
 
+  private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) throws IOException { // starts the flush; does not wait
+    final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+    final CompletableFuture<Void> f = out.asyncFlush(flushExecutor)
+        .thenCombine(stateMachineFlush, (async, sm) -> async); // f completes once both the log and state-machine flushes do
+    flushFuture.updateAndGet(previous -> f.thenCombine(previous, (current, prev) -> current)) // also wait for earlier flushes
+        .whenComplete((v, e) -> { // NOTE(review): may run on the completing thread, not the worker thread
+          updateFlushedIndexIncreasingly(lastWrittenIndex); // NOTE(review): reads lastWrittenIndex at completion time and ignores e
+          logSyncTimerContext.stop();
+        });
+  }
+
   private void flushOutStream() throws IOException {
     final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
     try {
@@ -403,14 +426,18 @@ class SegmentedRaftLogWorker {
   }
 
   private void updateFlushedIndexIncreasingly() {
-    final long i = lastWrittenIndex;
+    updateFlushedIndexIncreasingly(lastWrittenIndex);
+  }
+
+  private void updateFlushedIndexIncreasingly(long index) {
+    final long i = index;
     flushIndex.updateIncreasingly(i, traceIndexChange);
-    postUpdateFlushedIndex();
+    postUpdateFlushedIndex(Math.toIntExact(lastWrittenIndex - index));
     writeTasks.updateIndex(i);
   }
 
-  private void postUpdateFlushedIndex() {
-    pendingFlushNum = 0;
+  private void postUpdateFlushedIndex(int count) {
+    pendingFlushNum = count;
     Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
   }
 
@@ -691,7 +718,7 @@ class SegmentedRaftLogWorker {
       }
       flushIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
       safeCacheEvictIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
-      postUpdateFlushedIndex();
+      postUpdateFlushedIndex(0);
     }
 
     @Override
@@ -727,6 +754,6 @@ class SegmentedRaftLogWorker {
   private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
     Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
     out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
-            preallocatedSize, writeBuffer);
+            preallocatedSize, writeBuffer, flushFuture::get);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 8748de5e4..c72e9f78e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -31,6 +31,7 @@ import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.FileChunkProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.FileUtils;
@@ -74,7 +75,7 @@ public class SnapshotManager {
     FileUtils.createDirectories(tmpDir);
     tmpDir.deleteOnExit();
 
-    LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
+    LOG.info("Installing snapshot:{}, to tmp dir:{}", ServerProtoUtils.convertToString(request), tmpDir);
 
     // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
     // and are not lost when whole request cycle is done. Check requestId and requestIndex here
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 2b4d2b872..7b5527090 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -264,7 +264,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
         termIndex.getIndex(), snapshotFile);
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
       for (final LogEntryProto entry : indexMap.values()) {
         if (entry.getIndex() > endIndex) {
           break;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
index a7bb7000a..dec13903b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
@@ -137,7 +137,7 @@ public class TestBufferedWriteChannel extends BaseTest {
     final byte[] bytes = new byte[10];
     final ByteBuffer buffer = ByteBuffer.wrap(bytes);
     final FakeFileChannel fake = new FakeFileChannel();
-    final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer);
+    final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer, null);
 
     // write exactly buffer size, then flush.
     fake.assertValues(0, 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 876519634..ba24f8997 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -93,11 +93,12 @@ public class TestLogSegment extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[numEntries];
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
       for (int i = 0; i < entries.length; i++) {
         SimpleOperation op = new SimpleOperation("m" + i);
         entries[i] = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex);
         out.write(entries[i]);
+        LOG.info("Write entry with size {}", size(entries[i]));
       }
     }
 
@@ -293,7 +294,8 @@ public class TestLogSegment extends BaseTest {
     // make sure preallocation is correct with different max/pre-allocated size
     for (int max : maxSizes) {
       for (int a : preallocated) {
-        try(SegmentedRaftLogOutputStream ignored = new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
+        try(SegmentedRaftLogOutputStream ignored =
+                new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize), null)) {
           Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
         }
         try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
@@ -308,7 +310,7 @@ public class TestLogSegment extends BaseTest {
     Arrays.fill(content, (byte) 1);
     final long size;
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-        1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
+        1024, 1024, ByteBuffer.allocateDirect(bufferSize), null)) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
       size = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
@@ -343,7 +345,7 @@ public class TestLogSegment extends BaseTest {
     long totalSize = SegmentedRaftLogFormat.getHeaderLength();
     long preallocated = 16 * 1024;
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-        max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024))) {
+        max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024), null)) {
       Assert.assertEquals(preallocated, file.length());
       while (totalSize + entrySize < max.getSize()) {
         totalSize += entrySize;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 88b5e2f48..5d54eff03 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -108,7 +108,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     final LogEntryProto[] entries = new LogEntryProto[100];
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
       size += writeMessages(entries, out);
     } finally {
       storage.close();
@@ -126,7 +126,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
     LogEntryProto[] entries = new LogEntryProto[200];
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
       for (int i = 0; i < 100; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -135,7 +135,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     }
 
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, true,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
       for (int i = 100; i < 200; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
         entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -161,7 +161,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     LogEntryProto[] entries = new LogEntryProto[100];
     final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize));
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null);
     size += writeMessages(entries, out);
     out.flush();
 
@@ -189,7 +189,7 @@ public class TestRaftLogReadWrite extends BaseTest {
 
     LogEntryProto[] entries = new LogEntryProto[10];
     final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        16 * 1024 * 1024, 4 * 1024 * 1024, ByteBuffer.allocateDirect(bufferSize));
+        16 * 1024 * 1024, 4 * 1024 * 1024, ByteBuffer.allocateDirect(bufferSize), null);
     for (int i = 0; i < 10; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -236,7 +236,7 @@ public class TestRaftLogReadWrite extends BaseTest {
     RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
       for (int i = 0; i < 100; i++) {
         LogEntryProto entry = LogProtoUtils.toLogEntryProto(
             new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index a78c102ba..2600e351e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -59,6 +59,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -68,12 +69,29 @@ import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestSegmentedRaftLog extends BaseTest {
   static {
-    Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.DEBUG);
-    Log4jUtils.setLogLevel(SegmentedRaftLogCache.LOG, Level.TRACE);
-    Log4jUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE);
+    Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(SegmentedRaftLogCache.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(SegmentedRaftLog.LOG, Level.INFO);
+  }
+
+  private final Boolean smSyncFlush; // passed to StateMachineData.setSync in setup
+  private final Boolean useAsyncFlush; // passed to Log.setAsyncFlushEnabled in setup
+
+  public TestSegmentedRaftLog(Boolean raftLogAsync, Boolean smSync) { // called by the Parameterized runner per data() row
+    this.useAsyncFlush = raftLogAsync;
+    this.smSyncFlush = smSync;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> data() { // all four {raftLogAsync, smSync} combinations
+    return Arrays.asList((new Boolean[][] {{Boolean.FALSE, Boolean.FALSE}, {Boolean.FALSE, Boolean.TRUE},
+        {Boolean.TRUE, Boolean.FALSE}, {Boolean.TRUE, Boolean.TRUE}}));
   }
 
   public static long getOpenSegmentSize(RaftLog raftLog) {
@@ -133,6 +151,8 @@ public class TestSegmentedRaftLog extends BaseTest {
     storageDir = getTestDir();
     properties = new RaftProperties();
     RaftServerConfigKeys.setStorageDir(properties,  Collections.singletonList(storageDir));
+    RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
+    RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
     storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     this.segmentMaxSize =
         RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -156,7 +176,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       final int size = (int) (range.end - range.start + 1);
       LogEntryProto[] entries = new LogEntryProto[size];
       try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
-          segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+          segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
         for (int i = 0; i < size; i++) {
           SimpleOperation m = new SimpleOperation("m" + (i + range.start));
           entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start);
@@ -678,4 +698,49 @@ public class TestSegmentedRaftLog extends BaseTest {
       return null;
     });
   }
+
+  @Test
+  public void testAsyncFlushPerf1() throws Exception { // times append() in batches of 5; no assertions, only prints timing
+    List<SegmentRange> ranges = prepareRanges(0, 50, 20000, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      // append entries to the raftlog
+      List<List<CompletableFuture<Long>>> futures = new ArrayList<>();
+      long start = System.nanoTime();
+      for (int i = 0; i < entries.size(); i += 5) { // NOTE(review): assumes entries.size() is a multiple of 5
+        // call append API
+        futures.add(raftLog.append(entries.get(i), entries.get(i + 1), entries.get(i + 2), entries.get(i + 3),
+            entries.get(i + 4)));
+      }
+      for (List<CompletableFuture<Long>> futureList: futures) {
+        futureList.forEach(CompletableFuture::join);
+      }
+      System.out.println(entries.size() + " appendEntry finished in " + (System.nanoTime() - start) + // NOTE(review): message says appendEntry, but append() is used
+          " ns with asyncFlush " + useAsyncFlush);
+    }
+  }
+
+  @Test
+  public void testAsyncFlushPerf2() throws Exception { // times appendEntry() one entry at a time; no assertions
+    List<SegmentRange> ranges = prepareRanges(0, 50, 20000, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      // append entries to the raftlog
+      List<CompletableFuture<Long>> futures = new ArrayList<>();
+      long start = System.nanoTime();
+      for (int i = 0; i < entries.size(); i++) {
+        // call appendEntry API
+        futures.add(raftLog.appendEntry(entries.get(i)));
+      }
+      for (CompletableFuture<Long> futureList: futures) { // NOTE(review): each element is a single future despite the name
+        futureList.join();
+      }
+      System.out.println(entries.size() + " appendEntry finished in " + (System.nanoTime() - start) + // NOTE(review): consider the test logger over System.out
+          " ns with asyncFlush " + useAsyncFlush);
+    }
+  }
 }