You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/23 07:31:34 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix
end time map update big
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 395145c fix end time map update big
395145c is described below
commit 395145c347b232bc827a947eedef255bd6284357
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Jun 23 15:31:31 2019 +0800
fix end time map update big
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 26 +++++++++++++--------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 27 +++++++++++-----------
2 files changed, 30 insertions(+), 23 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 6f1ec2f..74f7fef 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -445,6 +445,7 @@ public class FileNodeProcessorV2 {
}
unsealedTsFileProcessor.asyncFlush();
+
}
public void asyncForceClose() {
@@ -453,11 +454,13 @@ public class FileNodeProcessorV2 {
if (workSequenceTsFileProcessor != null) {
closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
workSequenceTsFileProcessor.asyncClose();
+ updateEndTimeMap(workSequenceTsFileProcessor);
workSequenceTsFileProcessor = null;
}
if (workUnSequenceTsFileProcessor != null) {
closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor.asyncClose();
+ updateEndTimeMap(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor = null;
}
} finally {
@@ -466,6 +469,18 @@ public class FileNodeProcessorV2 {
}
/**
+ * when close an UnsealedTsFileProcessor, update its EndTimeMap immediately
+ * @param tsFileProcessor processor to be closed
+ */
+ private void updateEndTimeMap(UnsealedTsFileProcessorV2 tsFileProcessor) {
+ TsFileResourceV2 resource = tsFileProcessor.getTsFileResource();
+ for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+ String deviceId = startTime.getKey();
+ resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
+ }
+ }
+
+ /**
* This method will be blocked until all tsfile processors are closed.
*/
public void syncCloseFileNode(){
@@ -533,19 +548,12 @@ public class FileNodeProcessorV2 {
try {
if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
- LOGGER.info("removed a sequence tsfile processor, closing list size: {}", closingSequenceTsFileProcessor.size());
} else {
-// closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+ closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
}
// end time with one start time
TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
- synchronized (resource) {
- for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
- String deviceId = startTime.getKey();
- resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
- }
- resource.setClosed(true);
- }
+ resource.setClosed(true);
LOGGER.info("signal closing file node condition");
closeFileNodeCondition.signal();
}finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 5b27226..b412248 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -182,6 +182,16 @@ public class UnsealedTsFileProcessorV2 {
shouldClose = true;
}
+ public void syncClose() {
+ asyncClose();
+ synchronized (flushingMemTables) {
+ try {
+ flushingMemTables.wait();
+ } catch (InterruptedException e) {
+ LOGGER.error("wait close interrupted", e);
+ }
+ }
+ }
public void asyncClose() {
flushQueryLock.writeLock().lock();
@@ -200,7 +210,9 @@ public class UnsealedTsFileProcessorV2 {
}
}
- // only for test
+ /**
+ * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup
+ */
public void syncFlush() {
IMemTable tmpMemTable;
flushQueryLock.writeLock().lock();
@@ -226,19 +238,6 @@ public class UnsealedTsFileProcessorV2 {
}
}
-
-
- public void syncClose() {
- asyncClose();
- synchronized (flushingMemTables) {
- try {
- flushingMemTables.wait();
- } catch (InterruptedException e) {
- LOGGER.error("wait close interrupted", e);
- }
- }
- }
-
/**
* put the working memtable into flushing list and set the working memtable to null
*/