You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/29 08:13:40 UTC

[iotdb] 03/03: [IOTDB-5338] WAL buffer flush threshold optimization (#8832)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch geely_1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a5efa3fa01c3e056d8d26fa20f123f2a2e4455ca
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Sun Jan 29 16:09:39 2023 +0800

    [IOTDB-5338] WAL buffer flush threshold optimization (#8832)
---
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  | 58 +++++++++++++---------
 1 file changed, 35 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index c1ef5932e4..db2d627ef5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -58,6 +58,7 @@ public class WALBuffer extends AbstractWALBuffer {
   private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
+  private static final double FSYNC_BUFFER_RATIO = 0.95;
   private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
 
   /** whether close method is called */
@@ -142,9 +143,9 @@ public class WALBuffer extends AbstractWALBuffer {
 
   /** This task serializes WALEntry to workingBuffer and will call fsync at last. */
   private class SerializeTask implements Runnable {
-    private final ByteBufferView byteBufferVew = new ByteBufferView();
+    private final ByteBufferView byteBufferView = new ByteBufferView();
     private final SerializeInfo info = new SerializeInfo();
-    private int batchSize = 0;
+    private int totalSize = 0;
 
     @Override
     public void run() {
@@ -172,20 +173,21 @@ public class WALBuffer extends AbstractWALBuffer {
         Thread.currentThread().interrupt();
       }
 
-      // for better fsync performance, sleep a while to enlarge write batch
-      long fsyncDelay = config.getFsyncWalDelayInMs();
-      if (fsyncDelay > 0) {
+      // try to get more WALEntries with blocking interface to enlarge write batch
+      while (totalSize < HALF_WAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) {
+        WALEntry walEntry = null;
         try {
-          Thread.sleep(fsyncDelay);
+          // for better fsync performance, wait a while to enlarge write batch
+          walEntry = walEntries.poll(config.getFsyncWalDelayInMs(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-          logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+          logger.warn(
+              "Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
           Thread.currentThread().interrupt();
         }
-      }
 
-      // try to get more WALEntries with non-blocking interface to enlarge write batch
-      while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) {
-        WALEntry walEntry = walEntries.poll();
+        if (walEntry == null) {
+          break;
+        }
         boolean returnFlag = handleWALEntry(walEntry);
         if (returnFlag) {
           return;
@@ -193,7 +195,7 @@ public class WALBuffer extends AbstractWALBuffer {
       }
 
       // call fsync at last and set fsyncListeners
-      if (batchSize > 0) {
+      if (totalSize > 0) {
         fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
       }
     }
@@ -209,7 +211,6 @@ public class WALBuffer extends AbstractWALBuffer {
 
       boolean success = handleInfoEntry(walEntry);
       if (success) {
-        ++batchSize;
         info.fsyncListeners.add(walEntry.getWalFlushListener());
       }
       return false;
@@ -221,11 +222,10 @@ public class WALBuffer extends AbstractWALBuffer {
      * @return true if serialization is successful.
      */
     private boolean handleInfoEntry(WALEntry walEntry) {
-      int size = byteBufferVew.position();
+      int size = byteBufferView.position();
       try {
-        walEntry.serialize(byteBufferVew);
-        size = byteBufferVew.position() - size;
-        logger.debug("wal entry size is: {}", size);
+        walEntry.serialize(byteBufferView);
+        size = byteBufferView.position() - size;
       } catch (Exception e) {
         logger.error(
             "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
@@ -245,6 +245,7 @@ public class WALBuffer extends AbstractWALBuffer {
           currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
         }
       }
+      totalSize += size;
       info.metaData.add(size, searchIndex);
       return true;
     }
@@ -256,16 +257,20 @@ public class WALBuffer extends AbstractWALBuffer {
     private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
       switch (walSignalEntry.getType()) {
         case ROLL_WAL_LOG_WRITER_SIGNAL:
-          logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+          if (logger.isDebugEnabled()) {
+            logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
+          }
           info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
           fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
           return true;
         case CLOSE_SIGNAL:
-          logger.debug(
-              "Handle close signal for wal node-{}, there are {} entries left.",
-              identifier,
-              walEntries.size());
-          boolean dataExists = batchSize > 0;
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "Handle close signal for wal node-{}, there are {} entries left.",
+                identifier,
+                walEntries.size());
+          }
+          boolean dataExists = totalSize > 0;
           if (dataExists) {
             fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
           }
@@ -420,6 +425,13 @@ public class WALBuffer extends AbstractWALBuffer {
     public void run() {
       currentWALFileWriter.updateFileStatus(fileStatus);
 
+      if (logger.isDebugEnabled()) {
+        double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
+        logger.debug(
+            "Sync wal buffer, forceFlag: {}, buffer used: {} / {} = {}%",
+            forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);
+      }
+
       // flush buffer to os
       try {
         currentWALFileWriter.write(syncingBuffer, info.metaData);