You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/09 06:06:02 UTC

[ratis] branch master updated: RATIS-1545. RaftLogOutputStream support async flush. (#616)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 96db7f6  RATIS-1545. RaftLogOutputStream support async flush. (#616)
96db7f6 is described below

commit 96db7f6801738c1c1036f205d925be02e219426c
Author: SincereXIA <Si...@users.noreply.github.com>
AuthorDate: Wed Mar 9 14:05:20 2022 +0800

    RATIS-1545. RaftLogOutputStream support async flush. (#616)
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 16 ++++----
 .../raftlog/segmented/BufferedWriteChannel.java    | 22 +++++++++++
 .../segmented/SegmentedRaftLogOutputStream.java    | 10 +++++
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 44 +++++++++++++++-------
 4 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9eae751..f1e5efb 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -348,15 +348,15 @@ public interface RaftServerConfigKeys {
       setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
     }
 
-
-    String FLUSH_INTERVAL_MIN_KEY = PREFIX + ".flush.interval.min";
-    TimeDuration FLUSH_INTERVAL_MIN_DEFAULT = TimeDuration.ZERO;
-    static TimeDuration flushIntervalMin(RaftProperties properties) {
-      return getTimeDuration(properties.getTimeDuration(FLUSH_INTERVAL_MIN_DEFAULT.getUnit()),
-              FLUSH_INTERVAL_MIN_KEY, FLUSH_INTERVAL_MIN_DEFAULT, getDefaultLog());
+    /** Unsafe-flush allows increasing the flush index without waiting for the actual flush to complete. */
+    String UNSAFE_FLUSH_ENABLED_KEY = PREFIX + ".unsafe-flush.enabled";
+    boolean UNSAFE_FLUSH_ENABLED_DEFAULT = false;
+    static boolean unsafeFlushEnabled(RaftProperties properties) {
+      return getBoolean(properties::getBoolean,
+              UNSAFE_FLUSH_ENABLED_KEY, UNSAFE_FLUSH_ENABLED_DEFAULT, getDefaultLog());
     }
-    static void setFlushIntervalMin(RaftProperties properties, TimeDuration flushTimeInterval) {
-      setTimeDuration(properties::setTimeDuration, FLUSH_INTERVAL_MIN_KEY, flushTimeInterval);
+    static void setUnsafeFlushEnabled(RaftProperties properties, boolean unsafeFlush) {
+      setBoolean(properties::setBoolean, UNSAFE_FLUSH_ENABLED_KEY, unsafeFlush);
     }
 
     /** The policy to handle corrupted raft log. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index c5106d6..ebc4852 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -26,6 +26,9 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Provides a buffering layer in front of a FileChannel for writing.
@@ -88,6 +91,25 @@ class BufferedWriteChannel implements Closeable {
     }
   }
 
+  CompletableFuture<Void> asyncFlush(ExecutorService executor) throws IOException {
+    flushBuffer();
+    if (forced) {
+      return CompletableFuture.completedFuture(null);
+    }
+    final CompletableFuture<Void> f = CompletableFuture.supplyAsync(this::fileChannelForce, executor);
+    forced = true;
+    return f;
+  }
+
+  private Void fileChannelForce() {
+    try {
+      fileChannel.force(false);
+    } catch (IOException e) {
+      throw new CompletionException(e);
+    }
+    return null;
+  }
+
   /**
    * Write any data in the buffer to the file.
    *
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 6ad3a46..3992638 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -32,6 +32,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.zip.Checksum;
 
 public class SegmentedRaftLogOutputStream implements Closeable {
@@ -121,6 +123,14 @@ public class SegmentedRaftLogOutputStream implements Closeable {
     }
   }
 
+  CompletableFuture<Void> asyncFlush(ExecutorService executor) throws IOException {
+    try {
+      return out.asyncFlush(executor);
+    } catch (IOException ioe) {
+      throw new IOException("Failed to flush " + this, ioe);
+    }
+  }
+
   private static long actualPreallocateSize(long outstandingData, long remainingSpace, long preallocate) {
     return outstandingData > remainingSpace? outstandingData
         : outstandingData > preallocate? outstandingData
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9e5eb7e..a9e4b13 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -50,8 +50,7 @@ import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -176,8 +175,8 @@ class SegmentedRaftLogWorker {
   private final RaftServer.Division server;
   private int flushBatchSize;
 
-  private Timestamp lastFlush;
-  private final TimeDuration flushIntervalMin;
+  private final boolean unsafeFlush;
+  private final ExecutorService flushExecutor;
 
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
@@ -217,8 +216,9 @@ class SegmentedRaftLogWorker {
 
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
-    this.lastFlush = Timestamp.currentTime();
-    this.flushIntervalMin = RaftServerConfigKeys.Log.flushIntervalMin(properties);
+    this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
+    this.flushExecutor = !unsafeFlush? null
+        : Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush"));
   }
 
   void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
@@ -236,6 +236,7 @@ class SegmentedRaftLogWorker {
   void close() {
     this.running = false;
     workerThread.interrupt();
+    Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
     try {
       workerThread.join(3000);
     } catch (InterruptedException ignored) {
@@ -326,7 +327,6 @@ class SegmentedRaftLogWorker {
           }
           task.done();
         }
-        flushIfNecessary();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         if (running) {
@@ -355,7 +355,7 @@ class SegmentedRaftLogWorker {
     } else if (pendingFlushNum >= forceSyncNum) {
       return true;
     }
-    return pendingFlushNum > 0 && queue.isEmpty() && lastFlush.elapsedTime().compareTo(flushIntervalMin) > 0;
+    return pendingFlushNum > 0 && queue.isEmpty();
   }
 
   @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
@@ -371,21 +371,37 @@ class SegmentedRaftLogWorker {
         if (stateMachineDataPolicy.isSync()) {
           stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
         }
-        final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
         flushBatchSize = (int)(lastWrittenIndex - flushIndex.get());
-        out.flush();
-        logSyncTimerContext.stop();
-        if (!stateMachineDataPolicy.isSync()) {
-          IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
+        if (unsafeFlush) {
+          // unsafe-flush: call updateFlushedIndexIncreasingly() without waiting for the underlying FileChannel.force(..) to complete.
+          unsafeFlushOutStream();
+        } else {
+          flushOutStream();
+          if (!stateMachineDataPolicy.isSync()) {
+            IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
+          }
         }
       } finally {
         timerContext.stop();
-        lastFlush = Timestamp.currentTime();
       }
       updateFlushedIndexIncreasingly();
     }
   }
 
+  private void unsafeFlushOutStream() throws IOException {
+    final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+    out.asyncFlush(flushExecutor).whenComplete((v, e) -> logSyncTimerContext.stop());
+  }
+
+  private void flushOutStream() throws IOException {
+    final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+    try {
+      out.flush();
+    } finally {
+      logSyncTimerContext.stop();
+    }
+  }
+
   private void updateFlushedIndexIncreasingly() {
     final long i = lastWrittenIndex;
     flushIndex.updateIncreasingly(i, traceIndexChange);