You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/21 04:04:06 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add test for filenode processor
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 8e35e67 add test for filenode processor
new 6bd40b1 Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
8e35e67 is described below
commit 8e35e67d6d64aebdb710ee79dcd62b302c52c290
Author: lta <li...@163.com>
AuthorDate: Fri Jun 21 12:03:18 2019 +0800
add test for filenode processor
---
.../db/engine/filenode/CopyOnReadLinkedList.java | 5 ++-
.../db/engine/filenodeV2/FileNodeManagerV2.java | 39 ++++++++++++----------
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 34 +++++++++++--------
.../engine/filenodeV2/FileNodeProcessorV2Test.java | 12 ++++---
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 22 ++++++------
5 files changed, 65 insertions(+), 47 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
index d3b87fa..3353a6c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* this class can just guarantee some behavior in a concurrent thread safety mode:
@@ -59,6 +58,10 @@ public class CopyOnReadLinkedList<T> {
return readCopy;
}
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
public int size() {
return data.size();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 595481a..e0e6830 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -135,31 +135,34 @@ public class FileNodeManagerV2 implements IStatistic, IService {
private FileNodeProcessorV2 getProcessor(String devicePath)
throws FileNodeManagerException {
- String filenodeName;
+ String filenodeName = "";
try {
// return the storage group name
filenodeName = MManager.getInstance().getFileNameByPath(devicePath);
- } catch (PathErrorException e) {
- LOGGER.error("MManager get storage group name error, seriesPath is {}", devicePath);
- throw new FileNodeManagerException(e);
- }
- FileNodeProcessorV2 processor;
- processor = processorMap.get(filenodeName);
- if (processor == null) {
- filenodeName = filenodeName.intern();
- synchronized (filenodeName) {
- processor = processorMap.get(filenodeName);
- if (processor == null) {
- LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
- filenodeName, Thread.currentThread().getId());
- processor = new FileNodeProcessorV2(filenodeName);
- synchronized (processorMap) {
- processorMap.put(filenodeName, processor);
+ FileNodeProcessorV2 processor;
+ processor = processorMap.get(filenodeName);
+ if (processor == null) {
+ filenodeName = filenodeName.intern();
+ synchronized (filenodeName) {
+ processor = processorMap.get(filenodeName);
+ if (processor == null) {
+ LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
+ filenodeName, Thread.currentThread().getId());
+ processor = new FileNodeProcessorV2(baseDir, filenodeName);
+ synchronized (processorMap) {
+ processorMap.put(filenodeName, processor);
+ }
}
}
}
+ return processor;
+ } catch (PathErrorException e) {
+ LOGGER.error("MManager get storage group name error, seriesPath is {}", devicePath);
+ throw new FileNodeManagerException(e);
+ } catch (FileNodeProcessorException e) {
+ LOGGER.error("Fail to init simple file version controller of file node {}", filenodeName, e);
+ throw new FileNodeManagerException(e);
}
- return processor;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 665691f..b86d9fb 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -37,10 +37,10 @@ import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -95,7 +95,7 @@ public class FileNodeProcessorV2 {
private VersionController versionController;
- public FileNodeProcessorV2(String storageGroupName) {
+ public FileNodeProcessorV2(String baseDir, String storageGroupName) throws FileNodeProcessorException {
this.storageGroupName = storageGroupName;
lock = new ReentrantReadWriteLock();
closeFileNodeCondition = lock.writeLock().newCondition();
@@ -105,7 +105,14 @@ public class FileNodeProcessorV2 {
/**
* version controller
*/
- versionController = SysTimeVersionController.INSTANCE;
+ try {
+ File storageGroupInfoDir = new File(baseDir, storageGroupName);
+ storageGroupInfoDir.mkdirs();
+ versionController = new SimpleFileVersionController(
+ storageGroupInfoDir.getPath());
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
// construct the file schema
this.fileSchema = constructFileSchema(storageGroupName);
@@ -187,8 +194,9 @@ public class FileNodeProcessorV2 {
if (sequence) {
if (workSequenceTsFileProcessor == null) {
String baseDir = directories.getNextFolderForTsfile();
- String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
+ String filePath = Paths.get(baseDir, System.currentTimeMillis() + "-" + versionController.nextVersion())
.toString();
+ System.out.println(filePath);
workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
@@ -199,7 +207,7 @@ public class FileNodeProcessorV2 {
if (workUnSequenceTsFileProcessor == null) {
// TODO check if the disk is full
String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
- String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
+ String filePath = Paths.get(baseDir, System.currentTimeMillis() + "" + + versionController.nextVersion())
.toString();
workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
new File(filePath),
@@ -263,7 +271,7 @@ public class FileNodeProcessorV2 {
for (TsFileResourceV2 tsFileResource : tsFileResources) {
synchronized (tsFileResource) {
if (!tsFileResource.getStartTimeMap().isEmpty()) {
- if (!tsFileResource.isClosed()) {
+ if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = tsFileResource
@@ -312,12 +320,12 @@ public class FileNodeProcessorV2 {
* put the memtable back to the MemTablePool and make the metadata in writer visible
*/
// TODO please consider concurrency with query and write method.
- private void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
- lock.writeLock().unlock();
+ public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+ lock.writeLock().lock();
try {
- closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
+ closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
// end time with one start time
- TsFileResourceV2 resource = workSequenceTsFileProcessor.getTsFileResource();
+ TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
synchronized (resource) {
for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
String deviceId = startTime.getKey();
@@ -352,15 +360,15 @@ public class FileNodeProcessorV2 {
/**
* Block this method until this file node can be closed.
*/
- public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManager){
+ public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
lock.writeLock().lock();
try {
asyncForceClose();
toBeClosed = true;
while (true) {
- if (unSequenceFileList.isEmpty() && sequenceFileList.isEmpty()
+ if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()
&& workSequenceTsFileProcessor == null && workUnSequenceTsFileProcessor == null) {
- removeProcessorFromManager.get();
+ removeProcessorFromManagerCallback.get();
break;
}
closeFileNodeCondition.await();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 5503547..103e803 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -33,6 +33,7 @@ public class FileNodeProcessorV2Test {
private String storageGroup = "storage_group1";
private String baseDir = "data";
+ private String systemDir = "data/info";
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
private FileNodeProcessorV2 processor;
@@ -41,7 +42,7 @@ public class FileNodeProcessorV2Test {
public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
- processor = new FileNodeProcessorV2(storageGroup);
+ processor = new FileNodeProcessorV2(systemDir, storageGroup);
}
@After
@@ -53,17 +54,18 @@ public class FileNodeProcessorV2Test {
@Test
public void testAsyncClose() {
- for (int j = 1; j <= 10; j++) {
+ for (int j = 1; j <= 100; j++) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(record);
processor.asyncForceClose();
}
+ processor.syncCloseFileNode(() -> null);
QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
- Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 10);
- for (TsFileResourceV2 resource: queryDataSource.getSeqDataSource().getQueryTsFiles()) {
- Assert.assertEquals(resource.isClosed(), true);
+ Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 100);
+ for (TsFileResourceV2 resource : queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+ Assert.assertTrue(resource.isClosed());
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 8900479..a51df9e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.function.Consumer;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
@@ -145,16 +146,17 @@ public class UnsealedTsFileProcessorV2Test {
@Test
public void testWriteAndClose() throws WriteProcessException, IOException {
processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
- FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{
- TsFileResourceV2 resource = processor.getTsFileResource();
- synchronized (resource) {
- for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
- String deviceId = startTime.getKey();
- resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
- resource.setClosed(true);
- }
- }
- });
+ FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE,
+ unsealedTsFileProcessorV2 -> {
+ TsFileResourceV2 resource = unsealedTsFileProcessorV2.getTsFileResource();
+ synchronized (resource) {
+ for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+ String deviceId = startTime.getKey();
+ resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
+ }
+ resource.setClosed(true);
+ }
+ });
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
.query(deviceId, measurementId, dataType, props);