You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/20 09:16:44 UTC
[iotdb] branch master updated: [IOTDB-4694] Make sure memTable snapshot is made before flush operation (#7662)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8303f187b6 [IOTDB-4694] Make sure memTable snapshot is made before flush operation (#7662)
8303f187b6 is described below
commit 8303f187b691602af4b6cd7e844e24406e8e1522
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Thu Oct 20 17:16:38 2022 +0800
[IOTDB-4694] Make sure memTable snapshot is made before flush operation (#7662)
---
.../db/engine/storagegroup/TsFileProcessor.java | 9 +--
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 70 ++++++++++++----------
2 files changed, 44 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 6365f7e414..3c27521cb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1288,10 +1288,6 @@ public class TsFileProcessor {
}
}
- for (FlushListener flushListener : flushListeners) {
- flushListener.onMemTableFlushed(memTableToFlush);
- }
-
try {
flushQueryLock.writeLock().lock();
Iterator<Pair<Modification, IMemTable>> iterator = modsToMemtable.iterator();
@@ -1329,6 +1325,11 @@ public class TsFileProcessor {
// for sync flush
syncReleaseFlushedMemTable(memTableToFlush);
+ // call flushed listener after memtable is released safely
+ for (FlushListener flushListener : flushListeners) {
+ flushListener.onMemTableFlushed(memTableToFlush);
+ }
+
// retry to avoid unnecessary read-only mode
int retryCnt = 0;
while (shouldClose && flushingMemTables.isEmpty() && writer != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 0f4ced8df6..0f26210202 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -422,45 +422,53 @@ public class WALNode implements IWALNode {
}
}
+ /**
+ * synchronize memTable to make sure snapshot is made before memTable flush operation, {@link
+ * org.apache.iotdb.db.engine.storagegroup.TsFileProcessor#flushOneMemTable}
+ */
private void snapshotMemTable(DataRegion dataRegion, File tsFile, MemTableInfo memTableInfo) {
IMemTable memTable = memTableInfo.getMemTable();
- if (memTable.getFlushStatus() != FlushStatus.WORKING) {
- return;
- }
-
- // update snapshot count
- memTableSnapshotCount.compute(memTable.getMemTableId(), (k, v) -> v == null ? 1 : v + 1);
- // roll wal log writer to make sure first version id will be updated
- WALEntry rollWALFileSignal =
- new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
- WALFlushListener fileRolledListener = log(rollWALFileSignal);
- if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
- logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
- return;
- }
-
- // update first version id first to make sure snapshot is in the files ≥ current log
- // version
- memTableInfo.setFirstFileVersionId(buffer.getCurrentWALFileVersion());
// get dataRegion write lock to make sure no more writes to the memTable
dataRegion.writeLock(
"CheckpointManager$DeleteOutdatedFileTask.snapshotOrFlushOldestMemTable");
try {
- // log snapshot in a new .wal file
- WALEntry walEntry = new WALInfoEntry(memTable.getMemTableId(), memTable, true);
- WALFlushListener flushListener = log(walEntry);
-
- // wait until getting the result
- // it's low-risk to block writes awhile because this memTable accumulates slowly
- if (flushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
- logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
+ // make sure snapshot is made before memTable flush operation
+ synchronized (memTable) {
+ if (memTable.getFlushStatus() != FlushStatus.WORKING) {
+ return;
+ }
+
+ // update snapshot count
+ memTableSnapshotCount.compute(memTable.getMemTableId(), (k, v) -> v == null ? 1 : v + 1);
+ // roll wal log writer to make sure first version id will be updated
+ WALEntry rollWALFileSignal =
+ new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+ WALFlushListener fileRolledListener = log(rollWALFileSignal);
+ if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
+ return;
+ }
+
+ // update first version id first to make sure snapshot is in the files ≥ current log
+ // version
+ memTableInfo.setFirstFileVersionId(buffer.getCurrentWALFileVersion());
+
+ // log snapshot in a new .wal file
+ WALEntry walEntry = new WALInfoEntry(memTable.getMemTableId(), memTable, true);
+ WALFlushListener flushListener = log(walEntry);
+
+ // wait until getting the result
+ // it's low-risk to block writes awhile because this memTable accumulates slowly
+ if (flushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
+ }
+ logger.info(
+ "WAL node-{} snapshots memTable-{} to wal files, memTable size is {}.",
+ identifier,
+ memTable.getMemTableId(),
+ memTable.getTVListsRamCost());
}
- logger.info(
- "WAL node-{} snapshots memTable-{} to wal files, memTable size is {}.",
- identifier,
- memTable.getMemTableId(),
- memTable.getTVListsRamCost());
} finally {
dataRegion.writeUnlock();
}