You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/09 06:06:02 UTC
[ratis] branch master updated: RATIS-1545. RaftLogOutputStream support async flush. (#616)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 96db7f6 RATIS-1545. RaftLogOutputStream support async flush. (#616)
96db7f6 is described below
commit 96db7f6801738c1c1036f205d925be02e219426c
Author: SincereXIA <Si...@users.noreply.github.com>
AuthorDate: Wed Mar 9 14:05:20 2022 +0800
RATIS-1545. RaftLogOutputStream support async flush. (#616)
---
.../apache/ratis/server/RaftServerConfigKeys.java | 16 ++++----
.../raftlog/segmented/BufferedWriteChannel.java | 22 +++++++++++
.../segmented/SegmentedRaftLogOutputStream.java | 10 +++++
.../raftlog/segmented/SegmentedRaftLogWorker.java | 44 +++++++++++++++-------
4 files changed, 70 insertions(+), 22 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9eae751..f1e5efb 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -348,15 +348,15 @@ public interface RaftServerConfigKeys {
setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
}
-
- String FLUSH_INTERVAL_MIN_KEY = PREFIX + ".flush.interval.min";
- TimeDuration FLUSH_INTERVAL_MIN_DEFAULT = TimeDuration.ZERO;
- static TimeDuration flushIntervalMin(RaftProperties properties) {
- return getTimeDuration(properties.getTimeDuration(FLUSH_INTERVAL_MIN_DEFAULT.getUnit()),
- FLUSH_INTERVAL_MIN_KEY, FLUSH_INTERVAL_MIN_DEFAULT, getDefaultLog());
+ /** Unsafe-flush allows increasing the flush index without waiting for the actual flush to complete. */
+ String UNSAFE_FLUSH_ENABLED_KEY = PREFIX + ".unsafe-flush.enabled";
+ boolean UNSAFE_FLUSH_ENABLED_DEFAULT = false;
+ static boolean unsafeFlushEnabled(RaftProperties properties) {
+ return getBoolean(properties::getBoolean,
+ UNSAFE_FLUSH_ENABLED_KEY, UNSAFE_FLUSH_ENABLED_DEFAULT, getDefaultLog());
}
- static void setFlushIntervalMin(RaftProperties properties, TimeDuration flushTimeInterval) {
- setTimeDuration(properties::setTimeDuration, FLUSH_INTERVAL_MIN_KEY, flushTimeInterval);
+ static void setUnsafeFlushEnabled(RaftProperties properties, boolean unsafeFlush) {
+ setBoolean(properties::setBoolean, UNSAFE_FLUSH_ENABLED_KEY, unsafeFlush);
}
/** The policy to handle corrupted raft log. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index c5106d6..ebc4852 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -26,6 +26,9 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
/**
* Provides a buffering layer in front of a FileChannel for writing.
@@ -88,6 +91,25 @@ class BufferedWriteChannel implements Closeable {
}
}
+ CompletableFuture<Void> asyncFlush(ExecutorService executor) throws IOException {
+ flushBuffer();
+ if (forced) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final CompletableFuture<Void> f = CompletableFuture.supplyAsync(this::fileChannelForce, executor);
+ forced = true;
+ return f;
+ }
+
+ private Void fileChannelForce() {
+ try {
+ fileChannel.force(false);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ return null;
+ }
+
/**
* Write any data in the buffer to the file.
*
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 6ad3a46..3992638 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -32,6 +32,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.zip.Checksum;
public class SegmentedRaftLogOutputStream implements Closeable {
@@ -121,6 +123,14 @@ public class SegmentedRaftLogOutputStream implements Closeable {
}
}
+ CompletableFuture<Void> asyncFlush(ExecutorService executor) throws IOException {
+ try {
+ return out.asyncFlush(executor);
+ } catch (IOException ioe) {
+ throw new IOException("Failed to flush " + this, ioe);
+ }
+ }
+
private static long actualPreallocateSize(long outstandingData, long remainingSpace, long preallocate) {
return outstandingData > remainingSpace? outstandingData
: outstandingData > preallocate? outstandingData
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9e5eb7e..a9e4b13 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -50,8 +50,7 @@ import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -176,8 +175,8 @@ class SegmentedRaftLogWorker {
private final RaftServer.Division server;
private int flushBatchSize;
- private Timestamp lastFlush;
- private final TimeDuration flushIntervalMin;
+ private final boolean unsafeFlush;
+ private final ExecutorService flushExecutor;
private final StateMachineDataPolicy stateMachineDataPolicy;
@@ -217,8 +216,9 @@ class SegmentedRaftLogWorker {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
- this.lastFlush = Timestamp.currentTime();
- this.flushIntervalMin = RaftServerConfigKeys.Log.flushIntervalMin(properties);
+ this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
+ this.flushExecutor = !unsafeFlush? null
+ : Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush"));
}
void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
@@ -236,6 +236,7 @@ class SegmentedRaftLogWorker {
void close() {
this.running = false;
workerThread.interrupt();
+ Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
try {
workerThread.join(3000);
} catch (InterruptedException ignored) {
@@ -326,7 +327,6 @@ class SegmentedRaftLogWorker {
}
task.done();
}
- flushIfNecessary();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (running) {
@@ -355,7 +355,7 @@ class SegmentedRaftLogWorker {
} else if (pendingFlushNum >= forceSyncNum) {
return true;
}
- return pendingFlushNum > 0 && queue.isEmpty() && lastFlush.elapsedTime().compareTo(flushIntervalMin) > 0;
+ return pendingFlushNum > 0 && queue.isEmpty();
}
@SuppressFBWarnings("NP_NULL_PARAM_DEREF")
@@ -371,21 +371,37 @@ class SegmentedRaftLogWorker {
if (stateMachineDataPolicy.isSync()) {
stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
}
- final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
flushBatchSize = (int)(lastWrittenIndex - flushIndex.get());
- out.flush();
- logSyncTimerContext.stop();
- if (!stateMachineDataPolicy.isSync()) {
- IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
+ if (unsafeFlush) {
+ // unsafe-flush: call updateFlushedIndexIncreasingly() without waiting for the underlying FileChannel.force(..) to complete.
+ unsafeFlushOutStream();
+ } else {
+ flushOutStream();
+ if (!stateMachineDataPolicy.isSync()) {
+ IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
+ }
}
} finally {
timerContext.stop();
- lastFlush = Timestamp.currentTime();
}
updateFlushedIndexIncreasingly();
}
}
+ private void unsafeFlushOutStream() throws IOException {
+ final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+ out.asyncFlush(flushExecutor).whenComplete((v, e) -> logSyncTimerContext.stop());
+ }
+
+ private void flushOutStream() throws IOException {
+ final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+ try {
+ out.flush();
+ } finally {
+ logSyncTimerContext.stop();
+ }
+ }
+
private void updateFlushedIndexIncreasingly() {
final long i = lastWrittenIndex;
flushIndex.updateIncreasingly(i, traceIndexChange);