You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/12/09 04:46:32 UTC
[iotdb] branch virtual_partition_2 updated: fix test
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch virtual_partition_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/virtual_partition_2 by this push:
new 2d3c2b7 fix test
2d3c2b7 is described below
commit 2d3c2b7bb42c34e38a9bea4fb9a05279cc026af6
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 9 12:46:04 2020 +0800
fix test
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 198 +++++++++++----------
.../engine/storagegroup/StorageGroupProcessor.java | 23 +--
.../virtualSg/HashVirtualPartitioner.java | 8 +-
.../virtualSg/VirtualStorageGroup.java | 8 +-
.../db/engine/cache/ChunkMetadataCacheTest.java | 2 +-
.../engine/modification/DeletionFileNodeTest.java | 16 +-
.../storagegroup/StorageGroupProcessorTest.java | 2 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 34 +++-
.../virtualSg/HashVirtualPartitionerTest.java | 5 +
.../db/integration/IoTDBLoadExternalTsfileIT.java | 37 ++--
.../db/integration/IoTDBRemovePartitionIT.java | 2 +-
.../db/sync/receiver/load/FileLoaderTest.java | 33 ++--
13 files changed, 219 insertions(+), 151 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0875a43..366d8dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -790,7 +790,7 @@ public class IoTDBConfig {
/**
* the number of virtual partition
*/
- private int virtualPartitionNum = 2;
+ private int virtualPartitionNum = 8;
public IoTDBConfig() {
// empty constructor
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 37f6036..861a584 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -23,9 +23,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -85,6 +87,7 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,7 +276,7 @@ public class StorageEngine implements IService {
futures.add(recoveryThreadPool.submit(() -> {
try {
VirtualStorageGroup virtualStorageGroup = new VirtualStorageGroup();
- virtualStorageGroup.recover();
+ virtualStorageGroup.recover(storageGroup);
processorMap.put(storageGroup.getPartialPath(), virtualStorageGroup);
logger.info("Storage Group Processor {} is recovered successfully",
@@ -288,36 +291,6 @@ public class StorageEngine implements IService {
}
}
- /**
- * recover virtual storage group processor
- *
- * @param futures recover future task
- */
- private void recoverVirtualStorageGroupProcessor(List<Future<Void>> futures) {
- List<String> sgNames = new ArrayList<>();
- for (int i = 0; i < partitioner.getPartitionCount(); i++) {
- sgNames.add(String.valueOf(i));
- }
-
- for (String sgName : sgNames) {
- futures.add(recoveryThreadPool.submit(() -> {
- try {
- VirtualStorageGroup virtualStorageGroup = new VirtualStorageGroup();
- virtualStorageGroup.recover();
- processorMap.put(new PartialPath(sgName), virtualStorageGroup);
-
- logger.info("Storage Group Processor {} is recovered successfully",
- sgName);
- } catch (Exception e) {
- logger
- .error("meet error when recovering storage group: {}", sgName,
- e);
- }
- return null;
- }));
- }
- }
-
@Override
public void start() {
ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
@@ -460,9 +433,9 @@ public class StorageEngine implements IService {
StorageGroupProcessor processor;
logger.info("construct a processor instance, the storage group is {}, Thread is {}",
storageGroupPath, Thread.currentThread().getId());
- processor = new StorageGroupProcessor(systemDir + File.pathSeparator + storageGroupPath,
+ processor = new StorageGroupProcessor(systemDir + File.separator + storageGroupPath,
storageGroupName,
- fileFlushPolicy);
+ fileFlushPolicy, storageGroupMNode.getFullPath());
processor.setDataTTL(storageGroupMNode.getDataTTL());
processor.setCustomFlushListeners(customFlushListeners);
processor.setCustomCloseFileListeners(customCloseFileListeners);
@@ -545,6 +518,10 @@ public class StorageEngine implements IService {
public void closeStorageGroupProcessor(PartialPath storageGroupPath, boolean isSeq,
boolean isSync) {
+ if(!processorMap.containsKey(storageGroupPath)){
+ return;
+ }
+
VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupPath);
for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
if (processor == null) {
@@ -597,6 +574,10 @@ public class StorageEngine implements IService {
boolean isSeq,
boolean isSync)
throws StorageGroupNotSetException {
+ if(!processorMap.containsKey(storageGroupPath)){
+ return;
+ }
+
VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroupPath);
for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
if (processor != null) {
@@ -641,6 +622,11 @@ public class StorageEngine implements IService {
try {
List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
for (PartialPath storageGroupPath : sgPaths) {
+ // storage group has no data
+ if(!processorMap.containsKey(storageGroupPath)){
+ continue;
+ }
+
PartialPath newPath = path.alterPrefixPath(storageGroupPath);
for (StorageGroupProcessor storageGroupProcessor : processorMap.get(storageGroupPath)
.getAllPartition()) {
@@ -662,6 +648,11 @@ public class StorageEngine implements IService {
try {
List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
for (PartialPath storageGroupPath : sgPaths) {
+ // storage group has no data
+ if(!processorMap.containsKey(storageGroupPath)){
+ continue;
+ }
+
PartialPath newPath = path.alterPrefixPath(storageGroupPath);
for (StorageGroupProcessor storageGroupProcessor : processorMap.get(storageGroupPath)
.getAllPartition()) {
@@ -778,6 +769,11 @@ public class StorageEngine implements IService {
}
public void setTTL(PartialPath storageGroup, long dataTTL) throws StorageEngineException {
+ // storage group has no data
+ if(!processorMap.containsKey(storageGroup)){
+ return;
+ }
+
for(StorageGroupProcessor storageGroupProcessor : processorMap.get(storageGroup).getAllPartition()){
if(storageGroupProcessor != null){
storageGroupProcessor.setDataTTL(dataTTL);
@@ -786,6 +782,10 @@ public class StorageEngine implements IService {
}
public void deleteStorageGroup(PartialPath storageGroupPath) {
+ if(!processorMap.containsKey(storageGroupPath)){
+ return;
+ }
+
deleteAllDataFilesInOneStorageGroup(storageGroupPath);
VirtualStorageGroup virtualStorageGroup = processorMap.remove(storageGroupPath);
for (StorageGroupProcessor processor : virtualStorageGroup.getAllPartition()) {
@@ -797,7 +797,7 @@ public class StorageEngine implements IService {
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws StorageEngineException, LoadFileException, IllegalPathException {
- getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
+ getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getParentFile().getParentFile().getName()))
.loadNewTsFileForSync(newTsFileResource);
}
@@ -815,7 +815,7 @@ public class StorageEngine implements IService {
public boolean deleteTsfileForSync(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessorDirectly(new PartialPath(deletedTsfile.getParentFile().getName()))
+ return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
.deleteTsfile(deletedTsfile);
}
@@ -839,39 +839,48 @@ public class StorageEngine implements IService {
* @return sg name
*/
private String getSgByEngineFile(File file) {
- return file.getParentFile().getParentFile().getName();
- }
-
-// /**
-// * @return TsFiles (seq or unseq) grouped by their storage group and partition number.
-// */
-// public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
-// Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
-// for (Entry<PartialPath, StorageGroupProcessor> entry : processorMap.entrySet()) {
-// List<TsFileResource> allResources = entry.getValue().getSequenceFileTreeSet();
-// allResources.addAll(entry.getValue().getUnSequenceFileList());
-// for (TsFileResource sequenceFile : allResources) {
-// if (!sequenceFile.isClosed()) {
-// continue;
-// }
-// long partitionNum = sequenceFile.getTimePartition();
-// Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
-// , n -> new HashMap<>());
-// storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
-// }
-// }
-// return ret;
-// }
-//
-// public void setFileFlushPolicy(TsFileFlushPolicy fileFlushPolicy) {
-// this.fileFlushPolicy = fileFlushPolicy;
-// }
-//
-// public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath storageGroup,
-// long partitionNum) {
-// StorageGroupProcessor processor = processorMap.get(storageGroup);
-// return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
-// }
+ return file.getParentFile().getParentFile().getParentFile().getName();
+ }
+
+ /**
+ * @return TsFiles (seq or unseq) grouped by their storage group and partition number.
+ */
+ public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
+ Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
+ for (Entry<PartialPath, VirtualStorageGroup> entry : processorMap.entrySet()) {
+ for(StorageGroupProcessor storageGroupProcessor : entry.getValue().getAllPartition()){
+ if(storageGroupProcessor != null){
+ List<TsFileResource> allResources = storageGroupProcessor.getSequenceFileTreeSet();
+ allResources.addAll(storageGroupProcessor.getUnSequenceFileList());
+ for (TsFileResource sequenceFile : allResources) {
+ if (!sequenceFile.isClosed()) {
+ continue;
+ }
+ long partitionNum = sequenceFile.getTimePartition();
+ Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
+ , n -> new HashMap<>());
+ storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ public void setFileFlushPolicy(TsFileFlushPolicy fileFlushPolicy) {
+ this.fileFlushPolicy = fileFlushPolicy;
+ }
+
+ public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath storageGroup,
+ long partitionNum) {
+ VirtualStorageGroup virtualStorageGroup = processorMap.get(storageGroup);
+ for(StorageGroupProcessor storageGroupProcessor : virtualStorageGroup.getAllPartition()){
+ if(storageGroupProcessor != null && storageGroupProcessor.isFileAlreadyExist(tsFileResource, partitionNum)){
+ return true;
+ }
+ }
+ return false;
+ }
/**
* Set the version of given partition to newMaxVersion if it is larger than the current version.
@@ -903,31 +912,34 @@ public class StorageEngine implements IService {
return processorMap;
}
-// /**
-// * Get a map indicating which storage groups have working TsFileProcessors and its associated
-// * partitionId and whether it is sequence or not.
-// *
-// * @return storage group -> a list of partitionId-isSequence pairs
-// */
-// public Map<String, List<Pair<Long, Boolean>>> getWorkingStorageGroupPartitions() {
-// Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
-// for (Entry<PartialPath, StorageGroupProcessor> entry : processorMap.entrySet()) {
-// List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
-// StorageGroupProcessor processor = entry.getValue();
-// for (TsFileProcessor tsFileProcessor : processor.getWorkSequenceTsFileProcessors()) {
-// Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), true);
-// partitionIdList.add(tmpPair);
-// }
-//
-// for (TsFileProcessor tsFileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
-// Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), false);
-// partitionIdList.add(tmpPair);
-// }
-//
-// res.put(entry.getKey().getFullPath(), partitionIdList);
-// }
-// return res;
-// }
+ /**
+ * Get a map indicating which storage groups have working TsFileProcessors and its associated
+ * partitionId and whether it is sequence or not.
+ *
+ * @return storage group -> a list of partitionId-isSequence pairs
+ */
+ public Map<String, List<Pair<Long, Boolean>>> getWorkingStorageGroupPartitions() {
+ Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
+ for (Entry<PartialPath, VirtualStorageGroup> entry : processorMap.entrySet()) {
+ for(StorageGroupProcessor storageGroupProcessor : entry.getValue().getAllPartition()) {
+ if (storageGroupProcessor != null) {
+ List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
+ for (TsFileProcessor tsFileProcessor : storageGroupProcessor.getWorkSequenceTsFileProcessors()) {
+ Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), true);
+ partitionIdList.add(tmpPair);
+ }
+
+ for (TsFileProcessor tsFileProcessor : storageGroupProcessor.getWorkUnsequenceTsFileProcessors()) {
+ Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), false);
+ partitionIdList.add(tmpPair);
+ }
+
+ res.put(entry.getKey().getFullPath(), partitionIdList);
+ }
+ }
+ }
+ return res;
+ }
/**
* Add a listener to listen flush start/end events. Notice that this addition only applies to
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5d58460..5817c63 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -205,6 +205,8 @@ public class StorageGroupProcessor {
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
private String storageGroupName;
+ private String logicalStorageGroupName;
+
private File storageGroupSysDir;
// manage seqFileList and unSeqFileList
@@ -263,8 +265,9 @@ public class StorageGroupProcessor {
private List<FlushListener> customFlushListeners = Collections.emptyList();
public StorageGroupProcessor(String systemDir, String storageGroupName,
- TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
+ TsFileFlushPolicy fileFlushPolicy, String logicalStorageGroupName) throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
+ this.logicalStorageGroupName = logicalStorageGroupName;
this.fileFlushPolicy = fileFlushPolicy;
storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
@@ -276,7 +279,7 @@ public class StorageGroupProcessor {
storageGroupSysDir.getPath());
}
this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
- .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
+ .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath());
recover();
@@ -331,7 +334,7 @@ public class StorageGroupProcessor {
tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(),
tsFileManagement::mergeEndAction,
taskName,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), logicalStorageGroupName);
logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
recoverMergeTask
.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
@@ -464,7 +467,7 @@ public class StorageGroupProcessor {
List<File> tsFiles = new ArrayList<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
- File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
+ File fileFolder = fsFactory.getFile(baseDir + File.separator + logicalStorageGroupName, storageGroupName);
if (!fileFolder.exists()) {
continue;
}
@@ -1042,10 +1045,10 @@ public class StorageGroupProcessor {
} else {
baseDir = DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
}
- fsFactory.getFile(baseDir, storageGroupName).mkdirs();
+ fsFactory.getFile(baseDir + File.separator + logicalStorageGroupName, storageGroupName).mkdirs();
String filePath =
- baseDir + File.separator + storageGroupName + File.separator + timePartitionId
+ baseDir + File.separator + logicalStorageGroupName + File.separator + storageGroupName + File.separator + timePartitionId
+ File.separator
+ getNewTsFileName(timePartitionId);
@@ -2207,7 +2210,7 @@ public class StorageGroupProcessor {
case LOAD_UNSEQUENCE:
targetFile = fsFactory
.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
+ logicalStorageGroupName + File.separatorChar + storageGroupName + File.separatorChar + filePartitionId + File.separator
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManagement.contains(tsFileResource, false)) {
@@ -2221,7 +2224,7 @@ public class StorageGroupProcessor {
case LOAD_SEQUENCE:
targetFile =
fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
+ logicalStorageGroupName + File.separatorChar + storageGroupName + File.separatorChar + filePartitionId + File.separator
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManagement.contains(tsFileResource, true)) {
@@ -2482,7 +2485,7 @@ public class StorageGroupProcessor {
Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
long partitionId = longTsFileProcessorEntry.getKey();
TsFileProcessor processor = longTsFileProcessorEntry.getValue();
- if (filter.satisfy(storageGroupName, partitionId)) {
+ if (filter.satisfy(logicalStorageGroupName, partitionId)) {
processor.syncClose();
iterator.remove();
logger.debug("{} is removed during deleting partitions",
@@ -2495,7 +2498,7 @@ public class StorageGroupProcessor {
private void removePartitions(TimePartitionFilter filter, Iterator<TsFileResource> iterator) {
while (iterator.hasNext()) {
TsFileResource tsFileResource = iterator.next();
- if (filter.satisfy(storageGroupName, tsFileResource.getTimePartition())) {
+ if (filter.satisfy(logicalStorageGroupName, tsFileResource.getTimePartition())) {
tsFileResource.remove();
iterator.remove();
logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
index b6c26fd..0fa6ff0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.utils.TestOnly;
public class HashVirtualPartitioner implements VirtualPartitioner {
- public static final int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
+ public static int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
.getVirtualPartitionNum();
@@ -42,11 +42,15 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
}
@Override
- @TestOnly
public void clear() {
}
+ @TestOnly
+ public void setStorageGroupNum(int i) {
+ STORAGE_GROUP_NUM = i;
+ }
+
@Override
public int getPartitionCount() {
return STORAGE_GROUP_NUM;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
index 03756cc..85a60c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroup.java
@@ -84,7 +84,11 @@ public class VirtualStorageGroup {
return processor;
}
- public void recover(){
-
+ public void recover(StorageGroupMNode storageGroupMNode) throws StorageGroupProcessorException {
+ for (int i = 0; i < partitioner.getPartitionCount(); i++) {
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(), storageGroupMNode, String.valueOf(i));
+ virtualPartition[i] = processor;
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java
index 7241405..908888d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java
@@ -71,7 +71,7 @@ public class ChunkMetadataCacheTest {
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup,
- new DirectFlushPolicy());
+ new DirectFlushPolicy(), storageGroup);
insertData();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 69ddcf5..08c2d1e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -184,8 +184,12 @@ public class DeletionFileNodeTest {
assertTrue(directory.isDirectory());
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
- modFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
+ modFiles.add(tsfile);
+ }
+ }
}
}
}
@@ -311,8 +315,12 @@ public class DeletionFileNodeTest {
assertTrue(directory.isDirectory());
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
- modFiles.add(file);
+ if(file.isDirectory()) {
+ for (File tsfile : file.listFiles()) {
+ if (tsfile.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
+ modFiles.add(tsfile);
+ }
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index f115f27..0c8fc68 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -601,7 +601,7 @@ public class StorageGroupProcessorTest {
class DummySGP extends StorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
- super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
+ super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 45e6619..3041e42 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -112,7 +112,7 @@ public class TTLTest {
IoTDB.metaManager.setStorageGroup(new PartialPath(sg1));
IoTDB.metaManager.setStorageGroup(new PartialPath(sg2));
storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig()
- .getSystemDir(), sg1, new DirectFlushPolicy());
+ .getSystemDir(), sg1, new DirectFlushPolicy(), sg1);
IoTDB.metaManager.createTimeseries(new PartialPath(g1s1), TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED, Collections.emptyMap());
}
@@ -267,8 +267,12 @@ public class TTLTest {
for (File directory : seqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- seqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ seqFiles.add(file);
+ }
+ }
}
}
}
@@ -278,8 +282,12 @@ public class TTLTest {
for (File directory : unseqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- unseqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ unseqFiles.add(file);
+ }
+ }
}
}
}
@@ -301,8 +309,12 @@ public class TTLTest {
for (File directory : seqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- seqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ seqFiles.add(file);
+ }
+ }
}
}
}
@@ -312,8 +324,12 @@ public class TTLTest {
for (File directory : unseqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- unseqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ unseqFiles.add(file);
+ }
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
index d3f1ff8..a8be512 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+import static org.junit.Assert.assertEquals;
+
import java.util.HashMap;
import java.util.Set;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -54,6 +56,9 @@ public class HashVirtualPartitionerTest {
int sg1 = hashVirtualPartitioner.deviceToStorageGroup(d1);
int sg2 = hashVirtualPartitioner.deviceToStorageGroup(d2);
+
+ assertEquals(sg1, Math.abs(d1.hashCode() % hashVirtualPartitioner.getPartitionCount()));
+ assertEquals(sg2, Math.abs(d2.hashCode() % hashVirtualPartitioner.getPartitionCount()));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 74cbd93..bc12a39 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -117,7 +118,7 @@ public class IoTDBLoadExternalTsfileIT {
private static final String TEST_D0_S0_STR = "root.test.d0.s0";
private static final String TEST_D0_S1_STR = "root.test.d0.s1";
private static final String TEST_D1_STR = "root.test.d1.g0.s0";
- private static boolean enableVirtualPartition = false;
+ private static int virtualPartitionNum = 0;
private static String[] deleteSqls = new String[]{
"DELETE STORAGE GROUP root.vehicle",
@@ -128,10 +129,11 @@ public class IoTDBLoadExternalTsfileIT {
public void setUp() throws Exception {
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setVirtualPartitionNum(1);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(1);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
- enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
- IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ virtualPartitionNum = IoTDBDescriptor.getInstance().getConfig().getVirtualPartitionNum();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData(insertSequenceSqls);
}
@@ -141,7 +143,8 @@ public class IoTDBLoadExternalTsfileIT {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
- IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
+ IoTDBDescriptor.getInstance().getConfig().setVirtualPartitionNum(virtualPartitionNum);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(virtualPartitionNum);
}
@Test
@@ -201,8 +204,8 @@ public class IoTDBLoadExternalTsfileIT {
StorageEngine.getInstance().getProcessor(new PartialPath("root.vehicle"))
.getSequenceFileTreeSet());
File tmpDir = new File(
- resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + new PartialPath("root.vehicle") + File.separator + "0");
+ resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + new PartialPath("root.vehicle") + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -214,8 +217,8 @@ public class IoTDBLoadExternalTsfileIT {
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
.getSequenceFileTreeSet());
- tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + new PartialPath("root.test") + File.separator + "0");
+ tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + new PartialPath("root.test") + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -224,7 +227,7 @@ public class IoTDBLoadExternalTsfileIT {
}
// load all tsfile in tmp dir
- tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
+ tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
"tmp");
statement.execute(String.format("load \"%s\"", tmpDir.getAbsolutePath()));
resources = new ArrayList<>(
@@ -236,8 +239,8 @@ public class IoTDBLoadExternalTsfileIT {
.getSequenceFileTreeSet());
assertEquals(2, resources.size());
assertNotNull(tmpDir.listFiles());
- assertEquals(0, new File(tmpDir, new PartialPath("root.vehicle") + File.separator + "0").listFiles().length);
- assertEquals(0, new File(tmpDir, new PartialPath("root.test") + File.separator + "0").listFiles().length);
+ assertEquals(0, new File(tmpDir, new PartialPath("root.vehicle") + File.separator + "0" + File.separator + "0").listFiles().length);
+ assertEquals(0, new File(tmpDir, new PartialPath("root.test") + File.separator + "0" + File.separator + "0").listFiles().length);
} catch (StorageEngineException | IllegalPathException e) {
Assert.fail();
}
@@ -385,8 +388,8 @@ public class IoTDBLoadExternalTsfileIT {
.getSequenceFileTreeSet());
File tmpDir = new File(
- resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.vehicle" + File.separator + "0");
+ resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + "root.vehicle" + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -398,8 +401,8 @@ public class IoTDBLoadExternalTsfileIT {
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
.getSequenceFileTreeSet());
- tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.test" + File.separator + "0");
+ tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + "root.test" + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -440,7 +443,7 @@ public class IoTDBLoadExternalTsfileIT {
Assert.assertTrue(hasError);
// test load metadata automatically, it will succeed.
- tmpDir = tmpDir.getParentFile().getParentFile();
+ tmpDir = tmpDir.getParentFile().getParentFile().getParentFile();
statement.execute(String.format("load \"%s\" true 1", tmpDir.getAbsolutePath()));
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor(new PartialPath("root.vehicle"))
@@ -452,7 +455,7 @@ public class IoTDBLoadExternalTsfileIT {
assertEquals(2, resources.size());
assertEquals(2, tmpDir.listFiles().length);
for (File dir : tmpDir.listFiles()) {
- assertEquals(0, dir.listFiles()[0].listFiles().length);
+ assertEquals(0, dir.listFiles()[0].listFiles()[0].listFiles().length);
}
} catch (StorageEngineException | IllegalPathException e) {
e.printStackTrace();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index d9176c4..602741d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -179,7 +179,7 @@ public class IoTDBRemovePartitionIT {
sqls.add(String.format("INSERT INTO root.test%d(timestamp, s0) VALUES (%d, %d)", j,
i * partitionInterval, i * partitionInterval));
}
- sqls.add("MERGE");
+ // sqls.add("MERGE");
// last file is unclosed
if (i < 9) {
sqls.add("FLUSH");
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index f62c7a3..71a2c86 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -66,6 +67,7 @@ public class FileLoaderTest {
public void setUp()
throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException, MetadataException {
IoTDBDescriptor.getInstance().getConfig().setSyncEnable(true);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(1);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
@@ -85,10 +87,13 @@ public class FileLoaderTest {
public void tearDown() throws InterruptedException, IOException, StorageEngineException {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+ HashVirtualPartitioner.getInstance()
+ .setStorageGroupNum(IoTDBDescriptor.getInstance().getConfig().getVirtualPartitionNum());
}
@Test
- public void loadNewTsfiles() throws IOException, StorageEngineException, IllegalPathException, InterruptedException {
+ public void loadNewTsfiles()
+ throws IOException, StorageEngineException, IllegalPathException, InterruptedException {
fileLoader = FileLoader.createFileLoader(getReceiverFolderFile());
Map<String, List<File>> allFileList = new HashMap<>();
Map<String, Set<String>> correctSequenceLoadedFileMap = new HashMap<>();
@@ -102,7 +107,7 @@ public class FileLoaderTest {
correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + "0" + File.separator + "0" + File.separator
+ (time + i * 100 + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
File syncFile = new File(fileName);
@@ -135,7 +140,8 @@ public class FileLoaderTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}
@@ -167,7 +173,8 @@ public class FileLoaderTest {
assertFalse(new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists());
Map<String, Set<String>> sequenceLoadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assertEquals(10, processor.getSequenceFileTreeSet().size());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
@@ -199,7 +206,8 @@ public class FileLoaderTest {
correctLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + "0"
+ + File.separator + "0" + File.separator + (time + i * 100
+ j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
@@ -209,7 +217,8 @@ public class FileLoaderTest {
syncFile.getParentFile().getName() + File.separator + syncFile.getName());
File loadDataFile = new File(
DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- syncFile.getParentFile().getName() + File.separator + fromTimeToTimePartition(i)
+ syncFile.getParentFile().getParentFile().getParentFile().getName() + File.separator
+ + "0" + File.separator + fromTimeToTimePartition(i)
+ File.separator + syncFile.getName());
correctLoadedFileMap.get(SG_NAME + i).add(loadDataFile.getAbsolutePath());
allFileList.get(SG_NAME + i).add(syncFile);
@@ -233,7 +242,8 @@ public class FileLoaderTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}
@@ -265,7 +275,8 @@ public class FileLoaderTest {
assertFalse(new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists());
Map<String, Set<String>> loadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assertEquals(25, processor.getSequenceFileTreeSet().size());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
@@ -290,7 +301,8 @@ public class FileLoaderTest {
if (!snapFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
File dataFile = new File(
DirectoryManager.getInstance().getNextFolderForSequenceFile() + File.separator
- + snapFile.getParentFile().getName(), "0" + File.separator + snapFile.getName());
+ + snapFile.getParentFile().getParentFile().getParentFile().getName(),
+ "0" + File.separator + "0" + File.separator + snapFile.getName());
correctLoadedFileMap.get(sg).remove(dataFile.getAbsolutePath());
snapFile.delete();
fileLoader.addDeletedFileName(snapFile);
@@ -319,7 +331,8 @@ public class FileLoaderTest {
loadedFileMap.clear();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
loadedFileMap.get(SG_NAME + i).add(tsFileResource.getTsFile().getAbsolutePath());