You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/19 14:06:32 UTC

[iotdb] branch master updated: [IOTDB-2917] Fix flush operation has one residual wal file (#5560)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e891a68c13 [IOTDB-2917] Fix flush operation has one residual wal file (#5560)
e891a68c13 is described below

commit e891a68c132dabc3a404eafe3b9b77376313465b
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue Apr 19 22:06:27 2022 +0800

    [IOTDB-2917] Fix flush operation has one residual wal file (#5560)
---
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  2 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  | 49 +++++++++++++---------
 2 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index d67b6048ab..2bebd7196e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -139,6 +139,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
+    registerManager.register(WALManager.getInstance());
 
     // in mpp mode we need to start some other services
     if (IoTDBDescriptor.getInstance().getConfig().isMppMode()) {
@@ -153,7 +154,6 @@ public class IoTDB implements IoTDBMBean {
       registerManager.register(StorageEngine.getInstance());
     }
 
-    registerManager.register(WALManager.getInstance());
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 670e81607e..916bb4846d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -134,7 +134,7 @@ public class WALBuffer extends AbstractWALBuffer {
 
     /** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
     private void serialize() {
-      boolean rollWAlFileWriter = false;
+      WALFlushListener rollWAlFileWriterListener = null;
       int batchSize = 0;
 
       // try to get first WALEntry with blocking interface
@@ -153,9 +153,9 @@ public class WALBuffer extends AbstractWALBuffer {
         } else {
           switch (((SignalWALEntry) firstWALEntry).getSignalType()) {
             case ROLL_WAL_LOG_WRITER_SIGNAL:
-              rollWAlFileWriter = true;
-              fsyncListeners.add(firstWALEntry.getWalFlushListener());
-              break;
+              rollWAlFileWriterListener = firstWALEntry.getWalFlushListener();
+              fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
+              return;
             case CLOSE_SIGNAL:
             default:
               break;
@@ -195,8 +195,7 @@ public class WALBuffer extends AbstractWALBuffer {
         } else {
           switch (((SignalWALEntry) walEntry).getSignalType()) {
             case ROLL_WAL_LOG_WRITER_SIGNAL:
-              rollWAlFileWriter = true;
-              fsyncListeners.add(walEntry.getWalFlushListener());
+              rollWAlFileWriterListener = walEntry.getWalFlushListener();
               break;
             case CLOSE_SIGNAL:
             default:
@@ -207,8 +206,8 @@ public class WALBuffer extends AbstractWALBuffer {
       }
 
       // call fsync at last and set fsyncListeners
-      if (batchSize > 0 || rollWAlFileWriter) {
-        fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriter);
+      if (batchSize > 0 || rollWAlFileWriterListener != null) {
+        fsyncWorkingBuffer(fsyncListeners, rollWAlFileWriterListener);
       }
     }
   }
@@ -296,9 +295,9 @@ public class WALBuffer extends AbstractWALBuffer {
 
   /** Notice: this method only called at the last of SerializeTask. */
   private void fsyncWorkingBuffer(
-      List<WALFlushListener> fsyncListeners, boolean rollWAlFileWriter) {
+      List<WALFlushListener> fsyncListeners, WALFlushListener rollWAlFileWriterListener) {
     switchWorkingBufferToFlushing();
-    syncBufferThread.submit(new SyncBufferTask(true, rollWAlFileWriter, fsyncListeners));
+    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWAlFileWriterListener));
   }
 
   // only called by serializeThread
@@ -326,19 +325,21 @@ public class WALBuffer extends AbstractWALBuffer {
    * This task syncs syncingBuffer to disk. The precondition is that syncingBuffer cannot be null.
    */
   private class SyncBufferTask implements Runnable {
-    private final boolean force;
-    private final boolean rollWAlFileWriter;
+    private final boolean forceFlag;
     private final List<WALFlushListener> fsyncListeners;
+    private final WALFlushListener rollWAlFileWriterListener;
 
-    public SyncBufferTask(boolean force) {
-      this(force, false, Collections.emptyList());
+    public SyncBufferTask(boolean forceFlag) {
+      this(forceFlag, null, null);
     }
 
     public SyncBufferTask(
-        boolean force, boolean rollWAlFileWriter, List<WALFlushListener> fsyncListeners) {
-      this.force = force;
-      this.rollWAlFileWriter = rollWAlFileWriter;
+        boolean forceFlag,
+        List<WALFlushListener> fsyncListeners,
+        WALFlushListener rollWAlFileWriterListener) {
+      this.forceFlag = forceFlag;
       this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
+      this.rollWAlFileWriterListener = rollWAlFileWriterListener;
     }
 
     @Override
@@ -355,7 +356,7 @@ public class WALBuffer extends AbstractWALBuffer {
       }
 
       // force os cache to the storage device
-      if (force) {
+      if (forceFlag) {
         try {
           currentWALFileWriter.force();
         } catch (IOException e) {
@@ -368,6 +369,7 @@ public class WALBuffer extends AbstractWALBuffer {
           }
           config.setReadOnly(true);
         }
+        // notify all waiting listeners
         for (WALFlushListener fsyncListener : fsyncListeners) {
           fsyncListener.succeed();
         }
@@ -375,15 +377,22 @@ public class WALBuffer extends AbstractWALBuffer {
 
       // try to roll log writer
       try {
-        if (rollWAlFileWriter
-            || (force && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
+        if (rollWAlFileWriterListener != null
+            || (forceFlag
+                && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
           rollLogWriter();
+          if (rollWAlFileWriterListener != null) {
+            rollWAlFileWriterListener.succeed();
+          }
         }
       } catch (IOException e) {
         logger.error(
             "Fail to roll wal node-{}'s log writer, change system mode to read-only.",
             identifier,
             e);
+        if (rollWAlFileWriterListener != null) {
+          rollWAlFileWriterListener.fail(e);
+        }
         config.setReadOnly(true);
       }
     }