You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/20 06:15:25 UTC

[iotdb] branch master updated: Format SerializeTask in WALBuffer (#5603)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 564b286edd Format SerializeTask in WALBuffer (#5603)
564b286edd is described below

commit 564b286eddbc154a699451d52b603f1ba72264e2
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed Apr 20 14:15:20 2022 +0800

    Format SerializeTask in WALBuffer (#5603)
---
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  | 128 ++++++++++++---------
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |   6 +-
 2 files changed, 75 insertions(+), 59 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 916bb4846d..27c6ca15fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -123,6 +123,9 @@ public class WALBuffer extends AbstractWALBuffer {
     private final IWALByteBufferView byteBufferVew = new ByteBufferView();
     private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
 
+    private int batchSize = 0;
+    private WALFlushListener rollWALFileWriterListener = null;
+
     @Override
     public void run() {
       try {
@@ -134,32 +137,12 @@ public class WALBuffer extends AbstractWALBuffer {
 
     /** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
     private void serialize() {
-      WALFlushListener rollWAlFileWriterListener = null;
-      int batchSize = 0;
-
       // try to get first WALEntry with blocking interface
       try {
         WALEntry firstWALEntry = walEntries.take();
-        if (!firstWALEntry.isSignal()) {
-          try {
-            firstWALEntry.serialize(byteBufferVew);
-            ++batchSize;
-            fsyncListeners.add(firstWALEntry.getWalFlushListener());
-          } catch (Exception e) {
-            logger.error(
-                "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
-            firstWALEntry.getWalFlushListener().fail(e);
-          }
-        } else {
-          switch (((SignalWALEntry) firstWALEntry).getSignalType()) {
-            case ROLL_WAL_LOG_WRITER_SIGNAL:
-              rollWAlFileWriterListener = firstWALEntry.getWalFlushListener();
-              fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
-              return;
-            case CLOSE_SIGNAL:
-            default:
-              break;
-          }
+        boolean returnFlag = handleWALEntry(firstWALEntry);
+        if (returnFlag) {
+          return;
         }
       } catch (InterruptedException e) {
         logger.warn(
@@ -181,33 +164,66 @@ public class WALBuffer extends AbstractWALBuffer {
       // try to get more WALEntries with non-blocking interface to enlarge write batch
       while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) {
         WALEntry walEntry = walEntries.poll();
-        if (!walEntry.isSignal()) {
-          try {
-            walEntry.serialize(byteBufferVew);
-          } catch (Exception e) {
-            logger.error(
-                "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
-            walEntry.getWalFlushListener().fail(e);
-            continue;
-          }
-          ++batchSize;
-          fsyncListeners.add(walEntry.getWalFlushListener());
-        } else {
-          switch (((SignalWALEntry) walEntry).getSignalType()) {
-            case ROLL_WAL_LOG_WRITER_SIGNAL:
-              rollWAlFileWriterListener = walEntry.getWalFlushListener();
-              break;
-            case CLOSE_SIGNAL:
-            default:
-              break;
-          }
-          break;
+        boolean returnFlag = handleWALEntry(walEntry);
+        if (returnFlag) {
+          return;
         }
       }
 
       // call fsync at last and set fsyncListeners
-      if (batchSize > 0 || rollWAlFileWriterListener != null) {
-        fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
+      if (batchSize > 0) {
+        fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+      }
+    }
+
+    /**
+     * @return true if fsyncWorkingBuffer has been called, which means this serialization task
+     *     should be ended.
+     */
+    private boolean handleWALEntry(WALEntry walEntry) {
+      if (walEntry.isSignal()) {
+        return handleSignalEntry((SignalWALEntry) walEntry);
+      }
+
+      boolean success = handleInfoEntry(walEntry);
+      if (success) {
+        ++batchSize;
+        fsyncListeners.add(walEntry.getWalFlushListener());
+      }
+      return false;
+    }
+
+    /** @return true if serialization is successful. */
+    private boolean handleInfoEntry(WALEntry walEntry) {
+      try {
+        walEntry.serialize(byteBufferVew);
+      } catch (Exception e) {
+        logger.error(
+            "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
+        walEntry.getWalFlushListener().fail(e);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * @return true if fsyncWorkingBuffer has been called, which means this serialization task
+     *     should be ended.
+     */
+    private boolean handleSignalEntry(SignalWALEntry signalWALEntry) {
+      switch (signalWALEntry.getSignalType()) {
+        case ROLL_WAL_LOG_WRITER_SIGNAL:
+          rollWALFileWriterListener = signalWALEntry.getWalFlushListener();
+          fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+          return true;
+        case CLOSE_SIGNAL:
+          boolean dataExists = batchSize > 0;
+          if (dataExists) {
+            fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+          }
+          return dataExists;
+        default:
+          return false;
       }
     }
   }
@@ -295,9 +311,9 @@ public class WALBuffer extends AbstractWALBuffer {
 
   /** Notice: this method only called at the last of SerializeTask. */
   private void fsyncWorkingBuffer(
-      List<WALFlushListener> fsyncListeners, WALFlushListener rollWAlFileWriterListener) {
+      List<WALFlushListener> fsyncListeners, WALFlushListener rollWALFileWriterListener) {
     switchWorkingBufferToFlushing();
-    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWAlFileWriterListener));
+    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWALFileWriterListener));
   }
 
   // only called by serializeThread
@@ -327,7 +343,7 @@ public class WALBuffer extends AbstractWALBuffer {
   private class SyncBufferTask implements Runnable {
     private final boolean forceFlag;
     private final List<WALFlushListener> fsyncListeners;
-    private final WALFlushListener rollWAlFileWriterListener;
+    private final WALFlushListener rollWALFileWriterListener;
 
     public SyncBufferTask(boolean forceFlag) {
       this(forceFlag, null, null);
@@ -336,10 +352,10 @@ public class WALBuffer extends AbstractWALBuffer {
     public SyncBufferTask(
         boolean forceFlag,
         List<WALFlushListener> fsyncListeners,
-        WALFlushListener rollWAlFileWriterListener) {
+        WALFlushListener rollWALFileWriterListener) {
       this.forceFlag = forceFlag;
       this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
-      this.rollWAlFileWriterListener = rollWAlFileWriterListener;
+      this.rollWALFileWriterListener = rollWALFileWriterListener;
     }
 
     @Override
@@ -377,12 +393,12 @@ public class WALBuffer extends AbstractWALBuffer {
 
       // try to roll log writer
       try {
-        if (rollWAlFileWriterListener != null
+        if (rollWALFileWriterListener != null
             || (forceFlag
                 && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
           rollLogWriter();
-          if (rollWAlFileWriterListener != null) {
-            rollWAlFileWriterListener.succeed();
+          if (rollWALFileWriterListener != null) {
+            rollWALFileWriterListener.succeed();
           }
         }
       } catch (IOException e) {
@@ -390,8 +406,8 @@ public class WALBuffer extends AbstractWALBuffer {
             "Fail to roll wal node-{}'s log writer, change system mode to read-only.",
             identifier,
             e);
-        if (rollWAlFileWriterListener != null) {
-          rollWAlFileWriterListener.fail(e);
+        if (rollWALFileWriterListener != null) {
+          rollWALFileWriterListener.fail(e);
         }
         config.setReadOnly(true);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 581dcb03e7..57344a7f2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -82,7 +82,7 @@ public class WALNode implements IWALNode {
    */
   private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
   /** version id -> cost sum of memTables flushed at this file version */
-  private final Map<Integer, Long> walFileVersionId2MemTableCostSum = new ConcurrentHashMap<>();
+  private final Map<Integer, Long> walFileVersionId2MemTablesTotalCost = new ConcurrentHashMap<>();
 
   public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
     this.identifier = identifier;
@@ -153,7 +153,7 @@ public class WALNode implements IWALNode {
     // update cost info
     long cost = config.isEnableMemControl() ? memTable.getTVListsRamCost() : 1;
     int currentWALFileVersion = buffer.getCurrentWALFileVersion();
-    walFileVersionId2MemTableCostSum.compute(
+    walFileVersionId2MemTablesTotalCost.compute(
         currentWALFileVersion, (k, v) -> v == null ? cost : v + cost);
     totalCostOfFlushedMemTables.addAndGet(cost);
   }
@@ -242,7 +242,7 @@ public class WALNode implements IWALNode {
           }
           // update totalRamCostOfFlushedMemTables
           int versionId = WALWriter.parseVersionId(file.getName());
-          Long memTableRamCostSum = walFileVersionId2MemTableCostSum.remove(versionId);
+          Long memTableRamCostSum = walFileVersionId2MemTablesTotalCost.remove(versionId);
           if (memTableRamCostSum != null) {
             totalCostOfFlushedMemTables.addAndGet(-memTableRamCostSum);
           }