You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/20 09:16:44 UTC

[iotdb] branch master updated: [IOTDB-4694] Make sure memTable snapshot is made before flush operation (#7662)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8303f187b6 [IOTDB-4694] Make sure memTable snapshot is made before flush operation (#7662)
8303f187b6 is described below

commit 8303f187b691602af4b6cd7e844e24406e8e1522
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Thu Oct 20 17:16:38 2022 +0800

    [IOTDB-4694] Make sure memTable snapshot is made before flush operation (#7662)
---
 .../db/engine/storagegroup/TsFileProcessor.java    |  9 +--
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 70 ++++++++++++----------
 2 files changed, 44 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 6365f7e414..3c27521cb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1288,10 +1288,6 @@ public class TsFileProcessor {
       }
     }
 
-    for (FlushListener flushListener : flushListeners) {
-      flushListener.onMemTableFlushed(memTableToFlush);
-    }
-
     try {
       flushQueryLock.writeLock().lock();
       Iterator<Pair<Modification, IMemTable>> iterator = modsToMemtable.iterator();
@@ -1329,6 +1325,11 @@ public class TsFileProcessor {
     // for sync flush
     syncReleaseFlushedMemTable(memTableToFlush);
 
+    // notify the flush listeners only after the memtable has been safely released
+    for (FlushListener flushListener : flushListeners) {
+      flushListener.onMemTableFlushed(memTableToFlush);
+    }
+
     // retry to avoid unnecessary read-only mode
     int retryCnt = 0;
     while (shouldClose && flushingMemTables.isEmpty() && writer != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 0f4ced8df6..0f26210202 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -422,45 +422,53 @@ public class WALNode implements IWALNode {
       }
     }
 
+    /**
+     * Synchronizes on the memTable so that the snapshot is made before the memTable is flushed, see
+     * {@link org.apache.iotdb.db.engine.storagegroup.TsFileProcessor#flushOneMemTable}.
+     */
     private void snapshotMemTable(DataRegion dataRegion, File tsFile, MemTableInfo memTableInfo) {
       IMemTable memTable = memTableInfo.getMemTable();
-      if (memTable.getFlushStatus() != FlushStatus.WORKING) {
-        return;
-      }
-
-      // update snapshot count
-      memTableSnapshotCount.compute(memTable.getMemTableId(), (k, v) -> v == null ? 1 : v + 1);
-      // roll wal log writer to make sure first version id will be updated
-      WALEntry rollWALFileSignal =
-          new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
-      WALFlushListener fileRolledListener = log(rollWALFileSignal);
-      if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
-        logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
-        return;
-      }
-
-      // update first version id first to make sure snapshot is in the files ≥ current log
-      // version
-      memTableInfo.setFirstFileVersionId(buffer.getCurrentWALFileVersion());
 
       // get dataRegion write lock to make sure no more writes to the memTable
       dataRegion.writeLock(
           "CheckpointManager$DeleteOutdatedFileTask.snapshotOrFlushOldestMemTable");
       try {
-        // log snapshot in a new .wal file
-        WALEntry walEntry = new WALInfoEntry(memTable.getMemTableId(), memTable, true);
-        WALFlushListener flushListener = log(walEntry);
-
-        // wait until getting the result
-        // it's low-risk to block writes awhile because this memTable accumulates slowly
-        if (flushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
-          logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
+        // make sure snapshot is made before memTable flush operation
+        synchronized (memTable) {
+          if (memTable.getFlushStatus() != FlushStatus.WORKING) {
+            return;
+          }
+
+          // update snapshot count
+          memTableSnapshotCount.compute(memTable.getMemTableId(), (k, v) -> v == null ? 1 : v + 1);
+          // roll wal log writer to make sure first version id will be updated
+          WALEntry rollWALFileSignal =
+              new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+          WALFlushListener fileRolledListener = log(rollWALFileSignal);
+          if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+            logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
+            return;
+          }
+
+          // update first version id first to make sure snapshot is in the files ≥ current log
+          // version
+          memTableInfo.setFirstFileVersionId(buffer.getCurrentWALFileVersion());
+
+          // log snapshot in a new .wal file
+          WALEntry walEntry = new WALInfoEntry(memTable.getMemTableId(), memTable, true);
+          WALFlushListener flushListener = log(walEntry);
+
+          // wait until getting the result
+          // it's low-risk to block writes for a while because this memTable accumulates slowly
+          if (flushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+            logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
+          }
+          logger.info(
+              "WAL node-{} snapshots memTable-{} to wal files, memTable size is {}.",
+              identifier,
+              memTable.getMemTableId(),
+              memTable.getTVListsRamCost());
         }
-        logger.info(
-            "WAL node-{} snapshots memTable-{} to wal files, memTable size is {}.",
-            identifier,
-            memTable.getMemTableId(),
-            memTable.getTVListsRamCost());
       } finally {
         dataRegion.writeUnlock();
       }