You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 06:58:46 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix make metadata visible bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 19bb5ad fix make metadata visible bug
19bb5ad is described below
commit 19bb5ad1cd1ccfc82f457fa42db4dbc258c989e2
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 14:58:35 2019 +0800
fix make metadata visible bug
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 84 ++++++++++-----------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 88 +++++++++++-----------
.../db/engine/memtable/MemTableFlushTaskV2.java | 1 -
3 files changed, 86 insertions(+), 87 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index e2485d2..f1b05fe 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -287,7 +287,7 @@ public class FileNodeProcessorV2 {
/**
* ensure there must be a flush thread submitted after setCloseMark() is called, therefore the setCloseMark task
- * will be executed by a flush thread. -- said by qiaojialin
+ * will be executed by a flush thread.
*
* only called by insert(), thread-safety should be ensured by caller
*/
@@ -308,47 +308,6 @@ public class FileNodeProcessorV2 {
unsealedTsFileProcessor.asyncFlush();
}
-
- public boolean updateLatestFlushTimeCallback() {
- lock.writeLock().lock();
- try {
- // update the largest timestamp in the last flushing memtable
- for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
- latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
- }
- } finally {
- lock.writeLock().unlock();
- }
- return true;
- }
-
- /**
- * put the memtable back to the MemTablePool and make the metadata in writer visible
- */
- // TODO please consider concurrency with query and insert method.
- public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
- lock.writeLock().lock();
- try {
- if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
- closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
- } else {
- closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
- }
- // end time with one start time
- TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
- synchronized (resource) {
- for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
- String deviceId = startTime.getKey();
- resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
- }
- resource.setClosed(true);
- }
- closeFileNodeCondition.signal();
- }finally {
- lock.writeLock().unlock();
- }
- }
-
public void asyncForceClose() {
lock.writeLock().lock();
try {
@@ -415,6 +374,47 @@ public class FileNodeProcessorV2 {
}
+ public boolean updateLatestFlushTimeCallback() {
+ lock.writeLock().lock();
+ try {
+ // update the largest timestamp in the last flushing memtable
+ for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
+ latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return true;
+ }
+
+ /**
+ * put the memtable back to the MemTablePool and make the metadata in writer visible
+ */
+ // TODO please consider concurrency with query and insert method.
+ public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+ lock.writeLock().lock();
+ try {
+ if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
+ closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+ } else {
+ closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+ }
+ // end time with one start time
+ TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
+ synchronized (resource) {
+ for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+ String deviceId = startTime.getKey();
+ resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
+ }
+ resource.setClosed(true);
+ }
+ closeFileNodeCondition.signal();
+ }finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+
public UnsealedTsFileProcessorV2 getWorkSequenceTsFileProcessor() {
return workSequenceTsFileProcessor;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 12c42ef..e365ca6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -130,21 +130,6 @@ public class UnsealedTsFileProcessorV2 {
return tsFileResource;
}
- /**
- * return the memtable to MemTablePool and make metadata in writer visible
- */
- private void releaseFlushedMemTable(IMemTable memTable) {
- flushQueryLock.writeLock().lock();
- try {
- writer.makeMetadataVisible();
- flushingMemTables.remove(memTable);
- MemTablePool.getInstance().putBack(memTable, storageGroupName);
- } finally {
- flushQueryLock.writeLock().unlock();
- }
- }
-
-
public boolean shouldFlush() {
return workMemTable.memSize() > TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
}
@@ -190,38 +175,21 @@ public class UnsealedTsFileProcessorV2 {
}
}
- public boolean shouldClose() {
- long fileSize = tsFileResource.getFileSize();
- long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
- .getBufferwriteFileSizeThreshold();
- return fileSize > fileSizeThreshold;
- }
-
- public void setCloseMark() {
- shouldClose = true;
- }
-
- public synchronized void asyncClose() {
- flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
- FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
- flushUpdateLatestFlushTimeCallback.get();
- shouldClose = true;
- workMemTable = null;
- }
-
- public void syncClose() {
- asyncClose();
- synchronized (flushingMemTables) {
- try {
- flushingMemTables.wait();
- } catch (InterruptedException e) {
- LOGGER.error("wait close interrupted", e);
- }
+ /**
+ * return the memtable to MemTablePool and make metadata in writer visible
+ */
+ private void releaseFlushedMemTableCallback(IMemTable memTable) {
+ flushQueryLock.writeLock().lock();
+ try {
+ writer.makeMetadataVisible();
+ flushingMemTables.remove(memTable);
+ MemTablePool.getInstance().putBack(memTable, storageGroupName);
+ } finally {
+ flushQueryLock.writeLock().unlock();
}
}
-
/**
* Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
* the flush manager pool
@@ -233,7 +201,7 @@ public class UnsealedTsFileProcessorV2 {
// null memtable only appears when calling asyncClose()
if (memTableToFlush.isManagedByMemPool()) {
MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
- this::releaseFlushedMemTable);
+ this::releaseFlushedMemTableCallback);
flushTask.flushMemTable(fileSchema, memTableToFlush, versionController.nextVersion());
}
// for sync flush
@@ -278,6 +246,37 @@ public class UnsealedTsFileProcessorV2 {
}
}
+
+ public boolean shouldClose() {
+ long fileSize = tsFileResource.getFileSize();
+ long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
+ .getBufferwriteFileSizeThreshold();
+ return fileSize > fileSizeThreshold;
+ }
+
+ public void setCloseMark() {
+ shouldClose = true;
+ }
+
+ public synchronized void asyncClose() {
+ flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
+ FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
+ flushUpdateLatestFlushTimeCallback.get();
+ shouldClose = true;
+ workMemTable = null;
+ }
+
+ public void syncClose() {
+ asyncClose();
+ synchronized (flushingMemTables) {
+ try {
+ flushingMemTables.wait();
+ } catch (InterruptedException e) {
+ LOGGER.error("wait close interrupted", e);
+ }
+ }
+ }
+
public boolean isManagedByFlushManager() {
return managedByFlushManager;
}
@@ -323,4 +322,5 @@ public class UnsealedTsFileProcessorV2 {
flushQueryLock.readLock().unlock();
}
}
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 70bbe4d..02a6c82 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -131,7 +131,6 @@ public class MemTableFlushTaskV2 {
}
}
- tsFileIoWriter.makeMetadataVisible();
flushCallBack.accept(memTable);
});