You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/16 16:28:55 UTC
[ratis] branch branch-2 updated: RATIS-1644. Provide a safe async flush. (#699)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 2ecb09d75 RATIS-1644. Provide a safe async flush. (#699)
2ecb09d75 is described below
commit 2ecb09d7521c51eacb68d2cca68b84838f63f26f
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed Aug 17 00:21:13 2022 +0800
RATIS-1644. Provide a safe async flush. (#699)
(cherry picked from commit fafca0a287b28fd27eced7371afd5bfad5bd06fe)
---
.../apache/ratis/server/RaftServerConfigKeys.java | 11 ++++
.../apache/ratis/server/impl/ServerProtoUtils.java | 32 +++++++++-
.../raftlog/segmented/BufferedWriteChannel.java | 16 ++++-
.../segmented/SegmentedRaftLogOutputStream.java | 5 +-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 43 ++++++++++---
.../ratis/server/storage/SnapshotManager.java | 3 +-
.../statemachine/SimpleStateMachine4Testing.java | 2 +-
.../segmented/TestBufferedWriteChannel.java | 2 +-
.../server/raftlog/segmented/TestLogSegment.java | 10 +--
.../raftlog/segmented/TestRaftLogReadWrite.java | 12 ++--
.../raftlog/segmented/TestSegmentedRaftLog.java | 73 ++++++++++++++++++++--
11 files changed, 178 insertions(+), 31 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index ad05536f2..6d0c5d41b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -369,6 +369,17 @@ public interface RaftServerConfigKeys {
setBoolean(properties::setBoolean, UNSAFE_FLUSH_ENABLED_KEY, unsafeFlush);
}
+ /** With async-flush enabled, the flush index is increased only after the actual flush has completed. */
+ String ASYNC_FLUSH_ENABLED_KEY = PREFIX + ".async-flush.enabled";
+ boolean ASYNC_FLUSH_ENABLED_DEFAULT = false;
+ static boolean asyncFlushEnabled(RaftProperties properties) {
+ return getBoolean(properties::getBoolean,
+ ASYNC_FLUSH_ENABLED_KEY, ASYNC_FLUSH_ENABLED_DEFAULT, getDefaultLog());
+ }
+ static void setAsyncFlushEnabled(RaftProperties properties, boolean asyncFlush) {
+ setBoolean(properties::setBoolean, ASYNC_FLUSH_ENABLED_KEY, asyncFlush);
+ }
+
/** The policy to handle corrupted raft log. */
enum CorruptionPolicy {
/** Rethrow the exception. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index deae754c3..eccebb6fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -30,7 +30,7 @@ import java.util.List;
import java.util.Optional;
/** Server proto utilities for internal use. */
-final class ServerProtoUtils {
+public final class ServerProtoUtils {
private ServerProtoUtils() {}
private static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
@@ -171,4 +171,34 @@ final class ServerProtoUtils {
return false;
}
}
+
+ public static String convertToString(InstallSnapshotRequestProto request) {
+ final StringBuilder s = new StringBuilder();
+ final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk =
+ request.getSnapshotChunk();
+ s.append(" { " + request.getServerRequest() + "leaderTerm: " + request.getLeaderTerm() + "\n");
+ if (request.hasSnapshotChunk()) {
+ s.append("snapshotChunk: {\n requestId: " + snapshotChunk.getRequestId() + "\n")
+ .append(" requestIndex: " + snapshotChunk.getRequestIndex() + "\n")
+ .append(" raftConfiguration: " + snapshotChunk.getRaftConfiguration() + "\n")
+ .append(" termIndex: {\n term: " + snapshotChunk.getTermIndex().getTerm() + "\n index: " +
+ snapshotChunk.getTermIndex().getIndex() + "\n }\n");
+ for (FileChunkProto chunk : snapshotChunk.getFileChunksList()) {
+ s.append(" fileChunks: {\n filename: " + chunk.getFilename() + "\n")
+ .append(" totalSize: " + chunk.getTotalSize() + "\n")
+ .append(" fileDigest: " + chunk.getFileDigest() + "\n")
+ .append(" chunkIndex: " + chunk.getChunkIndex() + "\n")
+ .append(" offset: " + chunk.getOffset() + "\n")
+ .append(" done: " + chunk.getDone() + "\n }\n");
+
+ }
+ s.append(" totalSize: " + snapshotChunk.getTotalSize() + "\n")
+ .append(" done: " + snapshotChunk.getDone()).append("\n}\n");
+ } else if (request.hasNotification()) {
+ s.append(" notification: " + request.getNotification() + "\n");
+ }
+
+ s.append(request.getLastRaftConfigurationLogEntryProto());
+ return s.toString();
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index ebc4852a2..9bc26596f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.raftlog.segmented;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedBiFunction;
@@ -26,9 +27,11 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
/**
* Provides a buffering layer in front of a FileChannel for writing.
@@ -36,7 +39,8 @@ import java.util.concurrent.ExecutorService;
* This class is NOT threadsafe.
*/
class BufferedWriteChannel implements Closeable {
- static BufferedWriteChannel open(File file, boolean append, ByteBuffer buffer) throws IOException {
+ static BufferedWriteChannel open(File file, boolean append, ByteBuffer buffer,
+ Supplier<CompletableFuture<Void>> flushFuture) throws IOException {
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
final FileChannel fc = raf.getChannel();
if (append) {
@@ -45,16 +49,19 @@ class BufferedWriteChannel implements Closeable {
fc.truncate(0);
}
Preconditions.assertSame(fc.size(), fc.position(), "fc.position");
- return new BufferedWriteChannel(fc, buffer);
+ return new BufferedWriteChannel(fc, buffer, flushFuture);
}
private final FileChannel fileChannel;
private final ByteBuffer writeBuffer;
private boolean forced = true;
+ private final Supplier<CompletableFuture<Void>> flushFuture;
- BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer) {
+ BufferedWriteChannel(FileChannel fileChannel, ByteBuffer byteBuffer,
+ Supplier<CompletableFuture<Void>> flushFuture) {
this.fileChannel = fileChannel;
this.writeBuffer = byteBuffer;
+ this.flushFuture = flushFuture;
}
void write(byte[] b) throws IOException {
@@ -105,6 +112,7 @@ class BufferedWriteChannel implements Closeable {
try {
fileChannel.force(false);
} catch (IOException e) {
+ LogSegment.LOG.error("Failed to flush channel", e);
throw new CompletionException(e);
}
return null;
@@ -133,12 +141,14 @@ class BufferedWriteChannel implements Closeable {
}
@Override
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
public void close() throws IOException {
if (!isOpen()) {
return;
}
try {
+ Optional.ofNullable(flushFuture).ifPresent(f -> f.get());
fileChannel.truncate(fileChannel.position());
} finally {
fileChannel.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 22eebac93..b96acdc61 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import java.util.zip.Checksum;
public class SegmentedRaftLogOutputStream implements Closeable {
@@ -57,13 +58,13 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private final long preallocatedSize;
public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
- long preallocatedSize, ByteBuffer byteBuffer)
+ long preallocatedSize, ByteBuffer byteBuffer, Supplier<CompletableFuture<Void>> flushFuture)
throws IOException {
this.file = file;
this.checksum = new PureJavaCrc32C();
this.segmentMaxSize = segmentMaxSize;
this.preallocatedSize = preallocatedSize;
- this.out = BufferedWriteChannel.open(file, append, byteBuffer);
+ this.out = BufferedWriteChannel.open(file, append, byteBuffer, flushFuture);
if (!append) {
// write header
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index e19584eff..599772f12 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,6 +51,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -175,8 +176,11 @@ class SegmentedRaftLogWorker {
private final RaftServer.Division server;
private int flushBatchSize;
+ private final boolean asyncFlush;
private final boolean unsafeFlush;
private final ExecutorService flushExecutor;
+ private final AtomicReference<CompletableFuture<Void>> flushFuture
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final StateMachineDataPolicy stateMachineDataPolicy;
@@ -217,7 +221,12 @@ class SegmentedRaftLogWorker {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
- this.flushExecutor = !unsafeFlush? null
+ this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
+ if (asyncFlush && unsafeFlush) {
+ throw new IllegalStateException("Cannot enable both " + RaftServerConfigKeys.Log.UNSAFE_FLUSH_ENABLED_KEY +
+ " and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY);
+ }
+ this.flushExecutor = (!asyncFlush && !unsafeFlush)? null
: Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush"));
}
@@ -375,16 +384,19 @@ class SegmentedRaftLogWorker {
if (unsafeFlush) {
// unsafe-flush: call updateFlushedIndexIncreasingly() without waiting the underlying FileChannel.force(..).
unsafeFlushOutStream();
+ updateFlushedIndexIncreasingly();
+ } else if (asyncFlush) {
+ asyncFlushOutStream(f);
} else {
flushOutStream();
if (!stateMachineDataPolicy.isSync()) {
IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
}
+ updateFlushedIndexIncreasingly();
}
} finally {
timerContext.stop();
}
- updateFlushedIndexIncreasingly();
}
}
@@ -393,6 +405,17 @@ class SegmentedRaftLogWorker {
out.asyncFlush(flushExecutor).whenComplete((v, e) -> logSyncTimerContext.stop());
}
+ private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) throws IOException {
+ final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+ final CompletableFuture<Void> f = out.asyncFlush(flushExecutor)
+ .thenCombine(stateMachineFlush, (async, sm) -> async);
+ flushFuture.updateAndGet(previous -> f.thenCombine(previous, (current, prev) -> current))
+ .whenComplete((v, e) -> {
+ updateFlushedIndexIncreasingly(lastWrittenIndex);
+ logSyncTimerContext.stop();
+ });
+ }
+
private void flushOutStream() throws IOException {
final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
try {
@@ -403,14 +426,18 @@ class SegmentedRaftLogWorker {
}
private void updateFlushedIndexIncreasingly() {
- final long i = lastWrittenIndex;
+ updateFlushedIndexIncreasingly(lastWrittenIndex);
+ }
+
+ private void updateFlushedIndexIncreasingly(long index) {
+ final long i = index;
flushIndex.updateIncreasingly(i, traceIndexChange);
- postUpdateFlushedIndex();
+ postUpdateFlushedIndex(Math.toIntExact(lastWrittenIndex - index));
writeTasks.updateIndex(i);
}
- private void postUpdateFlushedIndex() {
- pendingFlushNum = 0;
+ private void postUpdateFlushedIndex(int count) {
+ pendingFlushNum = count;
Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
}
@@ -691,7 +718,7 @@ class SegmentedRaftLogWorker {
}
flushIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
safeCacheEvictIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
- postUpdateFlushedIndex();
+ postUpdateFlushedIndex(0);
}
@Override
@@ -727,6 +754,6 @@ class SegmentedRaftLogWorker {
private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
- preallocatedSize, writeBuffer);
+ preallocatedSize, writeBuffer, flushFuture::get);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 8748de5e4..c72e9f78e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -31,6 +31,7 @@ import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.FileUtils;
@@ -74,7 +75,7 @@ public class SnapshotManager {
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();
- LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
+ LOG.info("Installing snapshot:{}, to tmp dir:{}", ServerProtoUtils.convertToString(request), tmpDir);
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 2b4d2b872..7b5527090 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -264,7 +264,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
termIndex.getIndex(), snapshotFile);
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
break;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
index a7bb7000a..dec13903b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestBufferedWriteChannel.java
@@ -137,7 +137,7 @@ public class TestBufferedWriteChannel extends BaseTest {
final byte[] bytes = new byte[10];
final ByteBuffer buffer = ByteBuffer.wrap(bytes);
final FakeFileChannel fake = new FakeFileChannel();
- final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer);
+ final BufferedWriteChannel out = new BufferedWriteChannel(fake, buffer, null);
// write exactly buffer size, then flush.
fake.assertValues(0, 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 876519634..ba24f8997 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -93,11 +93,12 @@ public class TestLogSegment extends BaseTest {
final LogEntryProto[] entries = new LogEntryProto[numEntries];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
for (int i = 0; i < entries.length; i++) {
SimpleOperation op = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex);
out.write(entries[i]);
+ LOG.info("Write entry with size {}", size(entries[i]));
}
}
@@ -293,7 +294,8 @@ public class TestLogSegment extends BaseTest {
// make sure preallocation is correct with different max/pre-allocated size
for (int max : maxSizes) {
for (int a : preallocated) {
- try(SegmentedRaftLogOutputStream ignored = new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
+ try(SegmentedRaftLogOutputStream ignored =
+ new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize), null)) {
Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
}
try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
@@ -308,7 +310,7 @@ public class TestLogSegment extends BaseTest {
Arrays.fill(content, (byte) 1);
final long size;
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- 1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
+ 1024, 1024, ByteBuffer.allocateDirect(bufferSize), null)) {
SimpleOperation op = new SimpleOperation(new String(content));
LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
size = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
@@ -343,7 +345,7 @@ public class TestLogSegment extends BaseTest {
long totalSize = SegmentedRaftLogFormat.getHeaderLength();
long preallocated = 16 * 1024;
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024))) {
+ max.getSize(), 16 * 1024, ByteBuffer.allocateDirect(10 * 1024), null)) {
Assert.assertEquals(preallocated, file.length());
while (totalSize + entrySize < max.getSize()) {
totalSize += entrySize;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 88b5e2f48..5d54eff03 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -108,7 +108,7 @@ public class TestRaftLogReadWrite extends BaseTest {
final LogEntryProto[] entries = new LogEntryProto[100];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
size += writeMessages(entries, out);
} finally {
storage.close();
@@ -126,7 +126,7 @@ public class TestRaftLogReadWrite extends BaseTest {
final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
LogEntryProto[] entries = new LogEntryProto[200];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
for (int i = 0; i < 100; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -135,7 +135,7 @@ public class TestRaftLogReadWrite extends BaseTest {
}
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, true,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
for (int i = 100; i < 200; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -161,7 +161,7 @@ public class TestRaftLogReadWrite extends BaseTest {
LogEntryProto[] entries = new LogEntryProto[100];
final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize));
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null);
size += writeMessages(entries, out);
out.flush();
@@ -189,7 +189,7 @@ public class TestRaftLogReadWrite extends BaseTest {
LogEntryProto[] entries = new LogEntryProto[10];
final SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- 16 * 1024 * 1024, 4 * 1024 * 1024, ByteBuffer.allocateDirect(bufferSize));
+ 16 * 1024 * 1024, 4 * 1024 * 1024, ByteBuffer.allocateDirect(bufferSize), null);
for (int i = 0; i < 10; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
@@ -236,7 +236,7 @@ public class TestRaftLogReadWrite extends BaseTest {
RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
final File openSegment = LogSegmentStartEnd.valueOf(0).getFile(storage);
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(openSegment, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index a78c102ba..2600e351e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -59,6 +59,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -68,12 +69,29 @@ import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class TestSegmentedRaftLog extends BaseTest {
static {
- Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.DEBUG);
- Log4jUtils.setLogLevel(SegmentedRaftLogCache.LOG, Level.TRACE);
- Log4jUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE);
+ Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.INFO);
+ Log4jUtils.setLogLevel(SegmentedRaftLogCache.LOG, Level.INFO);
+ Log4jUtils.setLogLevel(SegmentedRaftLog.LOG, Level.INFO);
+ }
+
+ private final Boolean smSyncFlush;
+ private final Boolean useAsyncFlush;
+
+ public TestSegmentedRaftLog(Boolean raftLogAsync, Boolean smSync) {
+ this.useAsyncFlush = raftLogAsync;
+ this.smSyncFlush = smSync;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList((new Boolean[][] {{Boolean.FALSE, Boolean.FALSE}, {Boolean.FALSE, Boolean.TRUE},
+ {Boolean.TRUE, Boolean.FALSE}, {Boolean.TRUE, Boolean.TRUE}}));
}
public static long getOpenSegmentSize(RaftLog raftLog) {
@@ -133,6 +151,8 @@ public class TestSegmentedRaftLog extends BaseTest {
storageDir = getTestDir();
properties = new RaftProperties();
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
+ RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
+ RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
storage = RaftStorageTestUtils.newRaftStorage(storageDir);
this.segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -156,7 +176,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final int size = (int) (range.end - range.start + 1);
LogEntryProto[] entries = new LogEntryProto[size];
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false,
- segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+ segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), null)) {
for (int i = 0; i < size; i++) {
SimpleOperation m = new SimpleOperation("m" + (i + range.start));
entries[i] = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start);
@@ -678,4 +698,49 @@ public class TestSegmentedRaftLog extends BaseTest {
return null;
});
}
+
+ @Test
+ public void testAsyncFlushPerf1() throws Exception {
+ List<SegmentRange> ranges = prepareRanges(0, 50, 20000, 0);
+ List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ // append entries to the raftlog
+ List<List<CompletableFuture<Long>>> futures = new ArrayList<>();
+ long start = System.nanoTime();
+ for (int i = 0; i < entries.size(); i += 5) {
+ // call append API
+ futures.add(raftLog.append(entries.get(i), entries.get(i + 1), entries.get(i + 2), entries.get(i + 3),
+ entries.get(i + 4)));
+ }
+ for (List<CompletableFuture<Long>> futureList: futures) {
+ futureList.forEach(CompletableFuture::join);
+ }
+ System.out.println(entries.size() + " appendEntry finished in " + (System.nanoTime() - start) +
+ " ns with asyncFlush " + useAsyncFlush);
+ }
+ }
+
+ @Test
+ public void testAsyncFlushPerf2() throws Exception {
+ List<SegmentRange> ranges = prepareRanges(0, 50, 20000, 0);
+ List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ // append entries to the raftlog
+ List<CompletableFuture<Long>> futures = new ArrayList<>();
+ long start = System.nanoTime();
+ for (int i = 0; i < entries.size(); i++) {
+ // call appendEntry API
+ futures.add(raftLog.appendEntry(entries.get(i)));
+ }
+ for (CompletableFuture<Long> futureList: futures) {
+ futureList.join();
+ }
+ System.out.println(entries.size() + " appendEntry finished in " + (System.nanoTime() - start) +
+ " ns with asyncFlush " + useAsyncFlush);
+ }
+ }
}