You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/25 09:16:20 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix async close bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 01f36de fix async close bug
new 7a00539 Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
01f36de is described below
commit 01f36deab65a506cc1d8d0ab66dbf4a8e2b332f6
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 25 17:15:44 2019 +0800
fix async close bug
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 62 +++++++++++-----------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 8 ++-
2 files changed, 38 insertions(+), 32 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index b167766..2b3b958 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -286,7 +286,11 @@ public class FileNodeProcessorV2 {
// check memtable size and may asyncFlush the workMemtable
if (unsealedTsFileProcessor.shouldFlush()) {
- flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
+ if (unsealedTsFileProcessor.shouldClose()) {
+ asyncCloseTsFileProcessor(unsealedTsFileProcessor, sequence);
+ } else {
+ unsealedTsFileProcessor.asyncFlush();
+ }
}
return result;
@@ -316,6 +320,33 @@ public class FileNodeProcessorV2 {
}
}
+ /**
+ * only called by insert(), thread-safety should be ensured by caller
+ */
+ private void asyncCloseTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
+ boolean sequence) {
+
+ LOGGER.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+ unsealedTsFileProcessor.getWorkMemTableMemory(),
+ unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+ // check file size and may setCloseMark the BufferWrite
+ if (sequence) {
+ closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
+ workSequenceTsFileProcessor = null;
+ } else {
+ closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
+ workUnSequenceTsFileProcessor = null;
+ }
+
+ // async close tsfile
+ unsealedTsFileProcessor.asyncClose();
+
+ LOGGER.info("The file size {} reaches the threshold, async close tsfile: {}.",
+ unsealedTsFileProcessor.getTsFileResource().getFileSize(),
+ unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+ }
+
// TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSourceV2 query(String deviceId, String measurementId)
@@ -450,35 +481,6 @@ public class FileNodeProcessorV2 {
}
}
- /**
- * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
- * setCloseMark task will be executed by a flush thread.
- *
- * only called by insert(), thread-safety should be ensured by caller
- */
- private void flushAndCheckShouldClose(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
- boolean sequence) {
-
- LOGGER.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
- unsealedTsFileProcessor.getWorkMemTableMemory(),
- unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
- // check file size and may setCloseMark the BufferWrite
- if (unsealedTsFileProcessor.shouldClose()) {
- if (sequence) {
- closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
- workSequenceTsFileProcessor = null;
- } else {
- closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
- workUnSequenceTsFileProcessor = null;
- }
- LOGGER.info("The file size {} reaches the threshold, async close tsfile: {}.",
- unsealedTsFileProcessor.getTsFileResource().getFileSize(),
- unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
- }
-
- unsealedTsFileProcessor.asyncClose();
- }
public void asyncForceClose() {
lock.writeLock().lock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index b3074a9..d6c860a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -197,6 +197,10 @@ public class UnsealedTsFileProcessorV2 {
LOGGER.info("File {} is closed synchronously", tsFileResource.getFile().getAbsolutePath());
}
+ /**
+ * Ensure there must be a flush thread submitted after setCloseMark() is called,
+ * therefore the setCloseMark task will be executed by a flush thread.
+ */
public void asyncClose() {
flushQueryLock.writeLock().lock();
LOGGER.info("Async close the file: {}", tsFileResource.getFile().getAbsolutePath());
@@ -206,11 +210,11 @@ public class UnsealedTsFileProcessorV2 {
LOGGER.debug("add an empty memtable into flushing memtable list when sync close");
}
flushingMemTables.add(tmpMemTable);
+ shouldClose = true;
+ workMemTable = null;
tmpMemTable.setVersion(versionController.nextVersion());
FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
flushUpdateLatestFlushTimeCallback.get();
- shouldClose = true;
- workMemTable = null;
} finally {
flushQueryLock.writeLock().unlock();
}