You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/10/13 10:26:00 UTC

[ratis] branch master updated: RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2753f3d62 RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)
2753f3d62 is described below

commit 2753f3d6207a88f5a5f193a28371286db4e8b398
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Thu Oct 13 18:25:53 2022 +0800

    RATIS-1717. Perf: Use global serialize buf to avoid temp buf (#762)
---
 .../raftlog/segmented/BufferedWriteChannel.java     |  7 +++++--
 .../segmented/SegmentedRaftLogOutputStream.java     | 21 ++++++++++++++++-----
 .../raftlog/segmented/SegmentedRaftLogWorker.java   |  6 +++++-
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index f6c10ea41..ef9987ff7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -63,9 +63,12 @@ class BufferedWriteChannel implements Closeable {
   }
 
   void write(byte[] b) throws IOException {
+    write(b, b.length);
+  }
+  void write(byte[] b, int len) throws IOException {
     int offset = 0;
-    while (offset < b.length) {
-      int toPut = Math.min(b.length - offset, writeBuffer.remaining());
+    while (offset < len) {
+      int toPut = Math.min(len - offset, writeBuffer.remaining());
       writeBuffer.put(b, offset, toPut);
       offset += toPut;
       if (writeBuffer.remaining() == 0) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 22eebac93..e0fd41fbd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import java.util.zip.Checksum;
 
 public class SegmentedRaftLogOutputStream implements Closeable {
@@ -52,6 +53,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   private final File file;
   private final BufferedWriteChannel out; // buffered FileChannel for writing
   private final Checksum checksum;
+  private final Supplier<byte[]> sharedBuffer;
 
   private final long segmentMaxSize;
   private final long preallocatedSize;
@@ -59,10 +61,17 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
       long preallocatedSize, ByteBuffer byteBuffer)
       throws IOException {
+    this(file, append, segmentMaxSize, preallocatedSize, byteBuffer, null);
+  }
+
+  SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
+      long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer)
+      throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
     this.segmentMaxSize = segmentMaxSize;
     this.preallocatedSize = preallocatedSize;
+    this.sharedBuffer = sharedBuffer;
     this.out = BufferedWriteChannel.open(file, append, byteBuffer);
 
     if (!append) {
@@ -75,12 +84,12 @@ public class SegmentedRaftLogOutputStream implements Closeable {
 
   /**
    * Write the given entry to this output stream.
-   *
+   * <p>
    * Format:
    *   (1) The serialized size of the entry.
    *   (2) The entry.
    *   (3) 4-byte checksum of the entry.
-   *
+   * <p>
    * Size in bytes to be written:
    *   (size to encode n) + n + (checksum size),
    *   where n is the entry serialized size and the checksum size is 4.
@@ -88,8 +97,10 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   public void write(LogEntryProto entry) throws IOException {
     final int serialized = entry.getSerializedSize();
     final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
-    final byte[] buf = new byte[proto + 4]; // proto and 4-byte checksum
-    preallocateIfNecessary(buf.length);
+    final int total = proto + 4; // proto and 4-byte checksum
+    final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new byte[total];
+    Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " > buf.length " + buf.length);
+    preallocateIfNecessary(total);
 
     CodedOutputStream cout = CodedOutputStream.newInstance(buf);
     cout.writeUInt32NoTag(serialized);
@@ -99,7 +110,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
     checksum.update(buf, 0, proto);
     ByteBuffer.wrap(buf, proto, 4).putInt((int) checksum.getValue());
 
-    out.write(buf);
+    out.write(buf, total);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 862bb75aa..cdeaf4fdd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -149,6 +149,7 @@ class SegmentedRaftLogWorker {
   private final StateMachine stateMachine;
   private final SegmentedRaftLogMetrics raftLogMetrics;
   private final ByteBuffer writeBuffer;
+  private final Supplier<byte[]> sharedBuffer;
 
   /**
    * The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -207,6 +208,9 @@ class SegmentedRaftLogWorker {
 
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+    final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
+    // 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
+    this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
     this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
     this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
     if (asyncFlush && unsafeFlush) {
@@ -729,6 +733,6 @@ class SegmentedRaftLogWorker {
   private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
     Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
     out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
-            preallocatedSize, writeBuffer);
+            preallocatedSize, writeBuffer, sharedBuffer);
   }
 }