You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 02:32:15 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add filenode test
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new c8d8e24 add filenode test
new c90ad2e Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
c8d8e24 is described below
commit c8d8e24de9fa728a293f2d9e21b72b70ac8e2550
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 10:31:29 2019 +0800
add filenode test
---
.../db/engine/filenodeV2/FileNodeManagerV2.java | 6 +--
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 49 +++++++++-------------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 8 ++--
.../engine/filenodeV2/FileNodeProcessorV2Test.java | 26 +++---------
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 1 +
5 files changed, 32 insertions(+), 58 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index ea1fdd3..595481a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -152,11 +152,7 @@ public class FileNodeManagerV2 implements IStatistic, IService {
if (processor == null) {
LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
filenodeName, Thread.currentThread().getId());
- try {
- processor = new FileNodeProcessorV2(baseDir, filenodeName);
- } catch (FileNodeProcessorException e) {
- throw new FileNodeManagerException(e);
- }
+ processor = new FileNodeProcessorV2(filenodeName);
synchronized (processorMap) {
processorMap.put(filenodeName, processor);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 02cea35..665691f 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -56,8 +57,6 @@ public class FileNodeProcessorV2 {
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessorV2.class);
- private static final String RESTORE_FILE_SUFFIX = ".restore";
-
private static final MManager mManager = MManager.getInstance();
private static final Directories directories = Directories.getInstance();
@@ -96,29 +95,17 @@ public class FileNodeProcessorV2 {
private VersionController versionController;
- public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
- throws FileNodeProcessorException {
+ public FileNodeProcessorV2(String storageGroupName) {
this.storageGroupName = storageGroupName;
lock = new ReentrantReadWriteLock();
closeFileNodeCondition = lock.writeLock().newCondition();
- File storageGroupDir = new File(absoluteBaseDir, storageGroupName);
- if (!storageGroupDir.exists()) {
- storageGroupDir.mkdir();
- LOGGER.info("The directory of the storage group {} doesn't exist. Create a new " +
- "directory {}", storageGroupName, storageGroupDir.getAbsolutePath());
- }
-
recovery();
/**
* version controller
*/
- try {
- versionController = new SimpleFileVersionController(storageGroupDir.getAbsolutePath());
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
+ versionController = SysTimeVersionController.INSTANCE;
// construct the file schema
this.fileSchema = constructFileSchema(storageGroupName);
@@ -175,10 +162,10 @@ public class FileNodeProcessorV2 {
boolean result;
// write to sequence or unsequence file
if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
- result = writeUnsealedDataFile(workSequenceTsFileProcessor, tsRecord, true);
+ result = writeUnsealedDataFile(tsRecord, true);
insertResult = result ? 1 : -1;
} else {
- result = writeUnsealedDataFile(workUnSequenceTsFileProcessor, tsRecord, false);
+ result = writeUnsealedDataFile(tsRecord, false);
insertResult = result ? 2 : -1;
}
} catch (Exception e) {
@@ -191,31 +178,35 @@ public class FileNodeProcessorV2 {
return insertResult;
}
- private boolean writeUnsealedDataFile(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
- TSRecord tsRecord, boolean sequence) throws IOException {
+ private boolean writeUnsealedDataFile(TSRecord tsRecord, boolean sequence) throws IOException {
lock.writeLock().lock();
+ UnsealedTsFileProcessorV2 unsealedTsFileProcessor;
try {
boolean result;
// create a new BufferWriteProcessor
- if (unsealedTsFileProcessor == null) {
- if (sequence) {
+ if (sequence) {
+ if (workSequenceTsFileProcessor == null) {
String baseDir = directories.getNextFolderForTsfile();
- String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "")
+ String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
.toString();
- unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+ workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
- sequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
- } else {
+ sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
+ }
+ unsealedTsFileProcessor = workSequenceTsFileProcessor;
+ } else {
+ if (workUnSequenceTsFileProcessor == null) {
// TODO check if the disk is full
String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
- String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "")
+ String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
.toString();
- unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+ workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
- unSequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
+ unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
}
+ unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
}
// write BufferWrite
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index f00b935..3185a2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -71,7 +71,7 @@ public class UnsealedTsFileProcessorV2 {
private VersionController versionController;
- private Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
+ private Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback;
/**
* sync this object in query() and asyncFlush()
@@ -80,14 +80,14 @@ public class UnsealedTsFileProcessorV2 {
public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
VersionController versionController,
- Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
+ Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback)
throws IOException {
this.storageGroupName = storageGroupName;
this.fileSchema = fileSchema;
this.tsFileResource = new TsFileResourceV2(tsfile);
this.versionController = versionController;
this.writer = new NativeRestorableIOWriter(tsfile);
- this.closeUnsealedTsFileProcessor = closeUnsealedTsFileProcessor;
+ this.closeUnsealedFileCallback = closeUnsealedFileCallback;
}
/**
@@ -254,7 +254,7 @@ public class UnsealedTsFileProcessorV2 {
writer = null;
// remove this processor from Closing list in FileNodeProcessor
- closeUnsealedTsFileProcessor.accept(this);
+ closeUnsealedFileCallback.accept(this);
// delete the restore for this bufferwrite processor
if (LOGGER.isInfoEnabled()) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 385b86e..5503547 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -18,16 +18,8 @@
*/
package org.apache.iotdb.db.engine.filenodeV2;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -43,15 +35,13 @@ public class FileNodeProcessorV2Test {
private String baseDir = "data";
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
- private TSDataType dataType = TSDataType.INT32;
- private Map<String, String> props = Collections.emptyMap();
private FileNodeProcessorV2 processor;
@Before
public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
- processor = new FileNodeProcessorV2(baseDir, storageGroup);
+ processor = new FileNodeProcessorV2(storageGroup);
}
@After
@@ -62,23 +52,19 @@ public class FileNodeProcessorV2Test {
@Test
- public void testAsyncClose()
- throws FileNodeProcessorException, BufferWriteProcessorException, ExecutionException, InterruptedException {
-
-
+ public void testAsyncClose() {
for (int j = 1; j <= 10; j++) {
TSRecord record = new TSRecord(j, deviceId);
- for (int i = 0; i < 10; i++) {
- record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(i)));
- }
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(record);
processor.asyncForceClose();
}
QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
-
Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 10);
-
+ for (TsFileResourceV2 resource: queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+ Assert.assertEquals(resource.isClosed(), true);
+ }
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 3b36cc4..8900479 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -151,6 +151,7 @@ public class UnsealedTsFileProcessorV2Test {
for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
String deviceId = startTime.getKey();
resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
+ resource.setClosed(true);
}
}
});