You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/30 11:57:20 UTC
[iotdb] 01/02: initial explore
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79a985d184cc2953b6fa5729621d7bce9e9f4e0a
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Nov 30 18:46:09 2020 +0800
initial explore
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 378 ++++++++++++---------
.../iotdb/db/engine/merge/task/MergeTask.java | 42 ++-
.../storagegroup/HashVirtualPartitioner.java | 80 +++++
.../engine/storagegroup/StorageGroupProcessor.java | 13 +-
.../db/engine/storagegroup/VirtualPartitioner.java | 32 ++
.../apache/iotdb/db/engine/merge/MergeLogTest.java | 5 +
.../iotdb/db/engine/merge/MergeOverLapTest.java | 5 +
.../iotdb/db/engine/merge/MergeTaskTest.java | 6 +-
.../engine/modification/DeletionFileNodeTest.java | 4 +
.../storagegroup/StorageGroupProcessorTest.java | 5 +
.../iotdb/db/integration/IoTDBAggregationIT.java | 4 -
.../db/integration/IoTDBLoadExternalTsfileIT.java | 4 +
.../db/integration/IoTDBRemovePartitionIT.java | 5 +
.../iotdb/db/integration/IoTDBRestartIT.java | 6 +
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +
16 files changed, 424 insertions(+), 180 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a1a922b..5f94d9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -782,6 +782,11 @@ public class IoTDBConfig {
*/
private boolean debugState = false;
+ /**
+ * whether we enable virtual partition
+ */
+ private boolean enableVirtualPartition = true;
+
public IoTDBConfig() {
// empty constructor
}
@@ -2093,4 +2098,12 @@ public class IoTDBConfig {
public void setDefaultIndexWindowRange(int defaultIndexWindowRange) {
this.defaultIndexWindowRange = defaultIndexWindowRange;
}
+
+ public boolean isEnableVirtualPartition() {
+ return enableVirtualPartition;
+ }
+
+ public void setEnableVirtualPartition(boolean enableVirtualPartition) {
+ this.enableVirtualPartition = enableVirtualPartition;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index ddb0895..2a93fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -49,10 +49,12 @@ import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.HashVirtualPartitioner;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualPartitioner;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.ShutdownException;
@@ -67,7 +69,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
-import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -89,70 +90,42 @@ import org.slf4j.LoggerFactory;
public class StorageEngine implements IService {
- private final Logger logger;
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
-
+ private static final VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+ // avoid final for test
+ private static ExecutorService recoveryThreadPool;
+ /**
+ * Time range for dividing storage group, the time unit is the same with IoTDB's
+ * TimestampPrecision
+ */
+ @ServerConfigConsistent
+ private static long timePartitionInterval = -1;
+ /**
+ * whether enable data partition if disabled, all data belongs to partition 0
+ */
+ @ServerConfigConsistent
+ private static boolean enablePartition =
+ IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+ private final Logger logger;
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
* will have a subfolder under the systemDir.
*/
private final String systemDir;
-
/**
* storage group name -> storage group processor
*/
private final ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap = new ConcurrentHashMap<>();
-
- private static final ExecutorService recoveryThreadPool = IoTDBThreadPoolFactory
- .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
-
- public boolean isAllSgReady() {
- return isAllSgReady.get();
- }
-
- public void setAllSgReady(boolean allSgReady) {
- isAllSgReady.set(allSgReady);
- }
-
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
private ExecutorService recoverAllSgThreadPool;
-
- static class InstanceHolder {
-
- private InstanceHolder() {
- // forbidding instantiation
- }
-
- private static final StorageEngine INSTANCE = new StorageEngine();
- }
-
- public static StorageEngine getInstance() {
- return InstanceHolder.INSTANCE;
- }
-
private ScheduledExecutorService ttlCheckThread;
private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
-
// add customized listeners here for flush and close events
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
- /**
- * Time range for dividing storage group, the time unit is the same with IoTDB's
- * TimestampPrecision
- */
- @ServerConfigConsistent
- private static long timePartitionInterval = -1;
-
- /**
- * whether enable data partition if disabled, all data belongs to partition 0
- */
- @ServerConfigConsistent
- private static boolean enablePartition =
- IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
-
private StorageEngine() {
logger = LoggerFactory.getLogger(StorageEngine.class);
systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
@@ -176,7 +149,69 @@ public class StorageEngine implements IService {
recover();
}
+ public static StorageEngine getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ private static void initTimePartition() {
+ timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
+ getConfig().getPartitionInterval() * 1000L);
+ }
+
+ public static long convertMilliWithPrecision(long milliTime) {
+ long result = milliTime;
+ String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
+ switch (timePrecision) {
+ case "ns":
+ result = milliTime * 1000_000L;
+ break;
+ case "us":
+ result = milliTime * 1000L;
+ break;
+ default:
+ break;
+ }
+ return result;
+ }
+
+ public static long getTimePartitionInterval() {
+ if (timePartitionInterval == -1) {
+ initTimePartition();
+ }
+ return timePartitionInterval;
+ }
+
+ @TestOnly
+ public static void setTimePartitionInterval(long timePartitionInterval) {
+ StorageEngine.timePartitionInterval = timePartitionInterval;
+ }
+
+ public static long getTimePartition(long time) {
+ return enablePartition ? time / timePartitionInterval : 0;
+ }
+
+ @TestOnly
+ public static boolean isEnablePartition() {
+ return enablePartition;
+ }
+
+ @TestOnly
+ public static void setEnablePartition(boolean enablePartition) {
+ StorageEngine.enablePartition = enablePartition;
+ }
+
+ public boolean isAllSgReady() {
+ return isAllSgReady.get();
+ }
+
+ public void setAllSgReady(boolean allSgReady) {
+ isAllSgReady.set(allSgReady);
+ }
+
public void recover() {
+ setAllSgReady(false);
+ recoveryThreadPool = IoTDBThreadPoolFactory
+ .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
recoverAllSgThreadPool = IoTDBThreadPoolFactory
.newSingleThreadExecutor("Begin-Recovery-Pool");
recoverAllSgThreadPool.submit(this::recoverAllSgs);
@@ -186,25 +221,53 @@ public class StorageEngine implements IService {
/*
* recover all storage group processors.
*/
- List<StorageGroupMNode> sgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
List<Future<Void>> futures = new ArrayList<>();
- for (StorageGroupMNode storageGroup : sgNodes) {
- futures.add(recoveryThreadPool.submit(() -> {
- try {
- StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
- storageGroup.getFullPath(), fileFlushPolicy);
- processor.setDataTTL(storageGroup.getDataTTL());
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- processor.setCustomFlushListeners(customFlushListeners);
- processorMap.put(storageGroup.getPartialPath(), processor);
- logger.info("Storage Group Processor {} is recovered successfully",
- storageGroup.getFullPath());
- } catch (Exception e) {
- logger
- .error("meet error when recovering storage group: {}", storageGroup.getFullPath(), e);
- }
- return null;
- }));
+ if(!IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+ List<StorageGroupMNode> sgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
+ for (StorageGroupMNode storageGroup : sgNodes) {
+ futures.add(recoveryThreadPool.submit(() -> {
+ try {
+ StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
+ storageGroup.getFullPath(), fileFlushPolicy);
+ processor.setDataTTL(storageGroup.getDataTTL());
+ processor.setCustomCloseFileListeners(customCloseFileListeners);
+ processor.setCustomFlushListeners(customFlushListeners);
+ processorMap.put(storageGroup.getPartialPath(), processor);
+ logger.info("Storage Group Processor {} is recovered successfully",
+ storageGroup.getFullPath());
+ } catch (Exception e) {
+ logger
+ .error("meet error when recovering storage group: {}", storageGroup.getFullPath(),
+ e);
+ }
+ return null;
+ }));
+ }
+ }
+ else{
+ List<String> sgNames = new ArrayList<>();
+ for (int i = 0; i < partitioner.getPartitionCount(); i++) {
+ sgNames.add(String.valueOf(i));
+ }
+
+ for (String sgName : sgNames) {
+ futures.add(recoveryThreadPool.submit(() -> {
+ try {
+ StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
+ sgName, fileFlushPolicy);
+ processor.setCustomCloseFileListeners(customCloseFileListeners);
+ processor.setCustomFlushListeners(customFlushListeners);
+ processorMap.put(new PartialPath(sgName), processor);
+ logger.info("Storage Group Processor {} is recovered successfully",
+ sgName);
+ } catch (Exception e) {
+ logger
+ .error("meet error when recovering storage group: {}", sgName,
+ e);
+ }
+ return null;
+ }));
+ }
}
for (Future<Void> future : futures) {
try {
@@ -220,27 +283,6 @@ public class StorageEngine implements IService {
setAllSgReady(true);
}
- private static void initTimePartition() {
- timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
- getConfig().getPartitionInterval() * 1000L);
- }
-
- public static long convertMilliWithPrecision(long milliTime) {
- long result = milliTime;
- String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
- switch (timePrecision) {
- case "ns":
- result = milliTime * 1000_000L;
- break;
- case "us":
- result = milliTime * 1000L;
- break;
- default:
- break;
- }
- return result;
- }
-
@Override
public void start() {
ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
@@ -318,41 +360,74 @@ public class StorageEngine implements IService {
return ServiceType.STORAGE_ENGINE_SERVICE;
}
- public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+ /**
+ * This method is for sync, delete tsfile or sth like them, just get storage group directly by sg
+ * name
+ * @param path storage group path
+ * @return storage group processor
+ */
+ public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+ throws StorageEngineException {
PartialPath storageGroupPath;
try {
StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
storageGroupPath = storageGroupMNode.getPartialPath();
- StorageGroupProcessor processor = processorMap.get(storageGroupPath);
- if (processor == null) {
- // if finish recover
- if (isAllSgReady.get()) {
- synchronized (storageGroupMNode) {
- processor = processorMap.get(storageGroupPath);
- if (processor == null) {
- logger.info("construct a processor instance, the storage group is {}, Thread is {}",
- storageGroupPath, Thread.currentThread().getId());
- processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
- fileFlushPolicy);
- processor.setDataTTL(storageGroupMNode.getDataTTL());
- processor.setCustomFlushListeners(customFlushListeners);
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- processorMap.put(storageGroupPath, processor);
- }
- }
- } else {
- // not finished recover, refuse the request
- throw new StorageEngineException(
- "the sg " + storageGroupPath + " may not ready now, please wait and retry later",
- TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
- }
+ return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
+ } catch (StorageGroupProcessorException | MetadataException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+
+ /**
+ * This method is for insert and query or sth like them, this may get a virtual storage group
+ * @param path device path
+ * @return storage group processor
+ */
+ public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+ PartialPath storageGroupPath;
+ try {
+ StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+ storageGroupPath = partitioner.deviceToStorageGroup(path);
+ } else {
+ storageGroupPath = storageGroupMNode.getPartialPath();
}
- return processor;
+ return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
} catch (StorageGroupProcessorException | MetadataException e) {
throw new StorageEngineException(e);
}
}
+ // get storage group processor by partial path
+ private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath storageGroupPath,
+ StorageGroupMNode storageGroupMNode)
+ throws StorageGroupProcessorException, StorageEngineException {
+ StorageGroupProcessor processor = processorMap.get(storageGroupPath);
+ if (processor == null) {
+ // if finish recover
+ if (isAllSgReady.get()) {
+ synchronized (storageGroupMNode) {
+ processor = processorMap.get(storageGroupPath);
+ if (processor == null) {
+ logger.info("construct a processor instance, the storage group is {}, Thread is {}",
+ storageGroupPath, Thread.currentThread().getId());
+ processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
+ fileFlushPolicy);
+ processor.setDataTTL(storageGroupMNode.getDataTTL());
+ processor.setCustomFlushListeners(customFlushListeners);
+ processor.setCustomCloseFileListeners(customCloseFileListeners);
+ processorMap.put(storageGroupPath, processor);
+ }
+ }
+ } else {
+ // not finished recover, refuse the request
+ throw new StorageEngineException(
+ "the sg " + storageGroupPath + " may not ready now, please wait and retry later",
+ TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+ }
+ }
+ return processor;
+ }
/**
* This function is just for unit test.
@@ -361,7 +436,6 @@ public class StorageEngine implements IService {
processorMap.clear();
}
-
/**
* insert an InsertRowPlan to a storage group.
*
@@ -520,11 +594,18 @@ public class StorageEngine implements IService {
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
throws StorageEngineException {
try {
- List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
- for (PartialPath storageGroupPath : sgPaths) {
- StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
- PartialPath newPath = path.alterPrefixPath(storageGroupPath);
- storageGroupProcessor.delete(newPath, startTime, endTime, planIndex);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+ // Distribute the request to all sg, this can be improved in future
+ for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+ storageGroupProcessor.delete(path, startTime, endTime, planIndex);
+ }
+ } else {
+ List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
+ for (PartialPath storageGroupPath : sgPaths) {
+ StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
+ PartialPath newPath = path.alterPrefixPath(storageGroupPath);
+ storageGroupProcessor.delete(newPath, startTime, endTime, planIndex);
+ }
}
} catch (IOException | MetadataException e) {
throw new StorageEngineException(e.getMessage());
@@ -537,10 +618,17 @@ public class StorageEngine implements IService {
public void deleteTimeseries(PartialPath path, long planIndex)
throws StorageEngineException {
try {
- for (PartialPath storageGroupPath : IoTDB.metaManager.searchAllRelatedStorageGroups(path)) {
- StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
- PartialPath newPath = path.alterPrefixPath(storageGroupPath);
- storageGroupProcessor.delete(newPath, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+ // Distribute the request to all sg, this can be improved in future
+ for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+ storageGroupProcessor.delete(path, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+ }
+ } else {
+ for (PartialPath storageGroupPath : IoTDB.metaManager.searchAllRelatedStorageGroups(path)) {
+ StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
+ PartialPath newPath = path.alterPrefixPath(storageGroupPath);
+ storageGroupProcessor.delete(newPath, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+ }
}
} catch (IOException | MetadataException e) {
throw new StorageEngineException(e.getMessage());
@@ -646,7 +734,7 @@ public class StorageEngine implements IService {
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws StorageEngineException, LoadFileException, IllegalPathException {
- getProcessor(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
+ getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
.loadNewTsFileForSync(newTsFileResource);
}
@@ -659,24 +747,24 @@ public class StorageEngine implements IService {
String device = deviceMap.keySet().iterator().next();
PartialPath devicePath = new PartialPath(device);
PartialPath storageGroupPath = IoTDB.metaManager.getStorageGroupPath(devicePath);
- getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource);
+ getProcessorDirectly(storageGroupPath).loadNewTsFile(newTsFileResource);
}
public boolean deleteTsfileForSync(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessor(new PartialPath(deletedTsfile.getParentFile().getName()))
+ return getProcessorDirectly(new PartialPath(deletedTsfile.getParentFile().getName()))
.deleteTsfile(deletedTsfile);
}
public boolean deleteTsfile(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessor(new PartialPath(getSgByEngineFile(deletedTsfile)))
+ return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
.deleteTsfile(deletedTsfile);
}
public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
throws StorageEngineException, IllegalPathException {
- return getProcessor(new PartialPath(getSgByEngineFile(tsfileToBeMoved)))
+ return getProcessorDirectly(new PartialPath(getSgByEngineFile(tsfileToBeMoved)))
.moveTsfile(tsfileToBeMoved, targetDir);
}
@@ -722,22 +810,6 @@ public class StorageEngine implements IService {
return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
}
- public static long getTimePartitionInterval() {
- if (timePartitionInterval == -1) {
- initTimePartition();
- }
- return timePartitionInterval;
- }
-
- @TestOnly
- public static void setTimePartitionInterval(long timePartitionInterval) {
- StorageEngine.timePartitionInterval = timePartitionInterval;
- }
-
- public static long getTimePartition(long time) {
- return enablePartition ? time / timePartitionInterval : 0;
- }
-
/**
* Set the version of given partition to newMaxVersion if it is larger than the current version.
*
@@ -748,13 +820,12 @@ public class StorageEngine implements IService {
public void setPartitionVersionToMax(PartialPath storageGroup, long partitionId,
long newMaxVersion)
throws StorageEngineException {
- getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
+ getProcessorDirectly(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
}
-
public void removePartitions(PartialPath storageGroupPath, TimePartitionFilter filter)
throws StorageEngineException {
- getProcessor(storageGroupPath).removePartitions(filter);
+ getProcessorDirectly(storageGroupPath).removePartitions(filter);
}
public Map<PartialPath, StorageGroupProcessor> getProcessorMap() {
@@ -787,16 +858,6 @@ public class StorageEngine implements IService {
return res;
}
- @TestOnly
- public static void setEnablePartition(boolean enablePartition) {
- StorageEngine.enablePartition = enablePartition;
- }
-
- @TestOnly
- public static boolean isEnablePartition() {
- return enablePartition;
- }
-
/**
* Add a listener to listen flush start/end events. Notice that this addition only applies to
* TsFileProcessors created afterwards.
@@ -824,7 +885,7 @@ public class StorageEngine implements IService {
throws StorageEngineException {
Set<StorageGroupProcessor> set = new HashSet<>();
for (PartialPath path : pathList) {
- set.add(getProcessor(path));
+ set.add(getProcessor(path.getDevicePath()));
}
List<StorageGroupProcessor> list = set.stream()
.sorted(Comparator.comparing(StorageGroupProcessor::getStorageGroupName))
@@ -839,4 +900,13 @@ public class StorageEngine implements IService {
public void mergeUnLock(List<StorageGroupProcessor> list) {
list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock());
}
+
+ static class InstanceHolder {
+
+ private static final StorageEngine INSTANCE = new StorageEngine();
+
+ private InstanceHolder() {
+ // forbidding instantiation
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 8490410..f4d0a33 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -29,10 +29,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.HashVirtualPartitioner;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualPartitioner;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -45,31 +48,28 @@ import org.slf4j.LoggerFactory;
/**
* MergeTask merges given seqFiles and unseqFiles into new ones, which basically consists of three
- * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
- * 2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
- * chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
- * 3. remove unseqFiles
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files 2. move the
+ * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles
+ * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles
*/
public class MergeTask implements Callable<Void> {
public static final String MERGE_SUFFIX = ".merge";
private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+ private static final VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
MergeResource resource;
String storageGroupSysDir;
String storageGroupName;
MergeLogger mergeLogger;
MergeContext mergeContext = new MergeContext();
-
- private MergeCallback callback;
int concurrentMergeSeriesNum;
String taskName;
boolean fullMerge;
-
States states = States.START;
-
MergeMultiChunkTask chunkTask;
MergeFileTask fileTask;
+ private MergeCallback callback;
MergeTask(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback,
@@ -126,7 +126,15 @@ public class MergeTask implements Callable<Void> {
mergeLogger.logFiles(resource);
- Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+ Set<PartialPath> devices;
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()) {
+ devices = partitioner.storageGroupToDevice(new PartialPath(storageGroupName));
+ } else {
+ devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+ }
+ System.out.println(devices);
+
Map<PartialPath, MeasurementSchema> measurementSchemaMap = new HashMap<>();
List<PartialPath> unmergedSeries = new ArrayList<>();
for (PartialPath device : devices) {
@@ -212,14 +220,6 @@ public class MergeTask implements Callable<Void> {
return storageGroupName;
}
- enum States {
- START,
- MERGE_CHUNKS,
- MERGE_FILES,
- CLEAN_UP,
- ABORTED
- }
-
public String getProgress() {
switch (states) {
case ABORTED:
@@ -239,4 +239,12 @@ public class MergeTask implements Callable<Void> {
public String getTaskName() {
return taskName;
}
+
+ enum States {
+ START,
+ MERGE_CHUNKS,
+ MERGE_FILES,
+ CLEAN_UP,
+ ABORTED
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
new file mode 100644
index 0000000..886ecb6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashVirtualPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public class HashVirtualPartitioner implements VirtualPartitioner {
+
+ public static final int STORGARE_GROUP_NUM = 2;
+ HashMap<Integer, Set<PartialPath>> sgToDevice;
+
+ private HashVirtualPartitioner() {
+ sgToDevice = new HashMap<>();
+ }
+
+ public static HashVirtualPartitioner getInstance() {
+ return HashVirtualPartitionerHolder.INSTANCE;
+ }
+
+ private int toPartitionId(PartialPath deviceId){
+ return deviceId.hashCode() % STORGARE_GROUP_NUM;
+ }
+
+ @Override
+ public PartialPath deviceToStorageGroup(PartialPath deviceId) {
+ int partitionId = toPartitionId(deviceId);
+ sgToDevice.computeIfAbsent(partitionId, id -> new HashSet<>()).add(deviceId);
+ try {
+ return new PartialPath("" + partitionId);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+ @Override
+ public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup) {
+ return sgToDevice.get(Integer.parseInt(storageGroup.getFullPath()));
+ }
+
+ @Override
+ public void clear(){
+ sgToDevice.clear();
+ }
+
+ @Override
+ public int getPartitionCount() {
+ return STORGARE_GROUP_NUM;
+ }
+
+ private static class HashVirtualPartitionerHolder {
+
+ private static final HashVirtualPartitioner INSTANCE = new HashVirtualPartitioner();
+
+ private HashVirtualPartitionerHolder() {
+ // allowed to do nothing
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 69aec66..ade474e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1514,11 +1514,16 @@ public class StorageGroupProcessor {
lastUpdateTime = curTime;
}
}
- // There is no tsfile data, the delete operation is invalid
- if (lastUpdateTime == null) {
- logger.debug("No device {} in SG {}, deletion invalid", device, storageGroupName);
- return;
+
+ // virtual partition will push deletion to all sg
+ if(!IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition()){
+ // There is no tsfile data, the delete operation is invalid
+ if (lastUpdateTime == null) {
+ logger.debug("No device {} in SG {}, deletion invalid", device, storageGroupName);
+ return;
+ }
}
+
// delete Last cache record if necessary
tryToDeleteLastCache(device, path, startTime, endTime);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
new file mode 100644
index 0000000..09bd9e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup;
+
+import java.util.Set;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public interface VirtualPartitioner {
+ public PartialPath deviceToStorageGroup(PartialPath deviceId);
+
+ public Set<PartialPath> storageGroupToDevice(PartialPath storageGroup);
+
+ public void clear();
+
+ public int getPartitionCount();
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index 56d7eae..86baa7e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -43,18 +43,23 @@ import org.junit.Test;
public class MergeLogTest extends MergeTest {
File tempSGDir;
+ private static boolean enableVirtualPartition = false;
+
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException {
super.setUp();
tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
tempSGDir.mkdirs();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
}
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
FileUtils.deleteDirectory(tempSGDir);
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index d841fd0..894784f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
@@ -54,6 +55,7 @@ import org.junit.Test;
public class MergeOverLapTest extends MergeTest {
private File tempSGDir;
+ private static boolean enableVirtualPartition = false;
@Before
public void setUp()
@@ -62,12 +64,15 @@ public class MergeOverLapTest extends MergeTest {
super.setUp();
tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
tempSGDir.mkdirs();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
}
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
FileUtils.deleteDirectory(tempSGDir);
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index fb73a27..6611fa3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -48,18 +48,22 @@ import org.junit.Test;
public class MergeTaskTest extends MergeTest {
private File tempSGDir;
+ private static boolean enableVirtualPartition = false;
+
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException, MetadataException {
super.setUp();
tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
- tempSGDir.mkdirs();
+ tempSGDir.mkdirs();IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
}
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
FileUtils.deleteDirectory(tempSGDir);
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 98c104d..69ddcf5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -66,6 +66,7 @@ public class DeletionFileNodeTest {
private static String[] measurements = new String[10];
private TSDataType dataType = TSDataType.DOUBLE;
private TSEncoding encoding = TSEncoding.PLAIN;
+ private static boolean enableVirtualPartition = false;
private int prevUnseqLevelNum = 0;
@@ -89,12 +90,15 @@ public class DeletionFileNodeTest {
encoding, TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
}
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
}
@After
public void teardown() throws IOException, StorageEngineException {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
private void insertToStorageEngine(TSRecord record)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..f115f27 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -62,6 +62,8 @@ public class StorageGroupProcessorTest {
private String measurementId = "s0";
private StorageGroupProcessor processor;
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
+ private static boolean enableVirtualPartition = false;
+
@Before
public void setUp() throws Exception {
@@ -71,6 +73,8 @@ public class StorageGroupProcessorTest {
EnvironmentUtils.envSetUp();
processor = new DummySGP(systemDir, storageGroup);
MergeManager.getINSTANCE().start();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
}
@After
@@ -82,6 +86,7 @@ public class StorageGroupProcessorTest {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
private void insertToStorageGroupProcessor(TSRecord record)
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index bdeb6a4..607a1a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -50,9 +50,6 @@ public class IoTDBAggregationIT {
private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
private static String[] creationSqls = new String[]{
- "SET STORAGE GROUP TO root.vehicle.d0",
- "SET STORAGE GROUP TO root.vehicle.d1",
-
"CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
"CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
"CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
@@ -60,7 +57,6 @@ public class IoTDBAggregationIT {
"CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN"
};
private static String[] dataSet2 = new String[]{
- "SET STORAGE GROUP TO root.ln.wf01.wt01",
"CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 319fdd1..3e5a075 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -117,6 +117,7 @@ public class IoTDBLoadExternalTsfileIT {
private static final String TEST_D0_S0_STR = "root.test.d0.s0";
private static final String TEST_D0_S1_STR = "root.test.d0.s1";
private static final String TEST_D1_STR = "root.test.d1.g0.s0";
+ private static boolean enableVirtualPartition = false;
private static String[] deleteSqls = new String[]{
"DELETE STORAGE GROUP root.vehicle",
@@ -131,6 +132,8 @@ public class IoTDBLoadExternalTsfileIT {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData(insertSequenceSqls);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
}
@After
@@ -138,6 +141,7 @@ public class IoTDBLoadExternalTsfileIT {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index 860ff45..d9176c4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -29,6 +29,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -42,6 +43,7 @@ import org.junit.Test;
public class IoTDBRemovePartitionIT {
private static int partitionInterval = 100;
+ private static boolean enableVirtualPartition = false;
@Before
public void setUp() throws Exception {
@@ -49,6 +51,8 @@ public class IoTDBRemovePartitionIT {
EnvironmentUtils.envSetUp();
StorageEngine.setEnablePartition(true);
StorageEngine.setTimePartitionInterval(partitionInterval);
+ enableVirtualPartition = IoTDBDescriptor.getInstance().getConfig().isEnableVirtualPartition();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(false);
insertData();
}
@@ -57,6 +61,7 @@ public class IoTDBRemovePartitionIT {
StorageEngine.setEnablePartition(false);
StorageEngine.setTimePartitionInterval(-1);
EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setEnableVirtualPartition(enableVirtualPartition);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 5b42f1e..73b244d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -28,6 +28,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -113,6 +114,11 @@ public class IoTDBRestartIT {
try {
EnvironmentUtils.restartDaemon();
+ StorageEngine.getInstance().recover();
+ // wait for recover
+ while(!StorageEngine.getInstance().isAllSgReady()){
+ Thread.sleep(500);
+ }
} catch (Exception e) {
Assert.fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index c1ea53e..8731f44 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.HashVirtualPartitioner;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -120,6 +121,7 @@ public class EnvironmentUtils {
}
// close metadata
IoTDB.metaManager.clear();
+ HashVirtualPartitioner.getInstance().clear();
// close tracing
if (config.isEnablePerformanceTracing()) {