You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 04:14:58 UTC
[incubator-iotdb] 01/04: modify filenode processor test
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 71386ef5fefbe895cc02444ea33753fe07c82703
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 11:45:07 2019 +0800
modify filenode processor test
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 60 +++++++++++-----------
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 2 +-
2 files changed, 30 insertions(+), 32 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index d7f6f6b..365860e 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -90,7 +90,7 @@ public class FileNodeProcessorV2 {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private Object closeFileNodeCondition = new Object();
+ private final Object closeFileNodeCondition = new Object();
/**
* Mark whether to close file node
@@ -233,6 +233,7 @@ public class FileNodeProcessorV2 {
* @return -1: failed, 1: Overflow, 2:Bufferwrite
*/
public boolean insert(InsertPlan insertPlan) {
+ lock.writeLock().lock();
try {
if (toBeClosed) {
throw new FileNodeProcessorException(
@@ -251,47 +252,44 @@ public class FileNodeProcessorV2 {
} catch (FileNodeProcessorException | IOException e) {
LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
return false;
+ } finally {
+ lock.writeLock().unlock();
}
}
private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence)
throws IOException {
- lock.writeLock().lock();
UnsealedTsFileProcessorV2 unsealedTsFileProcessor;
- try {
- boolean result;
- // create a new BufferWriteProcessor
- if (sequence) {
- if (workSequenceTsFileProcessor == null) {
- workSequenceTsFileProcessor = createTsFileProcessor(true);
- sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
- }
- unsealedTsFileProcessor = workSequenceTsFileProcessor;
- } else {
- if (workUnSequenceTsFileProcessor == null) {
- workUnSequenceTsFileProcessor = createTsFileProcessor(false);
- unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
- }
- unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
+ boolean result;
+ // create a new BufferWriteProcessor
+ if (sequence) {
+ if (workSequenceTsFileProcessor == null) {
+ workSequenceTsFileProcessor = createTsFileProcessor(true);
+ sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
}
-
- // insert BufferWrite
- result = unsealedTsFileProcessor.insert(insertPlan);
-
- // try to update the latest time of the device of this tsRecord
- if (result && latestTimeForEachDevice.get(insertPlan.getDeviceId()) < insertPlan.getTime()) {
- latestTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
+ unsealedTsFileProcessor = workSequenceTsFileProcessor;
+ } else {
+ if (workUnSequenceTsFileProcessor == null) {
+ workUnSequenceTsFileProcessor = createTsFileProcessor(false);
+ unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
}
+ unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
+ }
- // check memtable size and may asyncFlush the workMemtable
- if (unsealedTsFileProcessor.shouldFlush()) {
- flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
- }
+ // insert BufferWrite
+ result = unsealedTsFileProcessor.insert(insertPlan);
- return result;
- } finally {
- lock.writeLock().unlock();
+ // try to update the latest time of the device of this tsRecord
+ if (result && latestTimeForEachDevice.get(insertPlan.getDeviceId()) < insertPlan.getTime()) {
+ latestTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
}
+
+ // check memtable size and may asyncFlush the workMemtable
+ if (unsealedTsFileProcessor.shouldFlush()) {
+ flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
+ }
+
+ return result;
}
private UnsealedTsFileProcessorV2 createTsFileProcessor(boolean sequence) throws IOException {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 7d08217..ce00700 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -50,7 +50,7 @@ public class UnsealedTsFileProcessorV2Test {
private UnsealedTsFileProcessorV2 processor;
private String storageGroup = "storage_group1";
- private String filePath = "testUnsealedTsFileProcessor.tsfile";
+ private String filePath = "data/testUnsealedTsFileProcessor.tsfile";
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
private TSDataType dataType = TSDataType.INT32;