You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/10/13 10:26:00 UTC
[ratis] branch master updated: RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2753f3d62 RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)
2753f3d62 is described below
commit 2753f3d6207a88f5a5f193a28371286db4e8b398
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Thu Oct 13 18:25:53 2022 +0800
RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)
---
.../raftlog/segmented/BufferedWriteChannel.java | 7 +++++--
.../segmented/SegmentedRaftLogOutputStream.java | 21 ++++++++++++++++-----
.../raftlog/segmented/SegmentedRaftLogWorker.java | 6 +++++-
3 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index f6c10ea41..ef9987ff7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -63,9 +63,12 @@ class BufferedWriteChannel implements Closeable {
}
void write(byte[] b) throws IOException {
+ write(b, b.length);
+ }
+ void write(byte[] b, int len) throws IOException {
int offset = 0;
- while (offset < b.length) {
- int toPut = Math.min(b.length - offset, writeBuffer.remaining());
+ while (offset < len) {
+ int toPut = Math.min(len - offset, writeBuffer.remaining());
writeBuffer.put(b, offset, toPut);
offset += toPut;
if (writeBuffer.remaining() == 0) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 22eebac93..e0fd41fbd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import java.util.zip.Checksum;
public class SegmentedRaftLogOutputStream implements Closeable {
@@ -52,6 +53,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private final File file;
private final BufferedWriteChannel out; // buffered FileChannel for writing
private final Checksum checksum;
+ private final Supplier<byte[]> sharedBuffer;
private final long segmentMaxSize;
private final long preallocatedSize;
@@ -59,10 +61,17 @@ public class SegmentedRaftLogOutputStream implements Closeable {
public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
long preallocatedSize, ByteBuffer byteBuffer)
throws IOException {
+ this(file, append, segmentMaxSize, preallocatedSize, byteBuffer, null);
+ }
+
+ SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
+ long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer)
+ throws IOException {
this.file = file;
this.checksum = new PureJavaCrc32C();
this.segmentMaxSize = segmentMaxSize;
this.preallocatedSize = preallocatedSize;
+ this.sharedBuffer = sharedBuffer;
this.out = BufferedWriteChannel.open(file, append, byteBuffer);
if (!append) {
@@ -75,12 +84,12 @@ public class SegmentedRaftLogOutputStream implements Closeable {
/**
* Write the given entry to this output stream.
- *
+ * <p>
* Format:
* (1) The serialized size of the entry.
* (2) The entry.
* (3) 4-byte checksum of the entry.
- *
+ * <p>
* Size in bytes to be written:
* (size to encode n) + n + (checksum size),
* where n is the entry serialized size and the checksum size is 4.
@@ -88,8 +97,10 @@ public class SegmentedRaftLogOutputStream implements Closeable {
public void write(LogEntryProto entry) throws IOException {
final int serialized = entry.getSerializedSize();
final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
- final byte[] buf = new byte[proto + 4]; // proto and 4-byte checksum
- preallocateIfNecessary(buf.length);
+ final int total = proto + 4; // proto and 4-byte checksum
+ final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new byte[total];
+ Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " > buf.length " + buf.length);
+ preallocateIfNecessary(total);
CodedOutputStream cout = CodedOutputStream.newInstance(buf);
cout.writeUInt32NoTag(serialized);
@@ -99,7 +110,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
checksum.update(buf, 0, proto);
ByteBuffer.wrap(buf, proto, 4).putInt((int) checksum.getValue());
- out.write(buf);
+ out.write(buf, total);
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 862bb75aa..cdeaf4fdd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -149,6 +149,7 @@ class SegmentedRaftLogWorker {
private final StateMachine stateMachine;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final ByteBuffer writeBuffer;
+ private final Supplier<byte[]> sharedBuffer;
/**
* The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -207,6 +208,9 @@ class SegmentedRaftLogWorker {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+ final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
+ // 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
+ this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
if (asyncFlush && unsafeFlush) {
@@ -729,6 +733,6 @@ class SegmentedRaftLogWorker {
private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
- preallocatedSize, writeBuffer);
+ preallocatedSize, writeBuffer, sharedBuffer);
}
}