You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/18 07:48:34 UTC
[iotdb] branch master updated: [IOTDB-1079] Virtual storage group
first mile stone (#2405)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 44e764b [IOTDB-1079] Virtual storage group first mile stone (#2405)
44e764b is described below
commit 44e764b173fba160909d6d37b33bad097dd0cbcc
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Mon Jan 18 15:48:12 2021 +0800
[IOTDB-1079] Virtual storage group first mile stone (#2405)
---
.../iotdb/cluster/ClusterFileFlushPolicy.java | 2 +-
.../cluster/query/filter/SlotTsFileFilter.java | 4 +-
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +-
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 8 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 592 +++++++++++----------
.../iotdb/db/engine/merge/task/MergeTask.java | 28 +-
.../engine/storagegroup/StorageGroupProcessor.java | 199 ++++---
.../virtualSg/HashVirtualPartitioner.java | 67 +++
.../storagegroup/virtualSg/VirtualPartitioner.java | 40 ++
.../virtualSg/VirtualStorageGroupManager.java | 426 +++++++++++++++
.../org/apache/iotdb/db/monitor/StatMonitor.java | 14 +-
.../db/tools/virtualsg/DeviceMappingViewer.java | 61 +++
.../db/engine/cache/ChunkMetadataCacheTest.java | 2 +-
.../apache/iotdb/db/engine/merge/MergeLogTest.java | 1 +
.../iotdb/db/engine/merge/MergeOverLapTest.java | 1 -
.../iotdb/db/engine/merge/MergeTaskTest.java | 1 +
.../engine/modification/DeletionFileNodeTest.java | 27 +-
.../storagegroup/StorageGroupProcessorTest.java | 3 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 34 +-
.../virtualSg/HashVirtualPartitionerTest.java | 64 +++
.../iotdb/db/integration/IoTDBCompleteIT.java | 5 +
.../db/integration/IoTDBLoadExternalTsfileIT.java | 33 +-
.../iotdb/db/integration/IoTDBMultiDeviceIT.java | 322 +++++++++++
.../iotdb/db/integration/IoTDBRestartIT.java | 16 +
.../iotdb/db/monitor/IoTDBStatMonitorTest.java | 21 +-
.../db/sync/receiver/load/FileLoaderTest.java | 38 +-
.../recover/SyncReceiverLogAnalyzerTest.java | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 3 +
30 files changed, 1588 insertions(+), 452 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
index cca5c2a..d575ded 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
@@ -58,7 +58,7 @@ public class ClusterFileFlushPolicy implements TsFileFlushPolicy {
// find the related DataGroupMember and close the processor through it
// we execute it in another thread to avoid deadlocks
closePartitionExecutor
- .submit(() -> metaGroupMember.closePartition(storageGroupProcessor.getStorageGroupName(),
+ .submit(() -> metaGroupMember.closePartition(storageGroupProcessor.getVirtualStorageGroupId(),
processor.getTimeRangeId(), isSeq));
}
// flush the memtable anyway to avoid the insertion trigger the policy again
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java
index 8c0f762..188fa44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/filter/SlotTsFileFilter.java
@@ -43,9 +43,9 @@ public class SlotTsFileFilter implements TsFileFilter {
}
private static boolean fileNotInSlots(TsFileResource res, List<Integer> nodeSlots) {
- // <storageGroupName>/<partitionNum>/<fileName>
+ // <storageGroupName>/<virtualStorageGroupNumber>/<partitionNum>/<fileName>
String[] pathSegments = FilePathUtils.splitTsFilePath(res);
- String storageGroupName = pathSegments[pathSegments.length - 3];
+ String storageGroupName = pathSegments[pathSegments.length - 4];
int partitionNum = Integer.parseInt(pathSegments[pathSegments.length - 2]);
int slot = SlotPartitionTable.getSlotStrategy()
.calculateSlotByPartitionNum(storageGroupName, partitionNum,
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8201e9d..e9296a6 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -231,6 +231,11 @@ mtree_snapshot_interval=100000
# Only take effect when enable_mtree_snapshot=true.
mtree_snapshot_threshold_time=3600
+# number of virtual storage groups per user-defined storage group
+# a virtual storage group is the unit of parallelism in memory as all ingestions in one virtual storage group are serialized
+# recommended value is [virtual storage group number] = [CPU core number] / [user-defined storage group number]
+virtual_storage_group_num = 1
+
# Level of TimeIndex, which records the start time and end time of TsFileResource. Currently,
# DEVICE_TIME_INDEX and FILE_TIME_INDEX are supported, and could not be changed after first set.
time_index_level=DEVICE_TIME_INDEX
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0871744..d03114d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -845,6 +845,11 @@ public class IoTDBConfig {
*/
private boolean debugState = false;
+ /**
+ * the number of virtual storage groups per user-defined storage group
+ */
+ private int virtualStorageGroupNum = 1;
+
public IoTDBConfig() {
// empty constructor
}
@@ -1139,7 +1144,7 @@ public class IoTDBConfig {
return schemaDir;
}
- void setSchemaDir(String schemaDir) {
+ public void setSchemaDir(String schemaDir) {
this.schemaDir = schemaDir;
}
@@ -2240,6 +2245,14 @@ public class IoTDBConfig {
this.defaultIndexWindowRange = defaultIndexWindowRange;
}
+ public int getVirtualStorageGroupNum() {
+ return virtualStorageGroupNum;
+ }
+
+ public void setVirtualStorageGroupNum(int virtualStorageGroupNum) {
+ this.virtualStorageGroupNum = virtualStorageGroupNum;
+ }
+
public boolean isRpcAdvancedCompressionEnable() {
return rpcAdvancedCompressionEnable;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 9d7efd1..f19db87 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -81,6 +81,9 @@ public class IoTDBConfigCheck {
private static String maxDegreeOfIndexNode = String
.valueOf(TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode());
+ private static final String VIRTUAL_STORAGE_GROUP_NUM = "virtual_storage_group_num";
+ private static String virtualStorageGroupNum = String.valueOf(config.getVirtualStorageGroupNum());
+
private static final String IOTDB_VERSION_STRING = "iotdb_version";
public static IoTDBConfigCheck getInstance() {
@@ -131,6 +134,7 @@ public class IoTDBConfigCheck {
systemProperties.put(ENABLE_PARTITION_STRING, String.valueOf(enablePartition));
systemProperties.put(TAG_ATTRIBUTE_SIZE_STRING, tagAttributeTotalSize);
systemProperties.put(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
+ systemProperties.put(VIRTUAL_STORAGE_GROUP_NUM, virtualStorageGroupNum);
}
@@ -298,6 +302,10 @@ public class IoTDBConfigCheck {
if (!(properties.getProperty(MAX_DEGREE_OF_INDEX_STRING).equals(maxDegreeOfIndexNode))) {
printErrorLogAndExit(MAX_DEGREE_OF_INDEX_STRING);
}
+
+ if (!(properties.getProperty(VIRTUAL_STORAGE_GROUP_NUM).equals(virtualStorageGroupNum))) {
+ printErrorLogAndExit(VIRTUAL_STORAGE_GROUP_NUM);
+ }
}
private void printErrorLogAndExit(String property) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b11f9d3..a2f7d41 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -45,8 +45,8 @@ import org.slf4j.LoggerFactory;
public class IoTDBDescriptor {
private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class);
- private IoTDBConfig conf = new IoTDBConfig();
private static CommandLine commandLine;
+ private IoTDBConfig conf = new IoTDBConfig();
protected IoTDBDescriptor() {
loadProps();
@@ -443,6 +443,7 @@ public class IoTDBDescriptor {
.parseInt(properties.getProperty("performance_stat_memory_in_kb",
Integer.toString(conf.getPerformanceStatMemoryInKB())).trim()));
+
int maxConcurrentClientNum = Integer.parseInt(properties.
getProperty("rpc_max_concurrent_client_num",
Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim()));
@@ -538,6 +539,9 @@ public class IoTDBDescriptor {
conf.setDebugState(Boolean.parseBoolean(properties
.getProperty("debug_state", String.valueOf(conf.isDebugOn()))));
+ conf.setVirtualStorageGroupNum(Integer.parseInt(properties
+ .getProperty("virtual_storage_group_num",
+ String.valueOf(conf.getVirtualStorageGroupNum()))));
// mqtt
if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 30d5d52..cca2510 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -25,6 +25,7 @@ import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -51,8 +52,8 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
-import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.ShutdownException;
@@ -90,70 +91,41 @@ import org.slf4j.LoggerFactory;
public class StorageEngine implements IService {
- private final Logger logger;
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+ /**
+ * Time range for dividing storage group, the time unit is the same with IoTDB's
+ * TimestampPrecision
+ */
+ @ServerConfigConsistent
+ private static long timePartitionInterval = -1;
+ /**
+ * whether enable data partition if disabled, all data belongs to partition 0
+ */
+ @ServerConfigConsistent
+ private static boolean enablePartition =
+ IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+ private final Logger logger;
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
* will have a subfolder under the systemDir.
*/
private final String systemDir;
-
/**
* storage group name -> storage group processor
*/
- private final ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap = new ConcurrentHashMap<>();
-
- private static final ExecutorService recoveryThreadPool = IoTDBThreadPoolFactory
- .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
-
- public boolean isAllSgReady() {
- return isAllSgReady.get();
- }
-
- public void setAllSgReady(boolean allSgReady) {
- isAllSgReady.set(allSgReady);
- }
-
+ private final ConcurrentHashMap<PartialPath, VirtualStorageGroupManager> processorMap = new ConcurrentHashMap<>();
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
private ExecutorService recoverAllSgThreadPool;
-
- static class InstanceHolder {
-
- private InstanceHolder() {
- // forbidding instantiation
- }
-
- private static final StorageEngine INSTANCE = new StorageEngine();
- }
-
- public static StorageEngine getInstance() {
- return InstanceHolder.INSTANCE;
- }
-
private ScheduledExecutorService ttlCheckThread;
private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
-
+ private ExecutorService recoveryThreadPool;
// add customized listeners here for flush and close events
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
- /**
- * Time range for dividing storage group, the time unit is the same with IoTDB's
- * TimestampPrecision
- */
- @ServerConfigConsistent
- private static long timePartitionInterval = -1;
-
- /**
- * whether enable data partition if disabled, all data belongs to partition 0
- */
- @ServerConfigConsistent
- private static boolean enablePartition =
- IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
-
private StorageEngine() {
logger = LoggerFactory.getLogger(StorageEngine.class);
systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
@@ -177,7 +149,88 @@ public class StorageEngine implements IService {
recover();
}
+ public static StorageEngine getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ private static void initTimePartition() {
+ timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
+ getConfig().getPartitionInterval() * 1000L);
+ }
+
+ public static long convertMilliWithPrecision(long milliTime) {
+ long result = milliTime;
+ String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
+ switch (timePrecision) {
+ case "ns":
+ result = milliTime * 1000_000L;
+ break;
+ case "us":
+ result = milliTime * 1000L;
+ break;
+ default:
+ break;
+ }
+ return result;
+ }
+
+ public static long getTimePartitionInterval() {
+ if (timePartitionInterval == -1) {
+ initTimePartition();
+ }
+ return timePartitionInterval;
+ }
+
+ @TestOnly
+ public static void setTimePartitionInterval(long timePartitionInterval) {
+ StorageEngine.timePartitionInterval = timePartitionInterval;
+ }
+
+ public static long getTimePartition(long time) {
+ return enablePartition ? time / timePartitionInterval : 0;
+ }
+
+ @TestOnly
+ public static boolean isEnablePartition() {
+ return enablePartition;
+ }
+
+ @TestOnly
+ public static void setEnablePartition(boolean enablePartition) {
+ StorageEngine.enablePartition = enablePartition;
+ }
+
+ /**
+ * block insertion if the insertion is rejected by memory control
+ */
+ public static void blockInsertionIfReject() throws WriteProcessRejectException {
+ long startTime = System.currentTimeMillis();
+ while (SystemInfo.getInstance().isRejected()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+ if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
+ throw new WriteProcessRejectException(
+ "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+ "ms");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public boolean isAllSgReady() {
+ return isAllSgReady.get();
+ }
+
+ public void setAllSgReady(boolean allSgReady) {
+ isAllSgReady.set(allSgReady);
+ }
+
public void recover() {
+ setAllSgReady(false);
+ recoveryThreadPool = IoTDBThreadPoolFactory
+ .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
recoverAllSgThreadPool = IoTDBThreadPoolFactory
.newSingleThreadExecutor("Begin-Recovery-Pool");
recoverAllSgThreadPool.submit(this::recoverAllSgs);
@@ -187,26 +240,9 @@ public class StorageEngine implements IService {
/*
* recover all storage group processors.
*/
- List<StorageGroupMNode> sgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
List<Future<Void>> futures = new ArrayList<>();
- for (StorageGroupMNode storageGroup : sgNodes) {
- futures.add(recoveryThreadPool.submit(() -> {
- try {
- StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
- storageGroup.getFullPath(), fileFlushPolicy);
- processor.setDataTTL(storageGroup.getDataTTL());
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- processor.setCustomFlushListeners(customFlushListeners);
- processorMap.put(storageGroup.getPartialPath(), processor);
- logger.info("Storage Group Processor {} is recovered successfully",
- storageGroup.getFullPath());
- } catch (Exception e) {
- logger
- .error("meet error when recovering storage group: {}", storageGroup.getFullPath(), e);
- }
- return null;
- }));
- }
+ recoverStorageGroupProcessor(futures);
+
for (Future<Void> future : futures) {
try {
future.get();
@@ -221,25 +257,33 @@ public class StorageEngine implements IService {
setAllSgReady(true);
}
- private static void initTimePartition() {
- timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
- getConfig().getPartitionInterval() * 1000L);
- }
+ /**
+ * recover logic storage group processor
+ *
+ * @param futures recover future task
+ */
+ private void recoverStorageGroupProcessor(List<Future<Void>> futures) {
+ List<StorageGroupMNode> sgNodes = IoTDB.metaManager.getAllStorageGroupNodes();
+ for (StorageGroupMNode storageGroup : sgNodes) {
+ futures.add(recoveryThreadPool.submit(() -> {
+ try {
+ // for recovery in test
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap
+ .computeIfAbsent(storageGroup.getPartialPath(),
+ id -> new VirtualStorageGroupManager());
- public static long convertMilliWithPrecision(long milliTime) {
- long result = milliTime;
- String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
- switch (timePrecision) {
- case "ns":
- result = milliTime * 1000_000L;
- break;
- case "us":
- result = milliTime * 1000L;
- break;
- default:
- break;
+ virtualStorageGroupManager.recover(storageGroup);
+
+ logger.info("Storage Group Processor {} is recovered successfully",
+ storageGroup.getFullPath());
+ } catch (Exception e) {
+ logger
+ .error("meet error when recovering storage group: {}", storageGroup.getFullPath(),
+ e);
+ }
+ return null;
+ }));
}
- return result;
}
@Override
@@ -251,8 +295,8 @@ public class StorageEngine implements IService {
private void checkTTL() {
try {
- for (StorageGroupProcessor processor : processorMap.values()) {
- processor.checkFilesTTL();
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
+ processor.checkTTL();
}
} catch (ConcurrentModificationException e) {
// ignore
@@ -319,35 +363,96 @@ public class StorageEngine implements IService {
return ServiceType.STORAGE_ENGINE_SERVICE;
}
- public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+ /**
+ * This method is for sync, delete tsfile or sth like them, just get storage group directly by sg
+ * name
+ *
+ * @param path storage group path
+ * @return storage group processor
+ */
+ public StorageGroupProcessor getProcessorDirectly(PartialPath path)
+ throws StorageEngineException {
PartialPath storageGroupPath;
try {
StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
storageGroupPath = storageGroupMNode.getPartialPath();
- StorageGroupProcessor processor = processorMap.get(storageGroupPath);
- if (processor == null) {
- waitAllSgReady(storageGroupPath);
- // if finish recover
- if (isAllSgReady.get()) {
- synchronized (storageGroupMNode) {
- processor = processorMap.get(storageGroupPath);
- if (processor == null) {
- logger.info("construct a processor instance, the storage group is {}, Thread is {}",
- storageGroupPath, Thread.currentThread().getId());
- processor = new StorageGroupProcessor(systemDir, storageGroupPath.getFullPath(),
- fileFlushPolicy);
- processor.setDataTTL(storageGroupMNode.getDataTTL());
- processor.setCustomFlushListeners(customFlushListeners);
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- processorMap.put(storageGroupPath, processor);
- }
+ return getStorageGroupProcessorByPath(storageGroupPath, storageGroupMNode);
+ } catch (StorageGroupProcessorException | MetadataException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+
+ /**
+ * This method is for insert and query or sth like them, this may get a virtual storage group
+ *
+ * @param path device path
+ * @return storage group processor
+ */
+ public StorageGroupProcessor getProcessor(PartialPath path) throws StorageEngineException {
+ try {
+ StorageGroupMNode storageGroupMNode = IoTDB.metaManager.getStorageGroupNodeByPath(path);
+ return getStorageGroupProcessorByPath(path, storageGroupMNode);
+ } catch (StorageGroupProcessorException | MetadataException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+
+ /**
+ * get storage group processor by device path
+ *
+ * @param devicePath path of the device
+ * @param storageGroupMNode mnode of the storage group, we need synchronize this to avoid
+ * modification in mtree
+ * @return found or new storage group processor
+ */
+ @SuppressWarnings("java:S2445")
+ // actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
+ private StorageGroupProcessor getStorageGroupProcessorByPath(PartialPath devicePath,
+ StorageGroupMNode storageGroupMNode)
+ throws StorageGroupProcessorException, StorageEngineException {
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap
+ .get(storageGroupMNode.getPartialPath());
+ if (virtualStorageGroupManager == null) {
+ // if finish recover
+ if (isAllSgReady.get()) {
+ waitAllSgReady(devicePath);
+ synchronized (storageGroupMNode) {
+ virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+ if (virtualStorageGroupManager == null) {
+ virtualStorageGroupManager = new VirtualStorageGroupManager();
+ processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroupManager);
}
}
+ } else {
+ // not finished recover, refuse the request
+ throw new StorageEngineException(
+ "the sg " + storageGroupMNode.getPartialPath()
+ + " may not ready now, please wait and retry later",
+ TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
- return processor;
- } catch (StorageGroupProcessorException | MetadataException e) {
- throw new StorageEngineException(e);
}
+ return virtualStorageGroupManager.getProcessor(devicePath, storageGroupMNode);
+ }
+
+ /**
+ * build a new storage group processor
+ *
+ * @param virtualStorageGroupId virtual storage group id e.g. 1
+ * @param logicalStorageGroupName logical storage group name e.g. root.sg1
+ */
+ public StorageGroupProcessor buildNewStorageGroupProcessor(PartialPath logicalStorageGroupName,
+ StorageGroupMNode storageGroupMNode, String virtualStorageGroupId)
+ throws StorageGroupProcessorException {
+ StorageGroupProcessor processor;
+ logger.info("construct a processor instance, the storage group is {}, Thread is {}",
+ logicalStorageGroupName, Thread.currentThread().getId());
+ processor = new StorageGroupProcessor(systemDir + File.separator + logicalStorageGroupName,
+ virtualStorageGroupId,
+ fileFlushPolicy, storageGroupMNode.getFullPath());
+ processor.setDataTTL(storageGroupMNode.getDataTTL());
+ processor.setCustomFlushListeners(customFlushListeners);
+ processor.setCustomCloseFileListeners(customCloseFileListeners);
+ return processor;
}
private void waitAllSgReady(PartialPath storageGroupPath) throws StorageEngineException {
@@ -380,10 +485,11 @@ public class StorageEngine implements IService {
* This function is just for unit test.
*/
public synchronized void reset() {
- processorMap.clear();
+ for(VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()){
+ virtualStorageGroupManager.reset();
+ }
}
-
/**
* insert an InsertRowPlan to a storage group.
*
@@ -396,7 +502,14 @@ public class StorageEngine implements IService {
try {
storageGroupProcessor.insert(insertRowPlan);
if (config.isEnableStatMonitor()) {
- updateMonitorStatistics(storageGroupProcessor, insertRowPlan);
+ try {
+ StorageGroupMNode storageGroupMNode = IoTDB.metaManager
+ .getStorageGroupNodeByPath(insertRowPlan.getDeviceId());
+ updateMonitorStatistics(processorMap.get(storageGroupMNode.getPartialPath()),
+ insertRowPlan);
+ } catch (MetadataException e) {
+ logger.error("failed to record status", e);
+ }
}
} catch (WriteProcessException e) {
throw new StorageEngineException(e);
@@ -405,7 +518,8 @@ public class StorageEngine implements IService {
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
throws StorageEngineException {
- StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+ StorageGroupProcessor storageGroupProcessor = getProcessor(
+ insertRowsOfOneDevicePlan.getDeviceId());
// TODO monitor: update statistics
try {
@@ -430,16 +544,24 @@ public class StorageEngine implements IService {
storageGroupProcessor.insertTablet(insertTabletPlan);
if (config.isEnableStatMonitor()) {
- updateMonitorStatistics(storageGroupProcessor, insertTabletPlan);
+ try {
+ StorageGroupMNode storageGroupMNode = IoTDB.metaManager
+ .getStorageGroupNodeByPath(insertTabletPlan.getDeviceId());
+ updateMonitorStatistics(processorMap.get(storageGroupMNode.getPartialPath()),
+ insertTabletPlan);
+ } catch (MetadataException e) {
+ logger.error("failed to record status", e);
+ }
}
}
- private void updateMonitorStatistics(StorageGroupProcessor processor, InsertPlan insertPlan) {
+ private void updateMonitorStatistics(VirtualStorageGroupManager virtualStorageGroupManager,
+ InsertPlan insertPlan) {
StatMonitor monitor = StatMonitor.getInstance();
int successPointsNum =
insertPlan.getMeasurements().length - insertPlan.getFailedMeasurementNumber();
// update to storage group statistics
- processor.updateMonitorSeriesValue(successPointsNum);
+ virtualStorageGroupManager.updateMonitorSeriesValue(successPointsNum);
// update to global statistics
monitor.updateStatGlobalValue(successPointsNum);
}
@@ -449,57 +571,26 @@ public class StorageEngine implements IService {
*/
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
- for (StorageGroupProcessor processor : processorMap.values()) {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
processor.syncCloseAllWorkingTsFileProcessors();
}
}
public void forceCloseAllProcessor() throws TsFileProcessorException {
logger.info("Start force closing all storage group processor");
- for (StorageGroupProcessor processor : processorMap.values()) {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
processor.forceCloseAllWorkingTsFileProcessors();
}
}
public void closeStorageGroupProcessor(PartialPath storageGroupPath, boolean isSeq,
boolean isSync) {
- StorageGroupProcessor processor = processorMap.get(storageGroupPath);
- if (processor == null) {
+ if (!processorMap.containsKey(storageGroupPath)) {
return;
}
- if (logger.isInfoEnabled()) {
- logger.info("{} closing sg processor is called for closing {}, seq = {}",
- isSync ? "sync" : "async", storageGroupPath,
- isSeq);
- }
-
- processor.writeLock();
- try {
- if (isSeq) {
- // to avoid concurrent modification problem, we need a new array list
- for (TsFileProcessor tsfileProcessor : new ArrayList<>(
- processor.getWorkSequenceTsFileProcessors())) {
- if (isSync) {
- processor.syncCloseOneTsFileProcessor(true, tsfileProcessor);
- } else {
- processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
- }
- }
- } else {
- // to avoid concurrent modification problem, we need a new array list
- for (TsFileProcessor tsfileProcessor : new ArrayList<>(
- processor.getWorkUnsequenceTsFileProcessors())) {
- if (isSync) {
- processor.syncCloseOneTsFileProcessor(false, tsfileProcessor);
- } else {
- processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
- }
- }
- }
- } finally {
- processor.writeUnlock();
- }
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+ virtualStorageGroupManager.closeStorageGroupProcessor(isSeq, isSync);
}
/**
@@ -513,32 +604,12 @@ public class StorageEngine implements IService {
boolean isSeq,
boolean isSync)
throws StorageGroupNotSetException {
- StorageGroupProcessor processor = processorMap.get(storageGroupPath);
- if (processor == null) {
- throw new StorageGroupNotSetException(storageGroupPath.getFullPath(), true);
- }
-
- logger.info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
- storageGroupPath, isSeq, partitionId);
- processor.writeLock();
- // to avoid concurrent modification problem, we need a new array list
- List<TsFileProcessor> processors = isSeq ?
- new ArrayList<>(processor.getWorkSequenceTsFileProcessors()) :
- new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
- try {
- for (TsFileProcessor tsfileProcessor : processors) {
- if (tsfileProcessor.getTimeRangeId() == partitionId) {
- if (isSync) {
- processor.syncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
- } else {
- processor.asyncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
- }
- break;
- }
- }
- } finally {
- processor.writeUnlock();
+ if (!processorMap.containsKey(storageGroupPath)) {
+ throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
}
+
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
+ virtualStorageGroupManager.closeStorageGroupProcessor(partitionId, isSeq, isSync);
}
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
@@ -546,9 +617,14 @@ public class StorageEngine implements IService {
try {
List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
for (PartialPath storageGroupPath : sgPaths) {
- StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
+ // storage group has no data
+ if (!processorMap.containsKey(storageGroupPath)) {
+ continue;
+ }
+
PartialPath newPath = path.alterPrefixPath(storageGroupPath);
- storageGroupProcessor.delete(newPath, startTime, endTime, planIndex);
+ processorMap.get(storageGroupPath).delete(newPath, startTime, endTime, planIndex);
+
}
} catch (IOException | MetadataException e) {
throw new StorageEngineException(e.getMessage());
@@ -561,10 +637,16 @@ public class StorageEngine implements IService {
public void deleteTimeseries(PartialPath path, long planIndex)
throws StorageEngineException {
try {
- for (PartialPath storageGroupPath : IoTDB.metaManager.searchAllRelatedStorageGroups(path)) {
- StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroupPath);
+ List<PartialPath> sgPaths = IoTDB.metaManager.searchAllRelatedStorageGroups(path);
+ for (PartialPath storageGroupPath : sgPaths) {
+ // storage group has no data
+ if (!processorMap.containsKey(storageGroupPath)) {
+ continue;
+ }
+
PartialPath newPath = path.alterPrefixPath(storageGroupPath);
- storageGroupProcessor.delete(newPath, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
+ processorMap.get(storageGroupPath)
+ .delete(newPath, Long.MIN_VALUE, Long.MAX_VALUE, planIndex);
}
} catch (IOException | MetadataException e) {
throw new StorageEngineException(e.getMessage());
@@ -592,8 +674,8 @@ public class StorageEngine implements IService {
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
- for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
- totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
+ for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+ totalUpgradeFileNum += virtualStorageGroupManager.countUpgradeFiles();
}
return totalUpgradeFileNum;
}
@@ -608,8 +690,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
- for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
- storageGroupProcessor.upgrade();
+ for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+ virtualStorageGroupManager.upgradeAll();
}
}
@@ -622,8 +704,9 @@ public class StorageEngine implements IService {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new StorageEngineException("Current system mode is read only, does not support merge");
}
- for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
- storageGroupProcessor.merge(fullMerge);
+
+ for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
+ virtualStorageGroupManager.mergeAll(fullMerge);
}
}
@@ -639,8 +722,7 @@ public class StorageEngine implements IService {
private void syncDeleteDataFiles(PartialPath storageGroupPath) {
logger.info("Force to delete the data in storage group processor {}", storageGroupPath);
- StorageGroupProcessor processor = processorMap.get(storageGroupPath);
- processor.syncDeleteDataFiles();
+ processorMap.get(storageGroupPath).syncDeleteDataFiles();
}
/**
@@ -656,21 +738,29 @@ public class StorageEngine implements IService {
}
public void setTTL(PartialPath storageGroup, long dataTTL) throws StorageEngineException {
- StorageGroupProcessor storageGroupProcessor = getProcessor(storageGroup);
- storageGroupProcessor.setDataTTL(dataTTL);
+ // storage group has no data
+ if (!processorMap.containsKey(storageGroup)) {
+ return;
+ }
+
+ processorMap.get(storageGroup).setTTL(dataTTL);
+
}
public void deleteStorageGroup(PartialPath storageGroupPath) {
- deleteAllDataFilesInOneStorageGroup(storageGroupPath);
- StorageGroupProcessor processor = processorMap.remove(storageGroupPath);
- if (processor != null) {
- processor.deleteFolder(systemDir);
+ if (!processorMap.containsKey(storageGroupPath)) {
+ return;
}
+
+ deleteAllDataFilesInOneStorageGroup(storageGroupPath);
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.remove(storageGroupPath);
+ virtualStorageGroupManager
+ .deleteStorageGroup(systemDir + File.pathSeparator + storageGroupPath);
}
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws StorageEngineException, LoadFileException, IllegalPathException {
- getProcessor(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
+ getProcessorDirectly(new PartialPath(getSgByEngineFile(newTsFileResource.getTsFile())))
.loadNewTsFileForSync(newTsFileResource);
}
@@ -683,24 +773,24 @@ public class StorageEngine implements IService {
String device = deviceSet.iterator().next();
PartialPath devicePath = new PartialPath(device);
PartialPath storageGroupPath = IoTDB.metaManager.getStorageGroupPath(devicePath);
- getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource);
+ getProcessorDirectly(storageGroupPath).loadNewTsFile(newTsFileResource);
}
public boolean deleteTsfileForSync(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessor(new PartialPath(deletedTsfile.getParentFile().getName()))
+ return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
.deleteTsfile(deletedTsfile);
}
public boolean deleteTsfile(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessor(new PartialPath(getSgByEngineFile(deletedTsfile)))
+ return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
.deleteTsfile(deletedTsfile);
}
public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
throws StorageEngineException, IllegalPathException {
- return getProcessor(new PartialPath(getSgByEngineFile(tsfileToBeMoved)))
+ return getProcessorDirectly(new PartialPath(getSgByEngineFile(tsfileToBeMoved)))
.moveTsfile(tsfileToBeMoved, targetDir);
}
@@ -711,8 +801,8 @@ public class StorageEngine implements IService {
* @param file internal file
* @return sg name
*/
- private String getSgByEngineFile(File file) {
- return file.getParentFile().getParentFile().getName();
+ public String getSgByEngineFile(File file) {
+ return file.getParentFile().getParentFile().getParentFile().getName();
}
/**
@@ -720,18 +810,8 @@ public class StorageEngine implements IService {
*/
public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
- for (Entry<PartialPath, StorageGroupProcessor> entry : processorMap.entrySet()) {
- List<TsFileResource> allResources = entry.getValue().getSequenceFileTreeSet();
- allResources.addAll(entry.getValue().getUnSequenceFileList());
- for (TsFileResource sequenceFile : allResources) {
- if (!sequenceFile.isClosed()) {
- continue;
- }
- long partitionNum = sequenceFile.getTimePartition();
- Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
- , n -> new HashMap<>());
- storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
- }
+ for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
+ entry.getValue().getAllClosedStorageGroupTsFile(entry.getKey(), ret);
}
return ret;
}
@@ -742,46 +822,36 @@ public class StorageEngine implements IService {
public boolean isFileAlreadyExist(TsFileResource tsFileResource, PartialPath storageGroup,
long partitionNum) {
- StorageGroupProcessor processor = processorMap.get(storageGroup);
- return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
- }
-
- public static long getTimePartitionInterval() {
- if (timePartitionInterval == -1) {
- initTimePartition();
+ VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroup);
+ if (virtualStorageGroupManager == null) {
+ return false;
}
- return timePartitionInterval;
- }
- @TestOnly
- public static void setTimePartitionInterval(long timePartitionInterval) {
- StorageEngine.timePartitionInterval = timePartitionInterval;
- }
+ Iterator<String> partialPathIterator = tsFileResource.getDevices().iterator();
+ try {
+ return getProcessor(new PartialPath(partialPathIterator.next()))
+ .isFileAlreadyExist(tsFileResource, partitionNum);
+ } catch (StorageEngineException | IllegalPathException e) {
+ logger.error("can't find processor with: " + tsFileResource, e);
+ }
- public static long getTimePartition(long time) {
- return enablePartition ? time / timePartitionInterval : 0;
+ return false;
}
/**
* Set the version of given partition to newMaxVersion if it is larger than the current version.
- *
- * @param storageGroup
- * @param partitionId
- * @param newMaxVersion
*/
public void setPartitionVersionToMax(PartialPath storageGroup, long partitionId,
- long newMaxVersion)
- throws StorageEngineException {
- getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion);
+ long newMaxVersion) {
+ processorMap.get(storageGroup).setPartitionVersionToMax(partitionId, newMaxVersion);
}
-
public void removePartitions(PartialPath storageGroupPath, TimePartitionFilter filter)
throws StorageEngineException {
- getProcessor(storageGroupPath).removePartitions(filter);
+ processorMap.get(storageGroupPath).removePartitions(filter);
}
- public Map<PartialPath, StorageGroupProcessor> getProcessorMap() {
+ public Map<PartialPath, VirtualStorageGroupManager> getProcessorMap() {
return processorMap;
}
@@ -793,34 +863,12 @@ public class StorageEngine implements IService {
*/
public Map<String, List<Pair<Long, Boolean>>> getWorkingStorageGroupPartitions() {
Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
- for (Entry<PartialPath, StorageGroupProcessor> entry : processorMap.entrySet()) {
- List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
- StorageGroupProcessor processor = entry.getValue();
- for (TsFileProcessor tsFileProcessor : processor.getWorkSequenceTsFileProcessors()) {
- Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), true);
- partitionIdList.add(tmpPair);
- }
-
- for (TsFileProcessor tsFileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
- Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), false);
- partitionIdList.add(tmpPair);
- }
-
- res.put(entry.getKey().getFullPath(), partitionIdList);
+ for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
+ entry.getValue().getWorkingStorageGroupPartitions(entry.getKey().getFullPath(), res);
}
return res;
}
- @TestOnly
- public static void setEnablePartition(boolean enablePartition) {
- StorageEngine.enablePartition = enablePartition;
- }
-
- @TestOnly
- public static boolean isEnablePartition() {
- return enablePartition;
- }
-
/**
* Add a listener to listen flush start/end events. Notice that this addition only applies to
* TsFileProcessors created afterwards.
@@ -848,10 +896,10 @@ public class StorageEngine implements IService {
throws StorageEngineException {
Set<StorageGroupProcessor> set = new HashSet<>();
for (PartialPath path : pathList) {
- set.add(getProcessor(path));
+ set.add(getProcessor(path.getDevicePath()));
}
List<StorageGroupProcessor> list = set.stream()
- .sorted(Comparator.comparing(StorageGroupProcessor::getStorageGroupName))
+ .sorted(Comparator.comparing(StorageGroupProcessor::getVirtualStorageGroupId))
.collect(Collectors.toList());
list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readLock());
return list;
@@ -864,22 +912,12 @@ public class StorageEngine implements IService {
list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock());
}
- /**
- * block insertion if the insertion is rejected by memory control
- */
- public static void blockInsertionIfReject() throws WriteProcessRejectException {
- long startTime = System.currentTimeMillis();
- while (SystemInfo.getInstance().isRejected()) {
- try {
- TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
- if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
- throw new WriteProcessRejectException(
- "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
- "ms");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ static class InstanceHolder {
+
+ private static final StorageEngine INSTANCE = new StorageEngine();
+
+ private InstanceHolder() {
+ // forbidding instantiation
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 8490410..26316b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -45,10 +45,9 @@ import org.slf4j.LoggerFactory;
/**
* MergeTask merges given seqFiles and unseqFiles into new ones, which basically consists of three
- * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
- * 2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
- * chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
- * 3. remove unseqFiles
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files 2. move the
+ * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles
+ * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles
*/
public class MergeTask implements Callable<Void> {
@@ -60,16 +59,13 @@ public class MergeTask implements Callable<Void> {
String storageGroupName;
MergeLogger mergeLogger;
MergeContext mergeContext = new MergeContext();
-
- private MergeCallback callback;
int concurrentMergeSeriesNum;
String taskName;
boolean fullMerge;
-
States states = States.START;
-
MergeMultiChunkTask chunkTask;
MergeFileTask fileTask;
+ private MergeCallback callback;
MergeTask(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback,
@@ -212,14 +208,6 @@ public class MergeTask implements Callable<Void> {
return storageGroupName;
}
- enum States {
- START,
- MERGE_CHUNKS,
- MERGE_FILES,
- CLEAN_UP,
- ABORTED
- }
-
public String getProgress() {
switch (states) {
case ABORTED:
@@ -239,4 +227,12 @@ public class MergeTask implements Callable<Void> {
public String getTaskName() {
return taskName;
}
+
+ enum States {
+ START,
+ MERGE_CHUNKS,
+ MERGE_FILES,
+ CLEAN_UP,
+ ABORTED
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8e84416..adeac80 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -134,8 +134,6 @@ public class StorageGroupProcessor {
private static final int MERGE_MOD_START_VERSION_NUM = 1;
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
-
- private final boolean enableMemControl = config.isEnableMemControl();
/**
* indicating the file to be loaded already exists locally.
*/
@@ -144,6 +142,7 @@ public class StorageGroupProcessor {
* indicating the file to be loaded overlap with some files.
*/
private static final int POS_OVERLAP = -3;
+ private final boolean enableMemControl = config.isEnableMemControl();
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -206,19 +205,17 @@ public class StorageGroupProcessor {
* latestFlushedTime of devices and will be updated along with partitionLatestFlushedTimeForEachDevice
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
- private String storageGroupName;
+ private String virtualStorageGroupId;
+ private String logicalStorageGroupName;
private File storageGroupSysDir;
-
// manage seqFileList and unSeqFileList
private TsFileManagement tsFileManagement;
-
/**
* time partition id -> version controller which assigns a version for each MemTable and
* deletion/update such that after they are persisted, the order of insertions, deletions and
* updates can be re-determined.
*/
private HashMap<Long, VersionController> timePartitionIdVersionControllerMap = new HashMap<>();
-
/**
* when the data in a storage group is older than dataTTL, it is considered invalid and will be
* eventually removed.
@@ -226,7 +223,6 @@ public class StorageGroupProcessor {
private long dataTTL = Long.MAX_VALUE;
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
private TsFileFlushPolicy fileFlushPolicy;
-
/**
* The max file versions in each partition. By recording this, if several IoTDB instances have the
* same policy of closing file and their ingestion is identical, then files of the same version in
@@ -234,39 +230,33 @@ public class StorageGroupProcessor {
* across different instances. partition number -> max version number
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
-
- /**
- * value of root.stats."root.sg".TOTAL_POINTS
- */
- private long monitorSeriesValue;
private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this);
-
/**
* Record the device number of the last TsFile in each storage group, which is applied to
* initialize the array size of DeviceTimeIndex. It is reasonable to assume that the adjacent
* files should have similar numbers of devices. Default value: INIT_ARRAY_SIZE = 64
*/
private int deviceNumInLastClosedTsFile = DeviceTimeIndex.INIT_ARRAY_SIZE;
-
- public boolean isReady() {
- return isReady;
- }
-
- public void setReady(boolean ready) {
- isReady = ready;
- }
-
private boolean isReady = false;
-
private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
private List<FlushListener> customFlushListeners = Collections.emptyList();
- public StorageGroupProcessor(String systemDir, String storageGroupName,
- TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
- this.storageGroupName = storageGroupName;
+ /**
+ * constrcut a storage group processor
+ *
+ * @param systemDir system dir path
+ * @param virtualStorageGroupId virtual storage group id e.g. 1
+ * @param fileFlushPolicy file flush policy
+ * @param logicalStorageGroupName logical storage group name e.g. root.sg1
+ */
+ public StorageGroupProcessor(String systemDir, String virtualStorageGroupId,
+ TsFileFlushPolicy fileFlushPolicy, String logicalStorageGroupName)
+ throws StorageGroupProcessorException {
+ this.virtualStorageGroupId = virtualStorageGroupId;
+ this.logicalStorageGroupName = logicalStorageGroupName;
this.fileFlushPolicy = fileFlushPolicy;
- storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
+ storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId);
if (storageGroupSysDir.mkdirs()) {
logger.info("Storage Group system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
@@ -275,12 +265,24 @@ public class StorageGroupProcessor {
storageGroupSysDir.getPath());
}
this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
- .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
+ .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath());
recover();
}
+ public String getLogicalStorageGroupName() {
+ return logicalStorageGroupName;
+ }
+
+ public boolean isReady() {
+ return isReady;
+ }
+
+ public void setReady(boolean ready) {
+ isReady = ready;
+ }
+
private Map<Long, List<TsFileResource>> splitResourcesByPartition(
List<TsFileResource> resources) {
Map<Long, List<TsFileResource>> ret = new HashMap<>();
@@ -291,7 +293,7 @@ public class StorageGroupProcessor {
}
private void recover() throws StorageGroupProcessorException {
- logger.info("recover Storage Group {}", storageGroupName);
+ logger.info("recover Storage Group {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
try {
// collect candidate TsFiles from sequential and unsequential data directory
@@ -319,7 +321,7 @@ public class StorageGroupProcessor {
recoverTsFiles(value, false);
}
- String taskName = storageGroupName + "-" + System.currentTimeMillis();
+ String taskName = virtualStorageGroupId + "-" + System.currentTimeMillis();
File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
MERGING_MODIFICATION_FILE_NAME);
if (mergingMods.exists()) {
@@ -330,8 +332,8 @@ public class StorageGroupProcessor {
tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(),
tsFileManagement::mergeEndAction,
taskName,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
- logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), logicalStorageGroupName);
+ logger.info("{} a RecoverMergeTask {} starts...", virtualStorageGroupId, taskName);
recoverMergeTask
.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
@@ -363,33 +365,21 @@ public class StorageGroupProcessor {
private void recoverCompaction() {
if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
compactionMergeWorking = true;
- logger.info("{} submit a compaction merge task", storageGroupName);
+ logger.info("{} submit a compaction merge task", virtualStorageGroupId);
try {
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
} catch (RejectedExecutionException e) {
this.closeCompactionMergeCallBack();
- logger.error("{} compaction submit task failed", storageGroupName);
+ logger.error("{} compaction submit task failed", virtualStorageGroupId);
}
} else {
logger.error("{} compaction pool not started ,recover failed",
- storageGroupName);
+ virtualStorageGroupId);
}
}
- public long getMonitorSeriesValue() {
- return monitorSeriesValue;
- }
-
- public void setMonitorSeriesValue(long monitorSeriesValue) {
- this.monitorSeriesValue = monitorSeriesValue;
- }
-
- public void updateMonitorSeriesValue(int successPointsNum) {
- this.monitorSeriesValue += successPointsNum;
- }
-
/**
* use old seq file to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice,
* partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap
@@ -439,7 +429,8 @@ public class StorageGroupProcessor {
List<File> tsFiles = new ArrayList<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
- File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
+ File fileFolder = fsFactory
+ .getFile(baseDir + File.separator + logicalStorageGroupName, virtualStorageGroupId);
if (!fileFolder.exists()) {
continue;
}
@@ -554,8 +545,8 @@ public class StorageGroupProcessor {
long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
- storageGroupName + FILE_NAME_SEPARATOR, tsFileResource, isSeq,
- i == tsFiles.size() - 1);
+ logicalStorageGroupName + File.separator + virtualStorageGroupId
+ + FILE_NAME_SEPARATOR, tsFileResource, isSeq, i == tsFiles.size() - 1);
RestorableTsFileIOWriter writer;
try {
@@ -585,7 +576,7 @@ public class StorageGroupProcessor {
// the last file is not closed, continue writing to in
TsFileProcessor tsFileProcessor;
if (isSeq) {
- tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
+ tsFileProcessor = new TsFileProcessor(virtualStorageGroupId, storageGroupInfo, tsFileResource,
this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
true, writer);
if (enableMemControl) {
@@ -597,7 +588,7 @@ public class StorageGroupProcessor {
}
workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
} else {
- tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
+ tsFileProcessor = new TsFileProcessor(virtualStorageGroupId, storageGroupInfo, tsFileResource,
this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false,
writer);
if (enableMemControl) {
@@ -989,7 +980,7 @@ public class StorageGroupProcessor {
"will close a {} TsFile because too many active partitions ({} > {}) in the storage group {},",
sequence, tsFileProcessorTreeMap.size(),
IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition(),
- storageGroupName);
+ logicalStorageGroupName + "-" + virtualStorageGroupId);
asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
}
@@ -1017,16 +1008,19 @@ public class StorageGroupProcessor {
} else {
baseDir = DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
}
- fsFactory.getFile(baseDir, storageGroupName).mkdirs();
+ fsFactory.getFile(baseDir + File.separator + logicalStorageGroupName, virtualStorageGroupId)
+ .mkdirs();
String filePath =
- baseDir + File.separator + storageGroupName + File.separator + timePartitionId
+ baseDir + File.separator + logicalStorageGroupName + File.separator + virtualStorageGroupId
+ + File.separator + timePartitionId
+ File.separator
+ getNewTsFileName(timePartitionId);
TsFileProcessor tsFileProcessor;
if (sequence) {
- tsFileProcessor = new TsFileProcessor(storageGroupName,
+ tsFileProcessor = new TsFileProcessor(
+ logicalStorageGroupName + File.separator + virtualStorageGroupId,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, deviceNumInLastClosedTsFile);
@@ -1038,7 +1032,8 @@ public class StorageGroupProcessor {
.getTsFileResource().calculateRamSize());
}
} else {
- tsFileProcessor = new TsFileProcessor(storageGroupName,
+ tsFileProcessor = new TsFileProcessor(
+ logicalStorageGroupName + File.separator + virtualStorageGroupId,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, deviceNumInLastClosedTsFile);
@@ -1083,7 +1078,7 @@ public class StorageGroupProcessor {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
logger
- .warn("{} has spent {}s to wait for closing one tsfile.", this.storageGroupName,
+ .warn("{} has spent {}s to wait for closing one tsfile.", logicalStorageGroupName + "-" + this.virtualStorageGroupId,
(System.currentTimeMillis() - startTime) / 1000);
}
}
@@ -1091,7 +1086,7 @@ public class StorageGroupProcessor {
Thread.currentThread().interrupt();
logger
.error("syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
- + "group {}", storageGroupName, e);
+ + "group {}", logicalStorageGroupName + "-" + virtualStorageGroupId, e);
}
}
}
@@ -1118,7 +1113,7 @@ public class StorageGroupProcessor {
if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
}
- logger.info("close a sequence tsfile processor {}", storageGroupName);
+ logger.info("close a sequence tsfile processor {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
} else {
closingUnSequenceTsFileProcessor.add(tsFileProcessor);
tsFileProcessor.asyncClose();
@@ -1135,16 +1130,17 @@ public class StorageGroupProcessor {
* delete the storageGroup's own folder in folder data/system/storage_groups
*/
public void deleteFolder(String systemDir) {
- logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
+ logger.info("{} will close all files for deleting data folder {}", logicalStorageGroupName + "-" + virtualStorageGroupId, systemDir);
writeLock();
syncCloseAllWorkingTsFileProcessors();
try {
- File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
+ File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir,
+ virtualStorageGroupId);
if (storageGroupFolder.exists()) {
org.apache.iotdb.db.utils.FileUtils.deleteDirectory(storageGroupFolder);
}
} catch (IOException e) {
- logger.error("Cannot delete the folder in storage group {}, because", storageGroupName, e);
+ logger.error("Cannot delete the folder in storage group {}, because", logicalStorageGroupName + "-" + virtualStorageGroupId, e);
} finally {
writeUnlock();
}
@@ -1168,7 +1164,7 @@ public class StorageGroupProcessor {
}
public void syncDeleteDataFiles() {
- logger.info("{} will close all files for deleting data files", storageGroupName);
+ logger.info("{} will close all files for deleting data files", logicalStorageGroupName + "-" + virtualStorageGroupId);
writeLock();
syncCloseAllWorkingTsFileProcessors();
//normally, mergingModification is just need to be closed by after a merge task is finished.
@@ -1201,7 +1197,8 @@ public class StorageGroupProcessor {
private void deleteAllSGFolders(List<String> folder) {
for (String tsfilePath : folder) {
- File storageGroupFolder = fsFactory.getFile(tsfilePath, storageGroupName);
+ File storageGroupFolder = fsFactory
+ .getFile(tsfilePath, logicalStorageGroupName + File.separator + virtualStorageGroupId);
if (storageGroupFolder.exists()) {
try {
org.apache.iotdb.db.utils.FileUtils.deleteDirectory(storageGroupFolder);
@@ -1217,12 +1214,12 @@ public class StorageGroupProcessor {
*/
public synchronized void checkFilesTTL() {
if (dataTTL == Long.MAX_VALUE) {
- logger.debug("{}: TTL not set, ignore the check", storageGroupName);
+ logger.debug("{}: TTL not set, ignore the check", logicalStorageGroupName + "-" + virtualStorageGroupId);
return;
}
long timeLowerBound = System.currentTimeMillis() - dataTTL;
if (logger.isDebugEnabled()) {
- logger.debug("{}: TTL removing files before {}", storageGroupName, new Date(timeLowerBound));
+ logger.debug("{}: TTL removing files before {}", logicalStorageGroupName + "-" + virtualStorageGroupId, new Date(timeLowerBound));
}
// copy to avoid concurrent modification of deletion
@@ -1284,13 +1281,13 @@ public class StorageGroupProcessor {
.isEmpty()) {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
- logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
+ logger.warn("{} has spent {}s to wait for closing all TsFiles.", logicalStorageGroupName + "-" + this.virtualStorageGroupId,
(System.currentTimeMillis() - startTime) / 1000);
}
}
} catch (InterruptedException e) {
logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
- + "group {}", storageGroupName, e);
+ + "group {}", logicalStorageGroupName + "-" + virtualStorageGroupId, e);
Thread.currentThread().interrupt();
}
}
@@ -1299,7 +1296,7 @@ public class StorageGroupProcessor {
public void asyncCloseAllWorkingTsFileProcessors() {
writeLock();
try {
- logger.info("async force close all files in storage group: {}", storageGroupName);
+ logger.info("async force close all files in storage group: {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workSequenceTsFileProcessors.values())) {
@@ -1318,7 +1315,7 @@ public class StorageGroupProcessor {
public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
writeLock();
try {
- logger.info("force close all processors in storage group: {}", storageGroupName);
+ logger.info("force close all processors in storage group: {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workSequenceTsFileProcessors.values())) {
@@ -1458,11 +1455,7 @@ public class StorageGroupProcessor {
lastUpdateTime = curTime;
}
}
- // There is no tsfile data, the delete operation is invalid
- if (lastUpdateTime == null) {
- logger.debug("No device {} in SG {}, deletion invalid", device, storageGroupName);
- return;
- }
+
// delete Last cache record if necessary
tryToDeleteLastCache(device, path, startTime, endTime);
}
@@ -1656,12 +1649,12 @@ public class StorageGroupProcessor {
synchronized (closeStorageGroupCondition) {
closeStorageGroupCondition.notifyAll();
}
- logger.info("signal closing storage group condition in {}", storageGroupName);
+ logger.info("signal closing storage group condition in {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
.isTerminated()) {
compactionMergeWorking = true;
- logger.info("{} submit a compaction merge task", storageGroupName);
+ logger.info("{} submit a compaction merge task", logicalStorageGroupName + "-" + virtualStorageGroupId);
try {
// fork and filter current tsfile, then commit then to compaction merge
tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
@@ -1671,11 +1664,11 @@ public class StorageGroupProcessor {
tsFileProcessor.getTimeRangeId()));
} catch (IOException | RejectedExecutionException e) {
this.closeCompactionMergeCallBack();
- logger.error("{} compaction submit task failed", storageGroupName);
+ logger.error("{} compaction submit task failed", logicalStorageGroupName + "-" + virtualStorageGroupId);
}
} else {
logger.info("{} last compaction merge task is working, skip current merge",
- storageGroupName);
+ logicalStorageGroupName + "-" + virtualStorageGroupId);
}
}
@@ -2142,7 +2135,8 @@ public class StorageGroupProcessor {
case LOAD_UNSEQUENCE:
targetFile = fsFactory
.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
+ logicalStorageGroupName + File.separatorChar + virtualStorageGroupId + File.separatorChar
+ + filePartitionId + File.separator
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManagement.contains(tsFileResource, false)) {
@@ -2156,7 +2150,8 @@ public class StorageGroupProcessor {
case LOAD_SEQUENCE:
targetFile =
fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
+ logicalStorageGroupName + File.separatorChar + virtualStorageGroupId + File.separatorChar
+ + filePartitionId + File.separator
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManagement.contains(tsFileResource, true)) {
@@ -2335,8 +2330,8 @@ public class StorageGroupProcessor {
return tsFileManagement.getTsFileList(false);
}
- public String getStorageGroupName() {
- return storageGroupName;
+ public String getVirtualStorageGroupId() {
+ return virtualStorageGroupId;
}
public StorageGroupInfo getStorageGroupInfo() {
@@ -2404,7 +2399,7 @@ public class StorageGroupProcessor {
tsFileManagement.writeLock();
try {
// abort ongoing merges
- MergeManager.getINSTANCE().abortMerge(storageGroupName);
+ MergeManager.getINSTANCE().abortMerge(virtualStorageGroupId);
// close all working files that should be removed
removePartitions(filter, workSequenceTsFileProcessors.entrySet());
removePartitions(filter, workUnsequenceTsFileProcessors.entrySet());
@@ -2427,7 +2422,7 @@ public class StorageGroupProcessor {
Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
long partitionId = longTsFileProcessorEntry.getKey();
TsFileProcessor processor = longTsFileProcessorEntry.getValue();
- if (filter.satisfy(storageGroupName, partitionId)) {
+ if (filter.satisfy(logicalStorageGroupName, partitionId)) {
processor.syncClose();
iterator.remove();
logger.debug("{} is removed during deleting partitions",
@@ -2440,7 +2435,7 @@ public class StorageGroupProcessor {
private void removePartitions(TimePartitionFilter filter, Iterator<TsFileResource> iterator) {
while (iterator.hasNext()) {
TsFileResource tsFileResource = iterator.next();
- if (filter.satisfy(storageGroupName, tsFileResource.getTimePartition())) {
+ if (filter.satisfy(logicalStorageGroupName, tsFileResource.getTimePartition())) {
tsFileResource.remove();
iterator.remove();
logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
@@ -2494,6 +2489,21 @@ public class StorageGroupProcessor {
}
+ @TestOnly
+ public long getPartitionMaxFileVersions(long partitionId) {
+ return partitionMaxFileVersions.getOrDefault(partitionId, -1L);
+ }
+
+ public void setCustomCloseFileListeners(
+ List<CloseFileListener> customCloseFileListeners) {
+ this.customCloseFileListeners = customCloseFileListeners;
+ }
+
+ public void setCustomFlushListeners(
+ List<FlushListener> customFlushListeners) {
+ this.customFlushListeners = customFlushListeners;
+ }
+
private enum LoadTsFileType {
LOAD_SEQUENCE, LOAD_UNSEQUENCE
}
@@ -2527,19 +2537,4 @@ public class StorageGroupProcessor {
boolean satisfy(String storageGroupName, long timePartitionId);
}
-
- @TestOnly
- public long getPartitionMaxFileVersions(long partitionId) {
- return partitionMaxFileVersions.getOrDefault(partitionId, -1L);
- }
-
- public void setCustomCloseFileListeners(
- List<CloseFileListener> customCloseFileListeners) {
- this.customCloseFileListeners = customCloseFileListeners;
- }
-
- public void setCustomFlushListeners(
- List<FlushListener> customFlushListeners) {
- this.customFlushListeners = customFlushListeners;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
new file mode 100644
index 0000000..998cb82
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitioner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.TestOnly;
+
+public class HashVirtualPartitioner implements VirtualPartitioner {
+
+ public static int STORAGE_GROUP_NUM = IoTDBDescriptor.getInstance().getConfig()
+ .getVirtualStorageGroupNum();
+
+
+ private HashVirtualPartitioner() {
+
+ }
+
+ public static HashVirtualPartitioner getInstance() {
+ return HashVirtualPartitionerHolder.INSTANCE;
+ }
+
+ @Override
+ public int deviceToVirtualStorageGroupId(PartialPath deviceId) {
+ return toStorageGroupId(deviceId);
+ }
+
+ @TestOnly
+ public void setStorageGroupNum(int i) {
+ STORAGE_GROUP_NUM = i;
+ }
+
+ @Override
+ public int getPartitionCount() {
+ return STORAGE_GROUP_NUM;
+ }
+
+
+ private int toStorageGroupId(PartialPath deviceId) {
+ return Math.abs(deviceId.hashCode() % STORAGE_GROUP_NUM);
+ }
+
+ private static class HashVirtualPartitionerHolder {
+
+ private static final HashVirtualPartitioner INSTANCE = new HashVirtualPartitioner();
+
+ private HashVirtualPartitionerHolder() {
+ // allowed to do nothing
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
new file mode 100644
index 0000000..3043fda
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public interface VirtualPartitioner {
+
+ /**
+ * use device id to determine storage group id
+ *
+ * @param deviceId device id
+ * @return virtual storage group id
+ */
+ public int deviceToVirtualStorageGroupId(PartialPath deviceId);
+
+ /**
+ * get total number of virtual storage group
+ *
+ * @return total number of virtual storage group
+ */
+ public int getPartitionCount();
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
new file mode 100644
index 0000000..13dec3e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualStorageGroupManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
+
+ /**
+ * virtual storage group partitioner
+ */
+ VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+
+ /**
+ * all virtual storage group processor
+ */
+ StorageGroupProcessor[] virtualStorageGroupProcessor;
+
+ /**
+ * value of root.stats."root.sg".TOTAL_POINTS
+ */
+ private long monitorSeriesValue;
+
+ public VirtualStorageGroupManager() {
+ virtualStorageGroupProcessor = new StorageGroupProcessor[partitioner.getPartitionCount()];
+ }
+
+ /**
+ * push forceCloseAllWorkingTsFileProcessors down to all sg
+ */
+ public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
+ }
+ }
+ }
+
+ /**
+ * push syncCloseAllWorkingTsFileProcessors down to all sg
+ */
+ public void syncCloseAllWorkingTsFileProcessors() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
+ }
+ }
+ }
+
+ /**
+ * push check ttl down to all sg
+ */
+ public void checkTTL() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.checkFilesTTL();
+ }
+ }
+ }
+
+ /**
+ * get processor from device id
+ *
+ * @param partialPath device path
+ * @return virtual storage group processor
+ */
+ @SuppressWarnings("java:S2445")
+ // actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
+ public StorageGroupProcessor getProcessor(PartialPath partialPath,
+ StorageGroupMNode storageGroupMNode)
+ throws StorageGroupProcessorException, StorageEngineException {
+ int loc = partitioner.deviceToVirtualStorageGroupId(partialPath);
+
+ StorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
+ if (processor == null) {
+ // if finish recover
+ if (StorageEngine.getInstance().isAllSgReady()) {
+ synchronized (storageGroupMNode) {
+ processor = virtualStorageGroupProcessor[loc];
+ if (processor == null) {
+ processor = StorageEngine.getInstance()
+ .buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(),
+ storageGroupMNode, String.valueOf(loc));
+ virtualStorageGroupProcessor[loc] = processor;
+ }
+ }
+ } else {
+ // not finished recover, refuse the request
+ throw new StorageEngineException(
+ "the sg " + partialPath + " may not ready now, please wait and retry later",
+ TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+ }
+ }
+
+ return processor;
+ }
+
+ /**
+ * recover
+ *
+ * @param storageGroupMNode logical sg mnode
+ */
+ public void recover(StorageGroupMNode storageGroupMNode) {
+ List<Thread> threadList = new ArrayList<>(partitioner.getPartitionCount());
+ for (int i = 0; i < partitioner.getPartitionCount(); i++) {
+ int cur = i;
+ Thread recoverThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ StorageGroupProcessor processor = null;
+ try {
+ processor = StorageEngine.getInstance()
+ .buildNewStorageGroupProcessor(storageGroupMNode.getPartialPath(),
+ storageGroupMNode, String.valueOf(cur));
+ } catch (StorageGroupProcessorException e) {
+ logger.error(
+ "failed to recover storage group processor in " + storageGroupMNode.getFullPath()
+ + " virtual storage group id is " + cur);
+ }
+ virtualStorageGroupProcessor[cur] = processor;
+ }
+ });
+
+ threadList.add(recoverThread);
+ recoverThread.start();
+ }
+
+ for (int i = 0; i < partitioner.getPartitionCount(); i++) {
+ try {
+ threadList.get(i).join();
+ } catch (InterruptedException e) {
+ logger.error(
+ "failed to recover storage group processor in " + storageGroupMNode.getFullPath()
+ + " virtual storage group id is " + i);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public long getMonitorSeriesValue() {
+ return monitorSeriesValue;
+ }
+
+ public void setMonitorSeriesValue(long monitorSeriesValue) {
+ this.monitorSeriesValue = monitorSeriesValue;
+ }
+
+ public void updateMonitorSeriesValue(int successPointsNum) {
+ this.monitorSeriesValue += successPointsNum;
+ }
+
+ /**
+ * push closeStorageGroupProcessor operation down to all virtual storage group processors
+ */
+ public void closeStorageGroupProcessor(boolean isSeq, boolean isSync) {
+ for (StorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ if (processor == null) {
+ continue;
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("{} closing sg processor is called for closing {}, seq = {}",
+ isSync ? "sync" : "async",
+ processor.getVirtualStorageGroupId() + "-" + processor.getLogicalStorageGroupName(),
+ isSeq);
+ }
+
+ processor.writeLock();
+ try {
+ if (isSeq) {
+ // to avoid concurrent modification problem, we need a new array list
+ for (TsFileProcessor tsfileProcessor : new ArrayList<>(
+ processor.getWorkSequenceTsFileProcessors())) {
+ if (isSync) {
+ processor.syncCloseOneTsFileProcessor(true, tsfileProcessor);
+ } else {
+ processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
+ }
+ }
+ } else {
+ // to avoid concurrent modification problem, we need a new array list
+ for (TsFileProcessor tsfileProcessor : new ArrayList<>(
+ processor.getWorkUnsequenceTsFileProcessors())) {
+ if (isSync) {
+ processor.syncCloseOneTsFileProcessor(false, tsfileProcessor);
+ } else {
+ processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
+ }
+ }
+ }
+ } finally {
+ processor.writeUnlock();
+ }
+ }
+ }
+
+ /**
+ * push closeStorageGroupProcessor operation down to all virtual storage group processors
+ */
+ public void closeStorageGroupProcessor(long partitionId,
+ boolean isSeq,
+ boolean isSync) {
+ for (StorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ if (processor != null) {
+ logger
+ .info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
+ processor.getVirtualStorageGroupId() + "-" + processor
+ .getLogicalStorageGroupName(), isSeq, partitionId);
+ processor.writeLock();
+ // to avoid concurrent modification problem, we need a new array list
+ List<TsFileProcessor> processors = isSeq ?
+ new ArrayList<>(processor.getWorkSequenceTsFileProcessors()) :
+ new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
+ try {
+ for (TsFileProcessor tsfileProcessor : processors) {
+ if (tsfileProcessor.getTimeRangeId() == partitionId) {
+ if (isSync) {
+ processor.syncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
+ } else {
+ processor.asyncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
+ }
+ break;
+ }
+ }
+ } finally {
+ processor.writeUnlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * push delete operation down to all virtual storage group processors
+ */
+ public void delete(PartialPath path, long startTime, long endTime, long planIndex)
+ throws IOException {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.delete(path, startTime, endTime, planIndex);
+ }
+ }
+ }
+
+ /**
+ * push countUpgradeFiles operation down to all virtual storage group processors
+ */
+ public int countUpgradeFiles() {
+ int totalUpgradeFileNum = 0;
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
+ }
+ }
+
+ return totalUpgradeFileNum;
+ }
+
+ /**
+ * push upgradeAll operation down to all virtual storage group processors
+ */
+ public void upgradeAll() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.upgrade();
+ }
+ }
+ }
+
+ /**
+ * push mergeAll operation down to all virtual storage group processors
+ */
+ public void mergeAll(boolean fullMerge) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.merge(fullMerge);
+ }
+ }
+ }
+
+ /**
+ * push syncDeleteDataFiles operation down to all virtual storage group processors
+ */
+ public void syncDeleteDataFiles() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.syncDeleteDataFiles();
+ }
+ }
+ }
+
+ /**
+ * push setTTL operation down to all virtual storage group processors
+ */
+ public void setTTL(long dataTTL) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.setDataTTL(dataTTL);
+ }
+ }
+ }
+
+ /**
+ * push deleteStorageGroup operation down to all virtual storage group processors
+ */
+ public void deleteStorageGroup(String path) {
+ for (StorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ if (processor != null) {
+ processor.deleteFolder(path);
+ }
+ }
+ }
+
+ /**
+ * push getAllClosedStorageGroupTsFile operation down to all virtual storage group processors
+ */
+ public void getAllClosedStorageGroupTsFile(PartialPath storageGroupName,
+ Map<PartialPath, Map<Long, List<TsFileResource>>> ret) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ List<TsFileResource> allResources = storageGroupProcessor.getSequenceFileTreeSet();
+ allResources.addAll(storageGroupProcessor.getUnSequenceFileList());
+ for (TsFileResource tsfile : allResources) {
+ if (!tsfile.isClosed()) {
+ continue;
+ }
+ long partitionNum = tsfile.getTimePartition();
+ Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(storageGroupName
+ , n -> new HashMap<>());
+ storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>())
+ .add(tsfile);
+ }
+ }
+ }
+ }
+
+ /**
+ * push setPartitionVersionToMax operation down to all virtual storage group processors
+ */
+ public void setPartitionVersionToMax(long partitionId, long newMaxVersion) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.setPartitionFileVersionToMax(partitionId, newMaxVersion);
+ }
+ }
+ }
+
+ /**
+ * push removePartitions operation down to all virtual storage group processors
+ */
+ public void removePartitions(TimePartitionFilter filter) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.removePartitions(filter);
+ }
+ }
+ }
+
+ /**
+ * push getWorkingStorageGroupPartitions operation down to all virtual storage group processors
+ */
+ public void getWorkingStorageGroupPartitions(String storageGroupName,
+ Map<String, List<Pair<Long, Boolean>>> res) {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
+ for (TsFileProcessor tsFileProcessor : storageGroupProcessor
+ .getWorkSequenceTsFileProcessors()) {
+ Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), true);
+ partitionIdList.add(tmpPair);
+ }
+
+ for (TsFileProcessor tsFileProcessor : storageGroupProcessor
+ .getWorkUnsequenceTsFileProcessors()) {
+ Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), false);
+ partitionIdList.add(tmpPair);
+ }
+
+ res.put(storageGroupName, partitionIdList);
+ }
+ }
+ }
+
+ /**
+ * only for test
+ */
+ public void reset() {
+ Arrays.fill(virtualStorageGroupProcessor, null);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index c9e8dcf..9f5d642 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -104,7 +105,7 @@ public class StatMonitor implements StatMonitorMBean, IService {
TSRecord tsRecord = new TSRecord(insertTime, storageGroupSeries.getDevice());
tsRecord.addTuple(
new LongDataPoint(StatMeasurementConstants.TOTAL_POINTS.getMeasurement(),
- storageEngine.getProcessor(new PartialPath(storageGroupName)).getMonitorSeriesValue()));
+ storageEngine.getProcessorMap().get(new PartialPath(storageGroupName)).getMonitorSeriesValue()));
storageEngine.insert(new InsertRowPlan(tsRecord));
// update global monitor series
@@ -142,7 +143,7 @@ public class StatMonitor implements StatMonitorMBean, IService {
storageGroupPath.getFullPath());
TimeValuePair timeValuePair = getLastValue(monitorSeriesPath);
if (timeValuePair != null) {
- storageEngine.getProcessor(storageGroupPath)
+ storageEngine.getProcessorMap().get(storageGroupPath)
.setMonitorSeriesValue(timeValuePair.getValue().getLong());
}
}
@@ -216,8 +217,13 @@ public class StatMonitor implements StatMonitorMBean, IService {
@Override
public long getStorageGroupTotalPointsNum(String storageGroupName) {
try {
- return storageEngine.getProcessor(new PartialPath(storageGroupName)).getMonitorSeriesValue();
- } catch (StorageEngineException | IllegalPathException e) {
+ VirtualStorageGroupManager virtualStorageGroupManager = storageEngine.getProcessorMap().get(new PartialPath(storageGroupName));
+ if(virtualStorageGroupManager == null){
+ return 0;
+ }
+
+ return virtualStorageGroupManager.getMonitorSeriesValue();
+ } catch (IllegalPathException e) {
logger.error(e.getMessage());
return -1;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java b/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
new file mode 100644
index 0000000..b822a06
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/virtualsg/DeviceMappingViewer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.virtualsg;
+
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+/**
+ * for DBA to view the mapping from device to virtual storage group ID
+ * usage: run this class with arguments [system_schema_dir], if args are not given, use default in config
+ */
+public class DeviceMappingViewer {
+
+ public static void main(String[] args) throws MetadataException {
+ // has schema log dir
+ if(args.length == 1){
+ IoTDBDescriptor.getInstance().getConfig().setSchemaDir(args[0]);
+ }
+
+ HashVirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
+ IoTDBDescriptor.getInstance().getConfig().setEnableMTreeSnapshot(false);
+ MManager mManager = MManager.getInstance();
+ mManager.init();
+
+ Set<PartialPath> partialPathSet = mManager.getDevices(new PartialPath("root.*"));
+
+ if(partialPathSet.isEmpty() && args.length == 1){
+ System.out.println("no mlog in given system schema dir: " + args[0] + " please have a check");
+ }
+ else{
+ System.out.println();
+ System.out.println("--------------------- mapping from device to virtual storage group ID ---------------------");
+ System.out.println("Format is: device name -> virtual storage group ID");
+ for(PartialPath partialPath : partialPathSet){
+ System.out.println(partialPath + " -> " + partitioner.deviceToVirtualStorageGroupId(partialPath));
+ }
+ }
+
+ mManager.clear();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java
index 7241405..908888d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkMetadataCacheTest.java
@@ -71,7 +71,7 @@ public class ChunkMetadataCacheTest {
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup,
- new DirectFlushPolicy());
+ new DirectFlushPolicy(), storageGroup);
insertData();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index 56d7eae..61267c0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -44,6 +44,7 @@ public class MergeLogTest extends MergeTest {
File tempSGDir;
+
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException {
super.setUp();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 4cfc4b0..2959336 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index bcbafc5..a11fe3b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -49,6 +49,7 @@ public class MergeTaskTest extends MergeTest {
private File tempSGDir;
+
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException, MetadataException {
super.setUp();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 98c104d..d6a9092 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -61,13 +61,7 @@ import org.junit.Test;
public class DeletionFileNodeTest {
- private String processorName = "root.test";
-
private static String[] measurements = new String[10];
- private TSDataType dataType = TSDataType.DOUBLE;
- private TSEncoding encoding = TSEncoding.PLAIN;
-
- private int prevUnseqLevelNum = 0;
static {
for (int i = 0; i < 10; i++) {
@@ -75,6 +69,11 @@ public class DeletionFileNodeTest {
}
}
+ private String processorName = "root.test";
+ private TSDataType dataType = TSDataType.DOUBLE;
+ private TSEncoding encoding = TSEncoding.PLAIN;
+ private int prevUnseqLevelNum = 0;
+
@Before
public void setup() throws MetadataException {
prevUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum();
@@ -180,8 +179,12 @@ public class DeletionFileNodeTest {
assertTrue(directory.isDirectory());
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
- modFiles.add(file);
+ if (file.isDirectory()) {
+ for (File tsfile : file.listFiles()) {
+ if (tsfile.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
+ modFiles.add(tsfile);
+ }
+ }
}
}
}
@@ -307,8 +310,12 @@ public class DeletionFileNodeTest {
assertTrue(directory.isDirectory());
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
- modFiles.add(file);
+ if (file.isDirectory()) {
+ for (File tsfile : file.listFiles()) {
+ if (tsfile.getPath().endsWith(ModificationFile.FILE_SUFFIX)) {
+ modFiles.add(tsfile);
+ }
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6fc7478..78dee2b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -63,6 +63,7 @@ public class StorageGroupProcessorTest {
private StorageGroupProcessor processor;
private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
+
@Before
public void setUp() throws Exception {
IoTDBDescriptor.getInstance().getConfig()
@@ -605,7 +606,7 @@ public class StorageGroupProcessorTest {
class DummySGP extends StorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
- super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
+ super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 6bf57f1..6da63ad 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -112,7 +112,7 @@ public class TTLTest {
IoTDB.metaManager.setStorageGroup(new PartialPath(sg1));
IoTDB.metaManager.setStorageGroup(new PartialPath(sg2));
storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig()
- .getSystemDir(), sg1, new DirectFlushPolicy());
+ .getSystemDir(), sg1, new DirectFlushPolicy(), sg1);
IoTDB.metaManager.createTimeseries(new PartialPath(g1s1), TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED, Collections.emptyMap());
}
@@ -267,8 +267,12 @@ public class TTLTest {
for (File directory : seqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- seqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ seqFiles.add(file);
+ }
+ }
}
}
}
@@ -278,8 +282,12 @@ public class TTLTest {
for (File directory : unseqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- unseqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ unseqFiles.add(file);
+ }
+ }
}
}
}
@@ -301,8 +309,12 @@ public class TTLTest {
for (File directory : seqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- seqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ seqFiles.add(file);
+ }
+ }
}
}
}
@@ -312,8 +324,12 @@ public class TTLTest {
for (File directory : unseqDir.listFiles()) {
if (directory.isDirectory()) {
for (File file : directory.listFiles()) {
- if (file.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- unseqFiles.add(file);
+ if(file.isDirectory()){
+ for(File tsfile : file.listFiles()){
+ if (tsfile.getPath().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ unseqFiles.add(file);
+ }
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
new file mode 100644
index 0000000..8c0e0c3
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/HashVirtualPartitionerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Set;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HashVirtualPartitionerTest {
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ // init file dir
+ StorageEngine.getInstance();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void basicTest() throws IllegalPathException {
+ HashVirtualPartitioner hashVirtualPartitioner = HashVirtualPartitioner.getInstance();
+
+ // sg -> deviceId
+ HashMap<PartialPath, Set<PartialPath>> realMap = new HashMap<>();
+ PartialPath d1 = new PartialPath("root.sg1.d1");
+ PartialPath d2 = new PartialPath("root.sg1.d2");
+
+
+ int sg1 = hashVirtualPartitioner.deviceToVirtualStorageGroupId(d1);
+ int sg2 = hashVirtualPartitioner.deviceToVirtualStorageGroupId(d2);
+
+ assertEquals(sg1, Math.abs(d1.hashCode() % hashVirtualPartitioner.getPartitionCount()));
+ assertEquals(sg2, Math.abs(d2.hashCode() % hashVirtualPartitioner.getPartitionCount()));
+ }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
index f53b3f2..bbcecf9 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.db.integration;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -36,6 +38,8 @@ public class IoTDBCompleteIT {
@Before
public void setUp() throws Exception {
+ // test different partition
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(16);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
}
@@ -43,6 +47,7 @@ public class IoTDBCompleteIT {
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum());
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 154a501..b73d675 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -122,6 +123,7 @@ public class IoTDBLoadExternalTsfileIT {
private static final String TEST_D0_S0_STR = "root.test.d0.s0";
private static final String TEST_D0_S1_STR = "root.test.d0.s1";
private static final String TEST_D1_STR = "root.test.d1.g0.s0";
+ private static int virtualPartitionNum = 0;
private static String[] deleteSqls = new String[]{
"DELETE STORAGE GROUP root.vehicle",
@@ -132,8 +134,11 @@ public class IoTDBLoadExternalTsfileIT {
public void setUp() throws Exception {
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setVirtualStorageGroupNum(1);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(1);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
+ virtualPartitionNum = IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData(insertSequenceSqls);
}
@@ -143,6 +148,8 @@ public class IoTDBLoadExternalTsfileIT {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setVirtualStorageGroupNum(virtualPartitionNum);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(virtualPartitionNum);
}
@Test
@@ -202,8 +209,8 @@ public class IoTDBLoadExternalTsfileIT {
StorageEngine.getInstance().getProcessor(new PartialPath("root.vehicle"))
.getSequenceFileTreeSet());
File tmpDir = new File(
- resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + new PartialPath("root.vehicle") + File.separator + "0");
+ resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + new PartialPath("root.vehicle") + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -215,8 +222,8 @@ public class IoTDBLoadExternalTsfileIT {
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
.getSequenceFileTreeSet());
- tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + new PartialPath("root.test") + File.separator + "0");
+ tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + new PartialPath("root.test") + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -225,7 +232,7 @@ public class IoTDBLoadExternalTsfileIT {
}
// load all tsfile in tmp dir
- tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
+ tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
"tmp");
statement.execute(String.format("load \"%s\"", tmpDir.getAbsolutePath()));
resources = new ArrayList<>(
@@ -237,8 +244,8 @@ public class IoTDBLoadExternalTsfileIT {
.getSequenceFileTreeSet());
assertEquals(2, resources.size());
assertNotNull(tmpDir.listFiles());
- assertEquals(0, new File(tmpDir, new PartialPath("root.vehicle") + File.separator + "0").listFiles().length);
- assertEquals(0, new File(tmpDir, new PartialPath("root.test") + File.separator + "0").listFiles().length);
+ assertEquals(0, new File(tmpDir, new PartialPath("root.vehicle") + File.separator + "0" + File.separator + "0").listFiles().length);
+ assertEquals(0, new File(tmpDir, new PartialPath("root.test") + File.separator + "0" + File.separator + "0").listFiles().length);
} catch (StorageEngineException | IllegalPathException e) {
Assert.fail();
}
@@ -393,8 +400,8 @@ public class IoTDBLoadExternalTsfileIT {
.getSequenceFileTreeSet());
File tmpDir = new File(
- resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.vehicle" + File.separator + "0");
+ resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + "root.vehicle" + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -406,8 +413,8 @@ public class IoTDBLoadExternalTsfileIT {
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor(new PartialPath("root.test"))
.getSequenceFileTreeSet());
- tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.test" + File.separator + "0");
+ tmpDir = new File(resources.get(0).getTsFile().getParentFile().getParentFile().getParentFile().getParentFile(),
+ "tmp" + File.separator + "root.test" + File.separator + "0" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -448,7 +455,7 @@ public class IoTDBLoadExternalTsfileIT {
Assert.assertTrue(hasError);
// test load metadata automatically, it will succeed.
- tmpDir = tmpDir.getParentFile().getParentFile();
+ tmpDir = tmpDir.getParentFile().getParentFile().getParentFile();
statement.execute(String.format("load \"%s\" true 1", tmpDir.getAbsolutePath()));
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor(new PartialPath("root.vehicle"))
@@ -460,7 +467,7 @@ public class IoTDBLoadExternalTsfileIT {
assertEquals(2, resources.size());
assertEquals(2, tmpDir.listFiles().length);
for (File dir : tmpDir.listFiles()) {
- assertEquals(0, dir.listFiles()[0].listFiles().length);
+ assertEquals(0, dir.listFiles()[0].listFiles()[0].listFiles().length);
}
} catch (StorageEngineException | IllegalPathException e) {
e.printStackTrace();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java
new file mode 100644
index 0000000..a9511a4
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
+ * IoTDB server should be defined as integration test.
+ */
+public class IoTDBMultiDeviceIT {
+
+ private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+ private static int maxNumberOfPointsInPage;
+ private static int pageSizeInByte;
+ private static int groupSizeInByte;
+ private static long prevPartitionInterval;
+
+ @Before
+ public void setUp() throws Exception {
+
+ EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+ // use small page setting
+ // origin value
+ maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage();
+ pageSizeInByte = tsFileConfig.getPageSizeInByte();
+ groupSizeInByte = tsFileConfig.getGroupSizeInByte();
+
+ // new value
+ tsFileConfig.setMaxNumberOfPointsInPage(1000);
+ tsFileConfig.setPageSizeInByte(1024 * 150);
+ tsFileConfig.setGroupSizeInByte(1024 * 1000);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
+ prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(100);
+ TSFileDescriptor.getInstance().getConfig().setCompressor("LZ4");
+
+ EnvironmentUtils.envSetUp();
+
+ insertData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // recovery value
+ tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
+ tsFileConfig.setPageSizeInByte(pageSizeInByte);
+ tsFileConfig.setGroupSizeInByte(groupSizeInByte);
+ EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+ TSFileDescriptor.getInstance().getConfig().setCompressor("SNAPPY");
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ }
+
+ private static void insertData()
+ throws ClassNotFoundException, SQLException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : TestConstant.create_sql) {
+ statement.execute(sql);
+ }
+
+ statement.execute("SET STORAGE GROUP TO root.fans");
+ statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.fans.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.fans.d2.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.fans.d3.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.car.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.car.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.car.d2.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+
+
+
+ // insert of data time range :0-1000 into fans
+ for (int time = 0; time < 1000; time++) {
+
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ }
+
+ // insert large amount of data time range : 13700 ~ 24000
+ for (int time = 13700; time < 24000; time++) {
+
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ }
+
+ // insert large amount of data time range : 3000 ~ 13600
+ for (int time = 3000; time < 13600; time++) {
+ // System.out.println("===" + time);
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ }
+
+ statement.execute("flush");
+ statement.execute("merge");
+
+ // unsequential data, memory data
+ for (int time = 10000; time < 11000; time++) {
+
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ }
+
+ // sequential data, memory data
+ for (int time = 200000; time < 201000; time++) {
+
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40);
+ statement.execute(sql);
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // "select * from root.vehicle" : test select wild data
+ @Test
+ public void selectAllTest() throws ClassNotFoundException {
+ String selectSql = "select * from root";
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(selectSql);
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ int cnt = 0;
+ long before = -1;
+ while (resultSet.next()) {
+ long cur = Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+ if(cur <= before){
+ fail("time order wrong!");
+ }
+ before = cur;
+ cnt++;
+ }
+ assertEquals(22900, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // "select * from root.vehicle" : test select wild data
+ @Test
+ public void selectAfterDeleteTest() throws ClassNotFoundException {
+ String selectSql = "select * from root";
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("DELETE FROM root.fans.* WHERE time <= 1000");
+ statement.execute("DELETE FROM root.car.* WHERE time <= 1000");
+ statement.execute("DELETE FROM root.fans.* WHERE time >= 200500 and time < 201000");
+ statement.execute("DELETE FROM root.car.* WHERE time >= 200500 and time < 201000");
+
+
+ boolean hasResultSet = statement.execute(selectSql);
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ int cnt = 0;
+ long before = -1;
+ while (resultSet.next()) {
+ long cur = Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+ if(cur <= before){
+ fail("time order wrong!");
+ }
+ before = cur;
+ cnt++;
+ }
+ assertEquals(21400, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 4a23e84..beedb98 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -28,15 +28,21 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IoTDBRestartIT {
+ private final Logger logger = LoggerFactory.getLogger(IoTDBRestartIT.class);
+
+
@Test
public void testRestart()
throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
@@ -111,8 +117,18 @@ public class IoTDBRestartIT {
statement.execute("insert into root.turbine.d1(timestamp,s1) values(3,3)");
}
+ long time = 0;
try {
EnvironmentUtils.restartDaemon();
+ StorageEngine.getInstance().recover();
+ // wait for recover
+ while(!StorageEngine.getInstance().isAllSgReady()){
+ Thread.sleep(500);
+ time += 500;
+ if(time > 10000){
+ logger.warn("wait too long in restart, wait for: " + time / 1000 + "s");
+ }
+ }
} catch (Exception e) {
Assert.fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/monitor/IoTDBStatMonitorTest.java b/server/src/test/java/org/apache/iotdb/db/monitor/IoTDBStatMonitorTest.java
index 7df3d4f..fa60fdc 100644
--- a/server/src/test/java/org/apache/iotdb/db/monitor/IoTDBStatMonitorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/monitor/IoTDBStatMonitorTest.java
@@ -26,6 +26,7 @@ import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -34,11 +35,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a integration test for StatMonitor.
*/
public class IoTDBStatMonitorTest {
+ private static final Logger logger = LoggerFactory.getLogger(IoTDBStatMonitorTest.class);
private StatMonitor statMonitor;
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -59,7 +63,9 @@ public class IoTDBStatMonitorTest {
config.setEnableMonitorSeriesWrite(true);
EnvironmentUtils.envSetUp();
statMonitor = StatMonitor.getInstance();
- statMonitor.initMonitorSeriesInfo();
+ if (statMonitor.globalSeries.isEmpty()){
+ statMonitor.initMonitorSeriesInfo();
+ }
insertSomeData();
}
@@ -77,6 +83,19 @@ public class IoTDBStatMonitorTest {
// restart server
EnvironmentUtils.restartDaemon();
+ long time = 0;
+ while(!StorageEngine.getInstance().isAllSgReady()){
+ Thread.sleep(500);
+ time += 500;
+
+ if(time > 10000){
+ logger.warn("wait for sg ready for : " + (time / 1000) + " s");
+ }
+
+ if(time > 30000){
+ throw new IllegalStateException("wait too long in IoTDBStatMonitorTest");
+ }
+ }
recoveryTest();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index f14b096..c9c7c74 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -39,6 +38,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -66,6 +66,7 @@ public class FileLoaderTest {
public void setUp()
throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException, MetadataException {
IoTDBDescriptor.getInstance().getConfig().setSyncEnable(true);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(1);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
@@ -85,10 +86,13 @@ public class FileLoaderTest {
public void tearDown() throws InterruptedException, IOException, StorageEngineException {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+ HashVirtualPartitioner.getInstance()
+ .setStorageGroupNum(IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum());
}
@Test
- public void loadNewTsfiles() throws IOException, StorageEngineException, IllegalPathException, InterruptedException {
+ public void loadNewTsfiles()
+ throws IOException, StorageEngineException, IllegalPathException, InterruptedException {
fileLoader = FileLoader.createFileLoader(getReceiverFolderFile());
Map<String, List<File>> allFileList = new HashMap<>();
Map<String, Set<String>> correctSequenceLoadedFileMap = new HashMap<>();
@@ -102,9 +106,10 @@ public class FileLoaderTest {
correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + "0" + File.separator + "0" + File.separator
+ (time + i * 100 + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
+
File syncFile = new File(fileName);
File dataFile = new File(
syncFile.getParentFile().getParentFile().getParentFile().getParentFile()
@@ -137,7 +142,8 @@ public class FileLoaderTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}
@@ -146,6 +152,7 @@ public class FileLoaderTest {
for (List<File> set : allFileList.values()) {
for (File newTsFile : set) {
if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+ LOGGER.error("sync file name is" + newTsFile.getAbsolutePath());
fileLoader.addTsfile(newTsFile);
}
}
@@ -169,7 +176,8 @@ public class FileLoaderTest {
assertFalse(new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists());
Map<String, Set<String>> sequenceLoadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assertEquals(10, processor.getSequenceFileTreeSet().size());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
@@ -201,9 +209,11 @@ public class FileLoaderTest {
correctLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + "0"
+ + File.separator + "0" + File.separator + (time + i * 100
+ j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
+ LOGGER.error("file name is" + fileName);
File syncFile = new File(fileName);
File dataFile = new File(
@@ -211,7 +221,8 @@ public class FileLoaderTest {
syncFile.getParentFile().getName() + File.separator + syncFile.getName());
File loadDataFile = new File(
DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- syncFile.getParentFile().getName() + File.separator + fromTimeToTimePartition(i)
+ syncFile.getParentFile().getParentFile().getParentFile().getName() + File.separator
+ + "0" + File.separator + fromTimeToTimePartition(i)
+ File.separator + syncFile.getName());
correctLoadedFileMap.get(SG_NAME + i).add(loadDataFile.getAbsolutePath());
allFileList.get(SG_NAME + i).add(syncFile);
@@ -236,7 +247,8 @@ public class FileLoaderTest {
}
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
assertTrue(processor.getSequenceFileTreeSet().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}
@@ -245,6 +257,7 @@ public class FileLoaderTest {
for (List<File> set : allFileList.values()) {
for (File newTsFile : set) {
if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+ LOGGER.error("sync file name is" + newTsFile.getAbsolutePath());
fileLoader.addTsfile(newTsFile);
}
}
@@ -268,7 +281,8 @@ public class FileLoaderTest {
assertFalse(new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists());
Map<String, Set<String>> loadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assertEquals(25, processor.getSequenceFileTreeSet().size());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
@@ -293,7 +307,8 @@ public class FileLoaderTest {
if (!snapFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
File dataFile = new File(
DirectoryManager.getInstance().getNextFolderForSequenceFile() + File.separator
- + snapFile.getParentFile().getName(), "0" + File.separator + snapFile.getName());
+ + snapFile.getParentFile().getParentFile().getParentFile().getName(),
+ "0" + File.separator + "0" + File.separator + snapFile.getName());
correctLoadedFileMap.get(sg).remove(dataFile.getAbsolutePath());
snapFile.delete();
fileLoader.addDeletedFileName(snapFile);
@@ -322,7 +337,8 @@ public class FileLoaderTest {
loadedFileMap.clear();
for (int i = 0; i < 3; i++) {
- StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
+ StorageGroupProcessor processor = StorageEngine.getInstance()
+ .getProcessor(new PartialPath(SG_NAME + i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
loadedFileMap.get(SG_NAME + i).add(tsFileResource.getTsFile().getAbsolutePath());
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
index 1488277..f1f7515 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
@@ -111,7 +111,7 @@ public class SyncReceiverLogAnalyzerTest {
correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000) + i * j);
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + System
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + "0" + File.separator + "0" + File.separator + System
.currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
Thread.sleep(1);
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 2dafafd..0cd8f96 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -302,6 +302,9 @@ public class EnvironmentUtils {
}
// create storage group
createDir(config.getSystemDir());
+ // create sg dir
+ String sgDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
+ createDir(sgDir);
// create wal
createDir(config.getWalDir());
// create query