You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/03/31 02:57:23 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] Add level compaction
to merge command (#2948)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new eb9c591 [To rel/0.11] Add level compaction to merge command (#2948)
eb9c591 is described below
commit eb9c5918a92aab3dd6f9e88a674b45b9a0dc6eab
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Mar 31 10:57:00 2021 +0800
[To rel/0.11] Add level compaction to merge command (#2948)
cherry-picked from master(0.12.0-SNAPSHOT)
---
.../db/engine/compaction/TsFileManagement.java | 141 ++-
.../level/LevelCompactionTsFileManagement.java | 2 -
.../engine/storagegroup/StorageGroupProcessor.java | 1252 +++++++++++---------
.../storagegroup/StorageGroupProcessorTest.java | 154 ++-
4 files changed, 844 insertions(+), 705 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 0522066..afda967 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -55,9 +55,7 @@ public abstract class TsFileManagement {
protected String storageGroupName;
protected String storageGroupDir;
- /**
- * Serialize queries, delete resource files, compaction cleanup files
- */
+ /** Serialize queries, delete resource files, compaction cleanup files */
private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
public volatile boolean isUnseqMerging = false;
@@ -67,76 +65,57 @@ public abstract class TsFileManagement {
* be invisible at this moment, without this, deletion/update during merge could be lost.
*/
public ModificationFile mergingModification;
+
private long mergeStartTime;
+ protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
+
public TsFileManagement(String storageGroupName, String storageGroupDir) {
this.storageGroupName = storageGroupName;
this.storageGroupDir = storageGroupDir;
}
- /**
- * get the TsFile list in sequence
- */
+ public void setForceFullMerge(boolean forceFullMerge) {
+ isForceFullMerge = forceFullMerge;
+ }
+
+ /** get the TsFile list in sequence */
public abstract List<TsFileResource> getTsFileList(boolean sequence);
- /**
- * get the TsFile list iterator in sequence
- */
+ /** get the TsFile list iterator in sequence */
public abstract Iterator<TsFileResource> getIterator(boolean sequence);
- /**
- * remove one TsFile from list
- */
+ /** remove one TsFile from list */
public abstract void remove(TsFileResource tsFileResource, boolean sequence);
- /**
- * remove some TsFiles from list
- */
+ /** remove some TsFiles from list */
public abstract void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence);
- /**
- * add one TsFile to list
- */
+ /** add one TsFile to list */
public abstract void add(TsFileResource tsFileResource, boolean sequence);
- /**
- * add one TsFile to list for recover
- */
+ /** add one TsFile to list for recover */
public abstract void addRecover(TsFileResource tsFileResource, boolean sequence);
- /**
- * add some TsFiles to list
- */
+ /** add some TsFiles to list */
public abstract void addAll(List<TsFileResource> tsFileResourceList, boolean sequence);
- /**
- * is one TsFile contained in list
- */
+ /** is one TsFile contained in list */
public abstract boolean contains(TsFileResource tsFileResource, boolean sequence);
- /**
- * clear list
- */
+ /** clear list */
public abstract void clear();
- /**
- * is the list empty
- */
+ /** is the list empty */
public abstract boolean isEmpty(boolean sequence);
- /**
- * return TsFile list size
- */
+ /** return TsFile list size */
public abstract int size(boolean sequence);
- /**
- * recover TsFile list
- */
+ /** recover TsFile list */
public abstract void recover();
- /**
- * fork current TsFile list (call this before merge)
- */
+ /** fork current TsFile list (call this before merge) */
public abstract void forkCurrentFileList(long timePartition) throws IOException;
public void readLock() {
@@ -166,8 +145,8 @@ public abstract class TsFileManagement {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
- public CompactionMergeTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack,
- long timePartitionId) {
+ public CompactionMergeTask(
+ CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.timePartitionId = timePartitionId;
}
@@ -194,11 +173,16 @@ public abstract class TsFileManagement {
}
}
- public void merge(boolean fullMerge, List<TsFileResource> seqMergeList,
- List<TsFileResource> unSeqMergeList, long dataTTL) {
+ public void merge(
+ boolean fullMerge,
+ List<TsFileResource> seqMergeList,
+ List<TsFileResource> unSeqMergeList,
+ long dataTTL) {
if (isUnseqMerging) {
if (logger.isInfoEnabled()) {
- logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName,
+ logger.info(
+ "{} Last merge is ongoing, currently consumed time: {}ms",
+ storageGroupName,
(System.currentTimeMillis() - mergeStartTime));
}
return;
@@ -235,8 +219,8 @@ public abstract class TsFileManagement {
try {
List[] mergeFiles = fileSelector.select();
if (mergeFiles.length == 0) {
- logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
- budget);
+ logger.info(
+ "{} cannot select merge candidates under the budget {}", storageGroupName, budget);
isUnseqMerging = false;
return;
}
@@ -255,15 +239,25 @@ public abstract class TsFileManagement {
}
mergeStartTime = System.currentTimeMillis();
- MergeTask mergeTask = new MergeTask(mergeResource, storageGroupDir,
- this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
- storageGroupName);
- mergingModification = new ModificationFile(
- storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
+ MergeTask mergeTask =
+ new MergeTask(
+ mergeResource,
+ storageGroupDir,
+ this::mergeEndAction,
+ taskName,
+ fullMerge,
+ fileSelector.getConcurrentMergeNum(),
+ storageGroupName);
+ mergingModification =
+ new ModificationFile(storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
MergeManager.getINSTANCE().submitMainTask(mergeTask);
if (logger.isInfoEnabled()) {
- logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
- storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size());
+ logger.info(
+ "{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
+ storageGroupName,
+ taskName,
+ mergeFiles[0].size(),
+ mergeFiles[1].size());
}
} catch (MergeException | IOException e) {
@@ -283,9 +277,7 @@ public abstract class TsFileManagement {
}
}
- /**
- * acquire the write locks of the resource , the merge lock and the compaction lock
- */
+ /** acquire the write locks of the resource , the merge lock and the compaction lock */
private void doubleWriteLock(TsFileResource seqFile) {
boolean fileLockGot;
boolean compactionLockGot;
@@ -307,9 +299,7 @@ public abstract class TsFileManagement {
}
}
- /**
- * release the write locks of the resource , the merge lock and the compaction lock
- */
+ /** release the write locks of the resource , the merge lock and the compaction lock */
private void doubleWriteUnlock(TsFileResource seqFile) {
writeUnlock();
seqFile.writeUnlock();
@@ -351,13 +341,16 @@ public abstract class TsFileManagement {
try {
seqFile.getModFile().close();
} catch (IOException e) {
- logger
- .error("Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e);
+ logger.error(
+ "Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e);
}
}
} catch (IOException e) {
- logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
- seqFile.getTsFile(), e);
+ logger.error(
+ "{} cannot clean the ModificationFile of {} after merge",
+ storageGroupName,
+ seqFile.getTsFile(),
+ e);
}
}
@@ -372,8 +365,8 @@ public abstract class TsFileManagement {
}
}
- public void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
- File mergeLog) {
+ public void mergeEndAction(
+ List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File mergeLog) {
logger.info("{} a merge task is ending...", storageGroupName);
if (unseqFiles.isEmpty()) {
@@ -392,14 +385,14 @@ public abstract class TsFileManagement {
try {
updateMergeModification(seqFile);
if (i == seqFiles.size() - 1) {
- //FIXME if there is an exception, the the modification file will be not closed.
+ // FIXME if there is an exception, the the modification file will be not closed.
removeMergingModification();
isUnseqMerging = false;
Files.delete(mergeLog.toPath());
}
} catch (IOException e) {
- logger.error("{} a merge task ends but cannot delete log {}", storageGroupName,
- mergeLog.toPath());
+ logger.error(
+ "{} a merge task ends but cannot delete log {}", storageGroupName, mergeLog.toPath());
} finally {
doubleWriteUnlock(seqFile);
}
@@ -410,10 +403,8 @@ public abstract class TsFileManagement {
// ({systemTime}-{versionNum}-{mergeNum}.tsfile)
public static int compareFileName(File o1, File o2) {
- String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "")
- .split(FILE_NAME_SEPARATOR);
- String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
- .split(FILE_NAME_SEPARATOR);
+ String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
+ String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
long ver1 = Long.parseLong(items1[0]);
long ver2 = Long.parseLong(items2[0]);
int cmp = Long.compare(ver1, ver2);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index fbc7ef4..0cc543e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -76,8 +76,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private final boolean enableUnseqCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
- private final boolean isForceFullMerge =
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
// First map is partition list; Second list is level list; Third list is file list in level;
private final Map<Long, List<SortedSet<TsFileResource>>> sequenceTsFileResources =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 344f914..9c9db9f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -101,22 +101,22 @@ import org.slf4j.LoggerFactory;
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
- * TsFileProcessor in the working status. <br/>
- * <p>
- * There are two situations to set the working TsFileProcessor to closing status:<br/>
- * <p>
- * (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
- * shouldClose())<br/>
- * <p>
- * (2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from cli
- * will call this method)<br/>
- * <p>
- * UnSequence data has the similar process as above.
- * <p>
- * When a sequence TsFileProcessor is submitted to be flushed, the updateLatestFlushTimeCallback()
- * method will be called as a callback.<br/>
- * <p>
- * When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be
+ * TsFileProcessor in the working status. <br>
+ *
+ * <p>There are two situations to set the working TsFileProcessor to closing status:<br>
+ *
+ * <p>(1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
+ * shouldClose())<br>
+ *
+ * <p>(2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from
+ * cli will call this method)<br>
+ *
+ * <p>UnSequence data has the similar process as above.
+ *
+ * <p>When a sequence TsFileProcessor is submitted to be flushed, the
+ * updateLatestFlushTimeCallback() method will be called as a callback.<br>
+ *
+ * <p>When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be
* called as a callback.
*/
public class StorageGroupProcessor {
@@ -136,13 +136,9 @@ public class StorageGroupProcessor {
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
private final boolean enableMemControl = config.isEnableMemControl();
- /**
- * indicating the file to be loaded already exists locally.
- */
+ /** indicating the file to be loaded already exists locally. */
private static final int POS_ALREADY_EXIST = -2;
- /**
- * indicating the file to be loaded overlap with some files.
- */
+ /** indicating the file to be loaded overlap with some files. */
private static final int POS_OVERLAP = -3;
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
@@ -151,35 +147,29 @@ public class StorageGroupProcessor {
* partitionLatestFlushedTimeForEachDevice)
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
- /**
- * closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done.
- */
+ /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */
private final Object closeStorageGroupCondition = new Object();
/**
* avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a query is executed.
*/
private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
- /**
- * time partition id in the storage group -> tsFileProcessor for this time partition
- */
+ /** time partition id in the storage group -> tsFileProcessor for this time partition */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
- /**
- * time partition id in the storage group -> tsFileProcessor for this time partition
- */
+ /** time partition id in the storage group -> tsFileProcessor for this time partition */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
- /**
- * compactionMergeWorking is used to wait for last compaction to be done.
- */
+ /** compactionMergeWorking is used to wait for last compaction to be done. */
private volatile boolean compactionMergeWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
- private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
+ private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
+ new CopyOnReadLinkedList<>();
// upgrading unsequence TsFile resource list
private List<TsFileResource> upgradeUnseqFileList = new LinkedList<>();
- private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
+ private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
+ new CopyOnReadLinkedList<>();
/*
* time partition id -> map, which contains
* device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
@@ -196,16 +186,17 @@ public class StorageGroupProcessor {
*/
private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice = new HashMap<>();
- /**
- * used to record the latest flush time while upgrading and inserting
- */
- private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice = new HashMap<>();
+ /** used to record the latest flush time while upgrading and inserting */
+ private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice =
+ new HashMap<>();
/**
* global mapping of device -> largest timestamp of the latest memtable to * be submitted to
* asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to maintain global
- * latestFlushedTime of devices and will be updated along with partitionLatestFlushedTimeForEachDevice
+ * latestFlushedTime of devices and will be updated along with
+ * partitionLatestFlushedTimeForEachDevice
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
+
private String storageGroupName;
private File storageGroupSysDir;
@@ -224,6 +215,7 @@ public class StorageGroupProcessor {
* eventually removed.
*/
private long dataTTL = Long.MAX_VALUE;
+
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
private TsFileFlushPolicy fileFlushPolicy;
@@ -260,24 +252,27 @@ public class StorageGroupProcessor {
private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
private List<FlushListener> customFlushListeners = Collections.emptyList();
- public StorageGroupProcessor(String systemDir, String storageGroupName,
- TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
+ public StorageGroupProcessor(
+ String systemDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
+ throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
this.fileFlushPolicy = fileFlushPolicy;
storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
if (storageGroupSysDir.mkdirs()) {
- logger.info("Storage Group system Directory {} doesn't exist, create it",
+ logger.info(
+ "Storage Group system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
} else if (!storageGroupSysDir.exists()) {
- logger.error("create Storage Group system Directory {} failed",
- storageGroupSysDir.getPath());
+ logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath());
}
- this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
- .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
+ this.tsFileManagement =
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCompactionStrategy()
+ .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
recover();
-
}
private Map<Long, List<TsFileResource>> splitResourcesByPartition(
@@ -294,23 +289,23 @@ public class StorageGroupProcessor {
try {
// collect candidate TsFiles from sequential and unsequential data directory
- Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles(
- DirectoryManager.getInstance().getAllSequenceFileFolders());
+ Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
+ getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
upgradeSeqFileList.addAll(oldSeqTsFiles);
- Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles(
- DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+ Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
+ getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
upgradeUnseqFileList.addAll(oldUnseqTsFiles);
// split by partition so that we can find the last file of each partition and decide to
// close it or not
- Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(
- tmpSeqTsFiles);
- Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = splitResourcesByPartition(
- tmpUnseqTsFiles);
+ Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
+ splitResourcesByPartition(tmpSeqTsFiles);
+ Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
+ splitResourcesByPartition(tmpUnseqTsFiles);
for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
recoverTsFiles(value, true);
}
@@ -319,33 +314,38 @@ public class StorageGroupProcessor {
}
String taskName = storageGroupName + "-" + System.currentTimeMillis();
- File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
- MERGING_MODIFICATION_FILE_NAME);
+ File mergingMods =
+ SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
if (mergingMods.exists()) {
this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
}
- RecoverMergeTask recoverMergeTask = new RecoverMergeTask(
- new ArrayList<>(tsFileManagement.getTsFileList(true)),
- tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(),
- tsFileManagement::mergeEndAction,
- taskName,
- IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
+ RecoverMergeTask recoverMergeTask =
+ new RecoverMergeTask(
+ new ArrayList<>(tsFileManagement.getTsFileList(true)),
+ tsFileManagement.getTsFileList(false),
+ storageGroupSysDir.getPath(),
+ tsFileManagement::mergeEndAction,
+ taskName,
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
+ storageGroupName);
logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
- recoverMergeTask
- .recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+ recoverMergeTask.recoverMerge(
+ IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
recoverCompaction();
for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ partitionDirectFileVersions
+ .computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, resource.getMaxVersion());
}
for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ partitionDirectFileVersions
+ .computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, resource.getMaxVersion());
}
@@ -363,7 +363,8 @@ public class StorageGroupProcessor {
long endTime = resource.getEndTime(index);
endTimeMap.put(deviceId, endTime);
}
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ latestTimeForEachDevice
+ .computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putAll(endTimeMap);
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>())
@@ -385,8 +386,7 @@ public class StorageGroupProcessor {
logger.error("{} compaction submit task failed", storageGroupName);
}
} else {
- logger.error("{} compaction pool not started ,recover failed",
- storageGroupName);
+ logger.error("{} compaction pool not started ,recover failed", storageGroupName);
}
}
@@ -404,8 +404,8 @@ public class StorageGroupProcessor {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void updateLatestFlushedTime() throws IOException {
- VersionController versionController = new SimpleFileVersionController(
- storageGroupSysDir.getPath());
+ VersionController versionController =
+ new SimpleFileVersionController(storageGroupSysDir.getPath());
long currentVersion = versionController.currVersion();
for (TsFileResource resource : upgradeSeqFileList) {
for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
@@ -413,27 +413,31 @@ public class StorageGroupProcessor {
int index = entry.getValue();
long endTime = resource.getEndTime(index);
long endTimePartitionId = StorageEngine.getTimePartition(endTime);
- latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
+ latestTimeForEachDevice
+ .computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
.put(deviceId, endTime);
globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
// set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
while (partitionId <= endTimePartitionId) {
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(partitionId, l -> new HashMap<>())
.put(deviceId, Long.MAX_VALUE);
if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
- File directory = SystemFileFactory.INSTANCE
- .getFile(storageGroupSysDir, String.valueOf(partitionId));
+ File directory =
+ SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
if (!directory.exists()) {
directory.mkdirs();
}
- File versionFile = SystemFileFactory.INSTANCE
- .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
+ File versionFile =
+ SystemFileFactory.INSTANCE.getFile(
+ directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
if (!versionFile.createNewFile()) {
logger.warn("Version file {} has already been created ", versionFile);
}
- timePartitionIdVersionControllerMap.put(partitionId,
+ timePartitionIdVersionControllerMap.put(
+ partitionId,
new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
}
partitionId++;
@@ -449,7 +453,8 @@ public class StorageGroupProcessor {
* @return version controller
*/
private VersionController getVersionControllerByTimePartitionId(long timePartitionId) {
- return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
+ return timePartitionIdVersionControllerMap.computeIfAbsent(
+ timePartitionId,
id -> {
try {
return new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
@@ -480,22 +485,20 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(fileFolder, MERGE_SUFFIX);
- File[] oldTsfileArray = fsFactory
- .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
- File[] oldResourceFileArray = fsFactory
- .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
- File[] oldModificationFileArray = fsFactory
- .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
+ File[] oldTsfileArray =
+ fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+ File[] oldResourceFileArray =
+ fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+ File[] oldModificationFileArray =
+ fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
// move the old files to upgrade folder if exists
if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
// create upgrade directory if not exist
if (upgradeFolder.mkdirs()) {
- logger.info("Upgrade Directory {} doesn't exist, create it",
- upgradeFolder.getPath());
+ logger.info("Upgrade Directory {} doesn't exist, create it", upgradeFolder.getPath());
} else if (!upgradeFolder.exists()) {
- logger.error("Create upgrade Directory {} failed",
- upgradeFolder.getPath());
+ logger.error("Create upgrade Directory {} failed", upgradeFolder.getPath());
}
// move .tsfile to upgrade folder
for (File file : oldTsfileArray) {
@@ -516,12 +519,14 @@ public class StorageGroupProcessor {
}
}
- Collections.addAll(upgradeFiles,
+ Collections.addAll(
+ upgradeFiles,
fsFactory.listFilesBySuffix(upgradeFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
- // if already move old files to upgradeFolder
+ // if already move old files to upgradeFolder
else if (upgradeFolder.exists()) {
- Collections.addAll(upgradeFiles,
+ Collections.addAll(
+ upgradeFiles,
fsFactory.listFilesBySuffix(upgradeFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
@@ -531,20 +536,22 @@ public class StorageGroupProcessor {
if (!partitionFolder.isDirectory()) {
logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
} else if (!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
- // some TsFileResource may be being persisted when the system crashed, try recovering such
+ // some TsFileResource may be being persisted when the system crashed, try recovering
+ // such
// resources
continueFailedRenames(partitionFolder, TEMP_SUFFIX);
- // some TsFiles were going to be replaced by the merged files when the system crashed and
+ // some TsFiles were going to be replaced by the merged files when the system crashed
+ // and
// the process was interrupted before the merged files could be named
continueFailedRenames(partitionFolder, MERGE_SUFFIX);
- Collections.addAll(tsFiles,
+ Collections.addAll(
+ tsFiles,
fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
}
}
-
}
tsFiles.sort(this::compareFileName);
List<TsFileResource> ret = new ArrayList<>();
@@ -579,10 +586,13 @@ public class StorageGroupProcessor {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
- TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
- storageGroupName + FILE_NAME_SEPARATOR,
- getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, isSeq,
- i == tsFiles.size() - 1);
+ TsFileRecoverPerformer recoverPerformer =
+ new TsFileRecoverPerformer(
+ storageGroupName + FILE_NAME_SEPARATOR,
+ getVersionControllerByTimePartitionId(timePartitionId),
+ tsFileResource,
+ isSeq,
+ i == tsFiles.size() - 1);
RestorableTsFileIOWriter writer;
try {
@@ -600,8 +610,8 @@ public class StorageGroupProcessor {
writer = recoverPerformer.recover(true);
}
} catch (StorageGroupProcessorException e) {
- logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(),
- e);
+ logger.warn(
+ "Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
continue;
}
@@ -612,29 +622,41 @@ public class StorageGroupProcessor {
// the last file is not closed, continue writing to in
TsFileProcessor tsFileProcessor;
if (isSeq) {
- tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
- getVersionControllerByTimePartitionId(timePartitionId),
- this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
- true, writer);
+ tsFileProcessor =
+ new TsFileProcessor(
+ storageGroupName,
+ storageGroupInfo,
+ tsFileResource,
+ getVersionControllerByTimePartitionId(timePartitionId),
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::updateLatestFlushTimeCallback,
+ true,
+ writer);
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
- .getTsFileResource().calculateRamSize());
+ tsFileProcessorInfo.addTSPMemCost(
+ tsFileProcessor.getTsFileResource().calculateRamSize());
}
workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
} else {
- tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
- getVersionControllerByTimePartitionId(timePartitionId),
- this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false,
- writer);
+ tsFileProcessor =
+ new TsFileProcessor(
+ storageGroupName,
+ storageGroupInfo,
+ tsFileResource,
+ getVersionControllerByTimePartitionId(timePartitionId),
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::unsequenceFlushCallback,
+ false,
+ writer);
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
- .getTsFileResource().calculateRamSize());
+ tsFileProcessorInfo.addTSPMemCost(
+ tsFileProcessor.getTsFileResource().calculateRamSize());
}
workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
}
@@ -661,10 +683,8 @@ public class StorageGroupProcessor {
// ({systemTime}-{versionNum}-{mergeNum}.tsfile)
private int compareFileName(File o1, File o2) {
- String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "")
- .split(FILE_NAME_SEPARATOR);
- String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
- .split(FILE_NAME_SEPARATOR);
+ String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
+ String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
long ver1 = Long.parseLong(items1[0]);
long ver2 = Long.parseLong(items2[0]);
int cmp = Long.compare(ver1, ver2);
@@ -688,16 +708,18 @@ public class StorageGroupProcessor {
// init map
long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+ timePartitionId, id -> new HashMap<>());
boolean isSequence =
- insertRowPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
-
- //is unsequence and user set config to discard out of order data
- if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
- .isEnableDiscardOutOfOrderData()) {
+ insertRowPlan.getTime()
+ > partitionLatestFlushedTimeForEachDevice
+ .get(timePartitionId)
+ .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+
+ // is unsequence and user set config to discard out of order data
+ if (!isSequence
+ && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
return;
}
@@ -740,8 +762,10 @@ public class StorageGroupProcessor {
long currTime = insertTabletPlan.getTimes()[loc];
// skip points that do not satisfy TTL
if (!isAlive(currTime)) {
- results[loc] = RpcUtils.getStatus(TSStatusCode.OUT_OF_TTL_ERROR,
- "time " + currTime + " in current line is out of TTL: " + dataTTL);
+ results[loc] =
+ RpcUtils.getStatus(
+ TSStatusCode.OUT_OF_TTL_ERROR,
+ "time " + currTime + " in current line is out of TTL: " + dataTTL);
loc++;
noFailure = false;
} else {
@@ -755,12 +779,13 @@ public class StorageGroupProcessor {
// before is first start point
int before = loc;
// before time partition
- long beforeTimePartition = StorageEngine
- .getTimePartition(insertTabletPlan.getTimes()[before]);
+ long beforeTimePartition =
+ StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
- long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
- computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
- computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+ long lastFlushTime =
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
+ .computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
// if is sequence
boolean isSequence = false;
while (loc < insertTabletPlan.getRowCount()) {
@@ -769,18 +794,21 @@ public class StorageGroupProcessor {
// start next partition
if (curTimePartition != beforeTimePartition) {
// insert last time partition
- if (isSequence || !IoTDBDescriptor.getInstance().getConfig()
- .isEnableDiscardOutOfOrderData()) {
- noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
- results,
- beforeTimePartition) && noFailure;
+ if (isSequence
+ || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletPlan, before, loc, isSequence, results, beforeTimePartition)
+ && noFailure;
}
// re initialize
before = loc;
beforeTimePartition = curTimePartition;
- lastFlushTime = partitionLatestFlushedTimeForEachDevice.
- computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
- computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+ lastFlushTime =
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
+ .computeIfAbsent(
+ insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
isSequence = false;
}
// still in this partition
@@ -790,8 +818,9 @@ public class StorageGroupProcessor {
// insert into unsequence and then start sequence
if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
noFailure =
- insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results,
- beforeTimePartition) && noFailure;
+ insertTabletToTsFileProcessor(
+ insertTabletPlan, before, loc, false, results, beforeTimePartition)
+ && noFailure;
}
before = loc;
isSequence = true;
@@ -801,13 +830,17 @@ public class StorageGroupProcessor {
}
// do not forget last part
- if (before < loc && (isSequence || !IoTDBDescriptor.getInstance().getConfig()
- .isEnableDiscardOutOfOrderData())) {
- noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
- results, beforeTimePartition) && noFailure;
- }
- long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ if (before < loc
+ && (isSequence
+ || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletPlan, before, loc, isSequence, results, beforeTimePartition)
+ && noFailure;
+ }
+ long globalLatestFlushedTime =
+ globalLatestFlushedTimeForEachDevice.getOrDefault(
+ insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
if (!noFailure) {
@@ -818,9 +851,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * @return whether the given time falls in ttl
- */
+ /** @return whether the given time falls in ttl */
private boolean isAlive(long time) {
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
}
@@ -837,8 +868,13 @@ public class StorageGroupProcessor {
* @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
- private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
- int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) {
+ private boolean insertTabletToTsFileProcessor(
+ InsertTabletPlan insertTabletPlan,
+ int start,
+ int end,
+ boolean sequence,
+ TSStatus[] results,
+ long timePartitionId) {
// return when start >= end
if (start >= end) {
return true;
@@ -847,8 +883,10 @@ public class StorageGroupProcessor {
TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
if (tsFileProcessor == null) {
for (int i = start; i < end; i++) {
- results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
- "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
+ results[i] =
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR,
+ "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
}
return false;
}
@@ -865,10 +903,13 @@ public class StorageGroupProcessor {
latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
// try to update the latest time of the device of this tsRecord
- if (sequence && latestTimeForEachDevice.get(timePartitionId)
- .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
- < insertTabletPlan.getTimes()[end - 1]) {
- latestTimeForEachDevice.get(timePartitionId)
+ if (sequence
+ && latestTimeForEachDevice
+ .get(timePartitionId)
+ .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+ < insertTabletPlan.getTimes()[end - 1]) {
+ latestTimeForEachDevice
+ .get(timePartitionId)
.put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
}
@@ -890,14 +931,18 @@ public class StorageGroupProcessor {
}
// Update cached last value with high priority
if (mNodes[i] != null) {
- // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to update last cache
- IoTDB.metaManager.updateLastCache(null,
- plan.composeLastTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
+ // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
+ // update last cache
+ IoTDB.metaManager.updateLastCache(
+ null, plan.composeLastTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
} else {
// measurementMNodes[i] is null, use the path to update remote cache
- IoTDB.metaManager
- .updateLastCache(plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
- plan.composeLastTimeValuePair(i), true, latestFlushedTime, null);
+ IoTDB.metaManager.updateLastCache(
+ plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
+ plan.composeLastTimeValuePair(i),
+ true,
+ latestFlushedTime,
+ null);
}
}
}
@@ -915,15 +960,18 @@ public class StorageGroupProcessor {
tsFileProcessor.insert(insertRowPlan);
// try to update the latest time of the device of this tsRecord
- if (latestTimeForEachDevice.get(timePartitionId)
- .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE) < insertRowPlan
- .getTime()) {
- latestTimeForEachDevice.get(timePartitionId)
+ if (latestTimeForEachDevice
+ .get(timePartitionId)
+ .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+ < insertRowPlan.getTime()) {
+ latestTimeForEachDevice
+ .get(timePartitionId)
.put(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
}
- long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ long globalLatestFlushTime =
+ globalLatestFlushedTimeForEachDevice.getOrDefault(
+ insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime);
@@ -944,13 +992,17 @@ public class StorageGroupProcessor {
}
// Update cached last value with high priority
if (mNodes[i] != null) {
- // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to update last cache
- IoTDB.metaManager.updateLastCache(null,
- plan.composeTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
+ // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
+ // update last cache
+ IoTDB.metaManager.updateLastCache(
+ null, plan.composeTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
} else {
- IoTDB.metaManager
- .updateLastCache(plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
- plan.composeTimeValuePair(i), true, latestFlushedTime, null);
+ IoTDB.metaManager.updateLastCache(
+ plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
+ plan.composeTimeValuePair(i),
+ true,
+ latestFlushedTime,
+ null);
}
}
}
@@ -958,8 +1010,8 @@ public class StorageGroupProcessor {
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
writeLock();
try {
- if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
- !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ if (!closingSequenceTsFileProcessor.contains(tsFileProcessor)
+ && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
}
} finally {
@@ -971,11 +1023,11 @@ public class StorageGroupProcessor {
TsFileProcessor tsFileProcessor = null;
try {
if (sequence) {
- tsFileProcessor = getOrCreateTsFileProcessorIntern(timeRangeId,
- workSequenceTsFileProcessors, true);
+ tsFileProcessor =
+ getOrCreateTsFileProcessorIntern(timeRangeId, workSequenceTsFileProcessors, true);
} else {
- tsFileProcessor = getOrCreateTsFileProcessorIntern(timeRangeId,
- workUnsequenceTsFileProcessors, false);
+ tsFileProcessor =
+ getOrCreateTsFileProcessorIntern(timeRangeId, workUnsequenceTsFileProcessors, false);
}
} catch (DiskSpaceInsufficientException e) {
logger.error(
@@ -983,9 +1035,8 @@ public class StorageGroupProcessor {
e);
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
} catch (IOException e) {
- logger
- .error("meet IOException when creating TsFileProcessor, change system mode to read-only",
- e);
+ logger.error(
+ "meet IOException when creating TsFileProcessor, change system mode to read-only", e);
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
}
return tsFileProcessor;
@@ -998,9 +1049,8 @@ public class StorageGroupProcessor {
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
* @param sequence whether is sequence or not
*/
- private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
- TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
- boolean sequence)
+ private TsFileProcessor getOrCreateTsFileProcessorIntern(
+ long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, boolean sequence)
throws IOException, DiskSpaceInsufficientException {
TsFileProcessor res;
@@ -1015,7 +1065,8 @@ public class StorageGroupProcessor {
Map.Entry<Long, TsFileProcessor> processorEntry = tsFileProcessorTreeMap.firstEntry();
logger.info(
"will close a {} TsFile because too many active partitions ({} > {}) in the storage group {},",
- sequence, tsFileProcessorTreeMap.size(),
+ sequence,
+ tsFileProcessorTreeMap.size(),
IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition(),
storageGroupName);
asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
@@ -1038,7 +1089,6 @@ public class StorageGroupProcessor {
return res;
}
-
private TsFileProcessor createTsFileProcessor(boolean sequence, long timePartitionId)
throws IOException, DiskSpaceInsufficientException {
String baseDir;
@@ -1050,37 +1100,49 @@ public class StorageGroupProcessor {
fsFactory.getFile(baseDir, storageGroupName).mkdirs();
String filePath =
- baseDir + File.separator + storageGroupName + File.separator + timePartitionId
+ baseDir
+ + File.separator
+ + storageGroupName
+ + File.separator
+ + timePartitionId
+ File.separator
+ getNewTsFileName(timePartitionId);
TsFileProcessor tsFileProcessor;
VersionController versionController = getVersionControllerByTimePartitionId(timePartitionId);
if (sequence) {
- tsFileProcessor = new TsFileProcessor(storageGroupName,
- fsFactory.getFileWithParent(filePath), storageGroupInfo,
- versionController, this::closeUnsealedTsFileProcessorCallBack,
- this::updateLatestFlushTimeCallback, true,
- partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+ tsFileProcessor =
+ new TsFileProcessor(
+ storageGroupName,
+ fsFactory.getFileWithParent(filePath),
+ storageGroupInfo,
+ versionController,
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::updateLatestFlushTimeCallback,
+ true,
+ partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
- .getTsFileResource().calculateRamSize());
+ tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
}
} else {
- tsFileProcessor = new TsFileProcessor(storageGroupName,
- fsFactory.getFileWithParent(filePath), storageGroupInfo,
- versionController, this::closeUnsealedTsFileProcessorCallBack,
- this::unsequenceFlushCallback, false,
- partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+ tsFileProcessor =
+ new TsFileProcessor(
+ storageGroupName,
+ fsFactory.getFileWithParent(filePath),
+ storageGroupInfo,
+ versionController,
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::unsequenceFlushCallback,
+ false,
+ partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
- .getTsFileResource().calculateRamSize());
+ tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
}
}
tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
@@ -1103,8 +1165,7 @@ public class StorageGroupProcessor {
}
private String getNewTsFileName(long time, long version, int mergeCnt) {
- return time + FILE_NAME_SEPARATOR + version
- + FILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
+ return time + FILE_NAME_SEPARATOR + version + FILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
}
public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
@@ -1116,31 +1177,33 @@ public class StorageGroupProcessor {
|| closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
- logger
- .warn("{} has spent {}s to wait for closing one tsfile.", this.storageGroupName,
- (System.currentTimeMillis() - startTime) / 1000);
+ logger.warn(
+ "{} has spent {}s to wait for closing one tsfile.",
+ this.storageGroupName,
+ (System.currentTimeMillis() - startTime) / 1000);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger
- .error("syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
- + "group {}", storageGroupName, e);
+ logger.error(
+ "syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
+ + "group {}",
+ storageGroupName,
+ e);
}
}
}
- /**
- * thread-safety should be ensured by caller
- */
+ /** thread-safety should be ensured by caller */
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
- //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
- //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
- closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
+ // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
+ || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
return;
}
- logger.info("Async close tsfile: {}",
+ logger.info(
+ "Async close tsfile: {}",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
@@ -1148,7 +1211,8 @@ public class StorageGroupProcessor {
tsFileProcessor.asyncClose();
workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
- // if unsequence files don't contain this time range id, we should remove it's version controller
+ // if unsequence files don't contain this time range id, we should remove it's version
+ // controller
if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
}
@@ -1158,16 +1222,15 @@ public class StorageGroupProcessor {
tsFileProcessor.asyncClose();
workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
- // if sequence files don't contain this time range id, we should remove it's version controller
+ // if sequence files don't contain this time range id, we should remove it's version
+ // controller
if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
}
}
}
- /**
- * delete the storageGroup's own folder in folder data/system/storage_groups
- */
+ /** delete the storageGroup's own folder in folder data/system/storage_groups */
public void deleteFolder(String systemDir) {
logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
writeLock();
@@ -1205,16 +1268,17 @@ public class StorageGroupProcessor {
logger.info("{} will close all files for deleting data files", storageGroupName);
writeLock();
syncCloseAllWorkingTsFileProcessors();
- //normally, mergingModification is just need to be closed by after a merge task is finished.
- //we close it here just for IT test.
+ // normally, mergingModification is just need to be closed by after a merge task is finished.
+ // we close it here just for IT test.
if (this.tsFileManagement.mergingModification != null) {
try {
this.tsFileManagement.mergingModification.close();
} catch (IOException e) {
- logger.error("Cannot close the mergingMod file {}",
- this.tsFileManagement.mergingModification.getFilePath(), e);
+ logger.error(
+ "Cannot close the mergingMod file {}",
+ this.tsFileManagement.mergingModification.getFilePath(),
+ e);
}
-
}
try {
closeAllResources();
@@ -1246,9 +1310,7 @@ public class StorageGroupProcessor {
}
}
- /**
- * Iterate each TsFile and try to lock and remove those out of TTL.
- */
+ /** Iterate each TsFile and try to lock and remove those out of TTL. */
public synchronized void checkFilesTTL() {
if (dataTTL == Long.MAX_VALUE) {
logger.debug("{}: TTL not set, ignore the check", storageGroupName);
@@ -1272,7 +1334,8 @@ public class StorageGroupProcessor {
}
private void checkFileTTL(TsFileResource resource, long timeLowerBound, boolean isSeq) {
- if (resource.isMerging() || !resource.isClosed()
+ if (resource.isMerging()
+ || !resource.isClosed()
|| !resource.isDeleted() && resource.stillLives(timeLowerBound)) {
return;
}
@@ -1293,8 +1356,11 @@ public class StorageGroupProcessor {
// physical removal
resource.remove();
if (logger.isInfoEnabled()) {
- logger.info("Removed a file {} before {} by ttl ({}ms)", resource.getTsFilePath(),
- new Date(timeLowerBound), dataTTL);
+ logger.info(
+ "Removed a file {} before {} by ttl ({}ms)",
+ resource.getTsFilePath(),
+ new Date(timeLowerBound),
+ dataTTL);
}
tsFileManagement.remove(resource, isSeq);
} finally {
@@ -1306,25 +1372,28 @@ public class StorageGroupProcessor {
}
}
- /**
- * This method will be blocked until all tsfile processors are closed.
- */
+ /** This method will be blocked until all tsfile processors are closed. */
public void syncCloseAllWorkingTsFileProcessors() {
synchronized (closeStorageGroupCondition) {
try {
asyncCloseAllWorkingTsFileProcessors();
long startTime = System.currentTimeMillis();
- while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
- .isEmpty()) {
+ while (!closingSequenceTsFileProcessor.isEmpty()
+ || !closingUnSequenceTsFileProcessor.isEmpty()) {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
- logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
+ logger.warn(
+ "{} has spent {}s to wait for closing all TsFiles.",
+ this.storageGroupName,
(System.currentTimeMillis() - startTime) / 1000);
}
}
} catch (InterruptedException e) {
- logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
- + "group {}", storageGroupName, e);
+ logger.error(
+ "CloseFileNodeCondition error occurs while waiting for closing the storage "
+ + "group {}",
+ storageGroupName,
+ e);
Thread.currentThread().interrupt();
}
}
@@ -1335,13 +1404,13 @@ public class StorageGroupProcessor {
try {
logger.info("async force close all files in storage group: {}", storageGroupName);
// to avoid concurrent modification problem, we need a new array list
- for (TsFileProcessor tsFileProcessor : new ArrayList<>(
- workSequenceTsFileProcessors.values())) {
+ for (TsFileProcessor tsFileProcessor :
+ new ArrayList<>(workSequenceTsFileProcessors.values())) {
asyncCloseOneTsFileProcessor(true, tsFileProcessor);
}
// to avoid concurrent modification problem, we need a new array list
- for (TsFileProcessor tsFileProcessor : new ArrayList<>(
- workUnsequenceTsFileProcessors.values())) {
+ for (TsFileProcessor tsFileProcessor :
+ new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
asyncCloseOneTsFileProcessor(false, tsFileProcessor);
}
} finally {
@@ -1354,13 +1423,13 @@ public class StorageGroupProcessor {
try {
logger.info("force close all processors in storage group: {}", storageGroupName);
// to avoid concurrent modification problem, we need a new array list
- for (TsFileProcessor tsFileProcessor : new ArrayList<>(
- workSequenceTsFileProcessors.values())) {
+ for (TsFileProcessor tsFileProcessor :
+ new ArrayList<>(workSequenceTsFileProcessors.values())) {
tsFileProcessor.putMemTableBackAndClose();
}
// to avoid concurrent modification problem, we need a new array list
- for (TsFileProcessor tsFileProcessor : new ArrayList<>(
- workUnsequenceTsFileProcessors.values())) {
+ for (TsFileProcessor tsFileProcessor :
+ new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
tsFileProcessor.putMemTableBackAndClose();
}
} finally {
@@ -1369,18 +1438,34 @@ public class StorageGroupProcessor {
}
// TODO need a read lock, please consider the concurrency with flush manager threads.
- public QueryDataSource query(PartialPath deviceId, String measurementId, QueryContext context,
- QueryFileManager filePathsManager, Filter timeFilter) throws QueryProcessException {
+ public QueryDataSource query(
+ PartialPath deviceId,
+ String measurementId,
+ QueryContext context,
+ QueryFileManager filePathsManager,
+ Filter timeFilter)
+ throws QueryProcessException {
insertLock.readLock().lock();
try {
- List<TsFileResource> seqResources = getFileResourceListForQuery(
- tsFileManagement.getTsFileList(true),
- upgradeSeqFileList, deviceId, measurementId, context, timeFilter, true);
- List<TsFileResource> unseqResources = getFileResourceListForQuery(
- tsFileManagement.getTsFileList(false),
- upgradeUnseqFileList, deviceId, measurementId, context, timeFilter, false);
- QueryDataSource dataSource = new QueryDataSource(deviceId,
- seqResources, unseqResources);
+ List<TsFileResource> seqResources =
+ getFileResourceListForQuery(
+ tsFileManagement.getTsFileList(true),
+ upgradeSeqFileList,
+ deviceId,
+ measurementId,
+ context,
+ timeFilter,
+ true);
+ List<TsFileResource> unseqResources =
+ getFileResourceListForQuery(
+ tsFileManagement.getTsFileList(false),
+ upgradeUnseqFileList,
+ deviceId,
+ measurementId,
+ context,
+ timeFilter,
+ false);
+ QueryDataSource dataSource = new QueryDataSource(deviceId, seqResources, unseqResources);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
// is null only in tests
@@ -1404,32 +1489,40 @@ public class StorageGroupProcessor {
insertLock.writeLock().unlock();
}
-
/**
* @param tsFileResources includes sealed and unsealed tsfile resources
* @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
*/
private List<TsFileResource> getFileResourceListForQuery(
- Collection<TsFileResource> tsFileResources, List<TsFileResource> upgradeTsFileResources,
- PartialPath deviceId, String measurementId, QueryContext context, Filter timeFilter,
+ Collection<TsFileResource> tsFileResources,
+ List<TsFileResource> upgradeTsFileResources,
+ PartialPath deviceId,
+ String measurementId,
+ QueryContext context,
+ Filter timeFilter,
boolean isSeq)
throws MetadataException {
if (context.isDebug()) {
- DEBUG_LOGGER
- .info("Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}", deviceId.getFullPath(),
- measurementId, tsFileResources, isSeq, (timeFilter == null ? "null" : timeFilter));
+ DEBUG_LOGGER.info(
+ "Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}",
+ deviceId.getFullPath(),
+ measurementId,
+ tsFileResources,
+ isSeq,
+ (timeFilter == null ? "null" : timeFilter));
}
MeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
- long timeLowerBound = dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long
- .MIN_VALUE;
+ long timeLowerBound =
+ dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long.MIN_VALUE;
context.setQueryTimeLowerBound(timeLowerBound);
for (TsFileResource tsFileResource : tsFileResources) {
- if (!isTsFileResourceSatisfied(tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
+ if (!isTsFileResourceSatisfied(
+ tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1437,10 +1530,16 @@ public class StorageGroupProcessor {
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
- tsFileResource.getUnsealedFileProcessor()
- .query(deviceId.getFullPath(), measurementId, schema.getType(),
+ tsFileResource
+ .getUnsealedFileProcessor()
+ .query(
+ deviceId.getFullPath(),
+ measurementId,
+ schema.getType(),
schema.getEncodingType(),
- schema.getProps(), context, tsfileResourcesForQuery);
+ schema.getProps(),
+ context,
+ tsfileResourcesForQuery);
}
} catch (IOException e) {
throw new MetadataException(e);
@@ -1450,7 +1549,8 @@ public class StorageGroupProcessor {
}
// for upgrade files and old files must be closed
for (TsFileResource tsFileResource : upgradeTsFileResources) {
- if (!isTsFileResourceSatisfied(tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
+ if (!isTsFileResourceSatisfied(
+ tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1463,28 +1563,32 @@ public class StorageGroupProcessor {
return tsfileResourcesForQuery;
}
- /**
- * @return true if the device is contained in the TsFile and it lives beyond TTL
- */
- private boolean isTsFileResourceSatisfied(TsFileResource tsFileResource, String deviceId,
- Filter timeFilter, boolean isSeq, boolean debug) {
+ /** @return true if the device is contained in the TsFile and it lives beyond TTL */
+ private boolean isTsFileResourceSatisfied(
+ TsFileResource tsFileResource,
+ String deviceId,
+ Filter timeFilter,
+ boolean isSeq,
+ boolean debug) {
if (!tsFileResource.containsDevice(deviceId)) {
if (debug) {
- DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of no device!", deviceId,
- tsFileResource);
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of no device!", deviceId, tsFileResource);
}
return false;
}
int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
long startTime = tsFileResource.getStartTime(deviceIndex);
- long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
- : Long.MAX_VALUE;
+ long endTime =
+ tsFileResource.isClosed() || !isSeq
+ ? tsFileResource.getEndTime(deviceIndex)
+ : Long.MAX_VALUE;
if (!isAlive(endTime)) {
if (debug) {
- DEBUG_LOGGER
- .info("Path: {} file {} is not satisfied because of ttl!", deviceId, tsFileResource);
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of ttl!", deviceId, tsFileResource);
}
return false;
}
@@ -1492,8 +1596,8 @@ public class StorageGroupProcessor {
if (timeFilter != null) {
boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
if (debug && !res) {
- DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of time filter!", deviceId,
- tsFileResource);
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of time filter!", deviceId, tsFileResource);
}
return res;
}
@@ -1512,7 +1616,7 @@ public class StorageGroupProcessor {
throws IOException {
// TODO: how to avoid partial deletion?
// FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
- //mod files in mergingModification, sequenceFileList, and unsequenceFileList
+ // mod files in mergingModification, sequenceFileList, and unsequenceFileList
tsFileManagement.readLock();
writeLock();
@@ -1535,10 +1639,10 @@ public class StorageGroupProcessor {
updatedModFiles.add(tsFileManagement.mergingModification);
}
- deleteDataInFiles(tsFileManagement.getTsFileList(true), deletion, devicePaths,
- updatedModFiles, planIndex);
- deleteDataInFiles(tsFileManagement.getTsFileList(false), deletion, devicePaths,
- updatedModFiles, planIndex);
+ deleteDataInFiles(
+ tsFileManagement.getTsFileList(true), deletion, devicePaths, updatedModFiles, planIndex);
+ deleteDataInFiles(
+ tsFileManagement.getTsFileList(false), deletion, devicePaths, updatedModFiles, planIndex);
} catch (Exception e) {
// roll back
@@ -1552,8 +1656,7 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long startTime, long endTime, PartialPath path)
- throws IOException {
+ private void logDeletion(long startTime, long endTime, PartialPath path) throws IOException {
long timePartitionStartId = StorageEngine.getTimePartition(startTime);
long timePartitionEndId = StorageEngine.getTimePartition(endTime);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
@@ -1572,25 +1675,32 @@ public class StorageGroupProcessor {
}
}
- private boolean canSkipDelete(TsFileResource tsFileResource, Set<PartialPath> devicePaths,
- long deleteStart, long deleteEnd) {
+ private boolean canSkipDelete(
+ TsFileResource tsFileResource,
+ Set<PartialPath> devicePaths,
+ long deleteStart,
+ long deleteEnd) {
for (PartialPath device : devicePaths) {
- if (tsFileResource.containsDevice(device.getFullPath()) &&
- (deleteEnd >= tsFileResource.getStartTime(device.getFullPath()) &&
- deleteStart <= tsFileResource
- .getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
+ if (tsFileResource.containsDevice(device.getFullPath())
+ && (deleteEnd >= tsFileResource.getStartTime(device.getFullPath())
+ && deleteStart
+ <= tsFileResource.getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
return false;
}
}
return true;
}
- private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion,
- Set<PartialPath> devicePaths, List<ModificationFile> updatedModFiles, long planIndex)
+ private void deleteDataInFiles(
+ Collection<TsFileResource> tsFileResourceList,
+ Deletion deletion,
+ Set<PartialPath> devicePaths,
+ List<ModificationFile> updatedModFiles,
+ long planIndex)
throws IOException {
for (TsFileResource tsFileResource : tsFileResourceList) {
- if (canSkipDelete(tsFileResource, devicePaths, deletion.getStartTime(),
- deletion.getEndTime())) {
+ if (canSkipDelete(
+ tsFileResource, devicePaths, deletion.getStartTime(), deletion.getEndTime())) {
continue;
}
@@ -1601,8 +1711,11 @@ public class StorageGroupProcessor {
tsFileResource.getModFile().write(deletion);
// remember to close mod file
tsFileResource.getModFile().close();
- logger.info("[Deletion] Deletion with path:{}, time:{}-{} written into mods file.",
- deletion.getPath(), deletion.getStartTime(), deletion.getEndTime());
+ logger.info(
+ "[Deletion] Deletion with path:{}, time:{}-{} written into mods file.",
+ deletion.getPath(),
+ deletion.getStartTime(),
+ deletion.getEndTime());
tsFileResource.updatePlanIndexes(planIndex);
@@ -1617,8 +1730,9 @@ public class StorageGroupProcessor {
}
}
- private void tryToDeleteLastCache(PartialPath deviceId, PartialPath originalPath,
- long startTime, long endTime) throws WriteProcessException {
+ private void tryToDeleteLastCache(
+ PartialPath deviceId, PartialPath originalPath, long startTime, long endTime)
+ throws WriteProcessException {
if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
return;
}
@@ -1626,13 +1740,16 @@ public class StorageGroupProcessor {
MNode node = IoTDB.metaManager.getDeviceNode(deviceId);
for (MNode measurementNode : node.getChildren().values()) {
- if (measurementNode != null && originalPath
- .matchFullPath(measurementNode.getPartialPath())) {
+ if (measurementNode != null
+ && originalPath.matchFullPath(measurementNode.getPartialPath())) {
TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast();
- if (lastPair != null && startTime <= lastPair.getTimestamp()
+ if (lastPair != null
+ && startTime <= lastPair.getTimestamp()
&& lastPair.getTimestamp() <= endTime) {
((MeasurementMNode) measurementNode).resetCache();
- logger.info("[tryToDeleteLastCache] Last cache for path: {} is set to null", measurementNode.getFullPath());
+ logger.info(
+ "[tryToDeleteLastCache] Last cache for path: {} is set to null",
+ measurementNode.getFullPath());
}
}
}
@@ -1650,8 +1767,8 @@ public class StorageGroupProcessor {
TsFileResource resource = tsFileProcessor.getTsFileResource();
for (Entry<String, Integer> startTime : resource.getDeviceToIndexMap().entrySet()) {
String deviceId = startTime.getKey();
- resource.forceUpdateEndTime(deviceId,
- latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId));
+ resource.forceUpdateEndTime(
+ deviceId, latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId));
}
}
@@ -1661,13 +1778,15 @@ public class StorageGroupProcessor {
private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
// update the largest timestamp in the last flushing memtable
- Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
- .get(processor.getTimeRangeId());
+ Map<String, Long> curPartitionDeviceLatestTime =
+ latestTimeForEachDevice.get(processor.getTimeRangeId());
if (curPartitionDeviceLatestTime == null) {
- logger.warn("Partition: {} does't have latest time for each device. "
+ logger.warn(
+ "Partition: {} does't have latest time for each device. "
+ "No valid record is written into memtable. Flushing tsfile is: {}",
- processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
+ processor.getTimeRangeId(),
+ processor.getTsFileResource().getTsFile());
return false;
}
@@ -1675,34 +1794,33 @@ public class StorageGroupProcessor {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
- entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice
- .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ processor.getTimeRangeId(), entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
+ < entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
}
}
return true;
}
-
/**
- * <p>
- * update latest flush time for partition id
- * </>
+ * update latest flush time for partition id
+ *
* @param partitionId partition id
* @param latestFlushTime latest flush time
* @return true if update latest flush time success
*/
private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
// update the largest timestamp in the last flushing memtable
- Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
- .get(partitionId);
+ Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
if (curPartitionDeviceLatestTime == null) {
- logger.warn("Partition: {} does't have latest time for each device. "
- + "No valid record is written into memtable. latest flush time is: {}",
- partitionId, latestFlushTime);
+ logger.warn(
+ "Partition: {} does't have latest time for each device. "
+ + "No valid record is written into memtable. latest flush time is: {}",
+ partitionId,
+ latestFlushTime);
return false;
}
@@ -1711,43 +1829,38 @@ public class StorageGroupProcessor {
entry.setValue(latestFlushTime);
partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice
- .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
+ < entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
}
}
return true;
}
-
- /**
- * used for upgrading
- */
- public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
- String deviceId, long time) {
+ /** used for upgrading */
+ public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ long partitionId, String deviceId, long time) {
newlyFlushedPartitionLatestFlushedTimeForEachDevice
.computeIfAbsent(partitionId, id -> new HashMap<>())
.compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
- /**
- * put the memtable back to the MemTablePool and make the metadata in writer visible
- */
+ /** put the memtable back to the MemTablePool and make the metadata in writer visible */
// TODO please consider concurrency with query and insert method.
- private void closeUnsealedTsFileProcessorCallBack(
- TsFileProcessor tsFileProcessor) throws TsFileProcessorException {
+ private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor tsFileProcessor)
+ throws TsFileProcessorException {
closeQueryLock.writeLock().lock();
try {
tsFileProcessor.close();
} finally {
closeQueryLock.writeLock().unlock();
}
- //closingSequenceTsFileProcessor is a thread safety class.
+ // closingSequenceTsFileProcessor is a thread-safe class.
if (closingSequenceTsFileProcessor.contains(tsFileProcessor)) {
closingSequenceTsFileProcessor.remove(tsFileProcessor);
} else {
@@ -1757,30 +1870,34 @@ public class StorageGroupProcessor {
closeStorageGroupCondition.notifyAll();
}
logger.info("signal closing storage group condition in {}", storageGroupName);
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
- .isTerminated()) {
+
+ executeCompaction(
+ tsFileProcessor.getTimeRangeId(),
+ IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ }
+
+ private void executeCompaction(long timePartition, boolean fullMerge) {
+ if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
compactionMergeWorking = true;
logger.info("{} submit a compaction merge task", storageGroupName);
try {
// fork and filter current tsfile, then commit them to compaction merge
- tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
+ tsFileManagement.forkCurrentFileList(timePartition);
+ tsFileManagement.setForceFullMerge(fullMerge);
CompactionMergeTaskPoolManager.getInstance()
.submitTask(
- tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
- tsFileProcessor.getTimeRangeId()));
+ tsFileManagement
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
} catch (IOException | RejectedExecutionException e) {
this.closeCompactionMergeCallBack();
logger.error("{} compaction submit task failed", storageGroupName);
}
} else {
- logger.info("{} last compaction merge task is working, skip current merge",
- storageGroupName);
+ logger.info("{} last compaction merge task is working, skip current merge", storageGroupName);
}
}
- /**
- * close compaction merge callback, to release some locks
- */
+ /** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack() {
this.compactionMergeWorking = false;
}
@@ -1811,10 +1928,12 @@ public class StorageGroupProcessor {
List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources();
for (TsFileResource resource : upgradedResources) {
long partitionId = resource.getTimePartition();
- resource.getDeviceToIndexMap().forEach((device, index) ->
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
- resource.getEndTime(index))
- );
+ resource
+ .getDeviceToIndexMap()
+ .forEach(
+ (device, index) ->
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ partitionId, device, resource.getEndTime(index)));
}
tsFileManagement.writeLock();
writeLock();
@@ -1833,17 +1952,18 @@ public class StorageGroupProcessor {
// after upgrade complete, update partitionLatestFlushedTimeForEachDevice
if (countUpgradeFiles() == 0) {
- for (Entry<Long, Map<String, Long>> entry : newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .entrySet()) {
+ for (Entry<Long, Map<String, Long>> entry :
+ newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
long timePartitionId = entry.getKey();
- Map<String, Long> latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice
- .getOrDefault(timePartitionId, new HashMap<>());
+ Map<String, Long> latestFlushTimeForPartition =
+ partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
String device = endTimeMap.getKey();
long endTime = endTimeMap.getValue();
if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime);
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .put(device, endTime);
}
}
}
@@ -1856,8 +1976,9 @@ public class StorageGroupProcessor {
public void merge(boolean fullMerge) {
writeLock();
try {
- this.tsFileManagement.merge(fullMerge, tsFileManagement.getTsFileList(true),
- tsFileManagement.getTsFileList(false), dataTTL);
+ for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
+ executeCompaction(timePartitionId, fullMerge);
+ }
} finally {
writeUnlock();
}
@@ -1865,15 +1986,14 @@ public class StorageGroupProcessor {
/**
* Load a new tsfile to storage group processor. The file may have overlap with other files.
- * <p>
- * or unsequence list.
- * <p>
- * Secondly, execute the loading process by the type.
- * <p>
- * Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
*
- * @param newTsFileResource tsfile resource
- * @UsedBy sync module.
+ * <p>or unsequence list.
+ *
+ * <p>Secondly, execute the loading process by the type.
+ *
+ * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+ *
+ * @param newTsFileResource tsfile resource
+ * @UsedBy sync module.
*/
public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getTsFile();
@@ -1881,14 +2001,18 @@ public class StorageGroupProcessor {
tsFileManagement.writeLock();
writeLock();
try {
- if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+ if (loadTsFileByType(
+ LoadTsFileType.LOAD_SEQUENCE,
+ tsfileToBeInserted,
+ newTsFileResource,
newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
- tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+ tsfileToBeInserted.getAbsolutePath(),
+ tsfileToBeInserted.getParentFile().getName());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
throw new LoadFileException(e);
} finally {
@@ -1898,18 +2022,18 @@ public class StorageGroupProcessor {
}
/**
- * Load a new tsfile to storage group processor. Tne file may have overlap with other files. <p>
- * that there has no file which is overlapping with the new file.
- * <p>
- * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
- * or unsequence list.
- * <p>
- * Secondly, execute the loading process by the type.
- * <p>
- * Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+ * Load a new tsfile to storage group processor. The file may have overlap with other files.
*
- * @param newTsFileResource tsfile resource
- * @UsedBy load external tsfile module
+ * <p>that there has no file which is overlapping with the new file.
+ *
+ * <p>Firstly, determine the loading type of the file, whether it needs to be loaded in sequence
+ * list or unsequence list.
+ *
+ * <p>Secondly, execute the loading process by the type.
+ *
+ * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+ *
+ * @param newTsFileResource tsfile resource
+ * @UsedBy load external tsfile module
*/
public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getTsFile();
@@ -1926,36 +2050,49 @@ public class StorageGroupProcessor {
// loading tsfile by type
if (insertPos == POS_OVERLAP) {
- loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
+ loadTsFileByType(
+ LoadTsFileType.LOAD_UNSEQUENCE,
+ tsfileToBeInserted,
+ newTsFileResource,
newFilePartitionId);
} else {
// check whether the file name needs to be renamed.
if (!tsFileManagement.isEmpty(true)) {
- String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
- newTsFileResource.getTimePartition(), sequenceList);
+ String newFileName =
+ getFileNameForLoadingFile(
+ tsfileToBeInserted.getName(),
+ insertPos,
+ newTsFileResource.getTimePartition(),
+ sequenceList);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
- logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
- tsfileToBeInserted.getName(), newFileName);
- newTsFileResource
- .setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
+ logger.info(
+ "Tsfile {} must be renamed to {} for loading into the sequence list.",
+ tsfileToBeInserted.getName(),
+ newFileName);
+ newTsFileResource.setFile(
+ fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
}
}
- loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+ loadTsFileByType(
+ LoadTsFileType.LOAD_SEQUENCE,
+ tsfileToBeInserted,
+ newTsFileResource,
newFilePartitionId);
}
// update latest time map
updateLatestTimeMap(newTsFileResource);
long partitionNum = newTsFileResource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ partitionDirectFileVersions
+ .computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(partitionNum,
- newTsFileResource.getMaxVersion());
+ updatePartitionFileVersion(partitionNum, newTsFileResource.getMaxVersion());
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
- tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+ tsfileToBeInserted.getAbsolutePath(),
+ tsfileToBeInserted.getParentFile().getName());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
throw new LoadFileException(e);
} finally {
@@ -1968,8 +2105,8 @@ public class StorageGroupProcessor {
* Set the version in "partition" to "version" if "version" is larger than the current version.
*/
public void setPartitionFileVersionToMax(long partition, long version) {
- partitionMaxFileVersions
- .compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
+ partitionMaxFileVersions.compute(
+ partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
}
private long computeMaxVersion(Long oldVersion, Long newVersion) {
@@ -1980,14 +2117,15 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
- * them.
+ * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
*
* @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
- * file can be inserted between [i, i+1]
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
- private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
+ private int findInsertionPosition(
+ TsFileResource newTsFileResource,
+ long newFilePartitionId,
List<TsFileResource> sequenceList) {
File tsfileToBeInserted = newTsFileResource.getTsFile();
@@ -2028,7 +2166,7 @@ public class StorageGroupProcessor {
* Compare each device in the two files to find the time relation of them.
*
* @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
- * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
@@ -2084,8 +2222,8 @@ public class StorageGroupProcessor {
}
}
- private void removeFullyOverlapFiles(TsFileResource newTsFile, Iterator<TsFileResource> iterator
- , boolean isSeq) {
+ private void removeFullyOverlapFiles(
+ TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) {
while (iterator.hasNext()) {
TsFileResource existingTsFile = iterator.next();
if (newTsFile.getHistoricalVersions().containsAll(existingTsFile.getHistoricalVersions())
@@ -2096,8 +2234,10 @@ public class StorageGroupProcessor {
try {
removeFullyOverlapFile(existingTsFile, iterator, isSeq);
} catch (Exception e) {
- logger.error("Something gets wrong while removing FullyOverlapFiles: {}",
- existingTsFile.getTsFile().getAbsolutePath(), e);
+ logger.error(
+ "Something gets wrong while removing FullyOverlapFiles: {}",
+ existingTsFile.getTsFile().getAbsolutePath(),
+ e);
} finally {
existingTsFile.writeUnlock();
}
@@ -2109,17 +2249,16 @@ public class StorageGroupProcessor {
* remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status,
* close it before remove the related resource files. maybe time-consuming for closing a tsfile.
*/
- private void removeFullyOverlapFile(TsFileResource tsFileResource,
- Iterator<TsFileResource> iterator
- , boolean isSeq) {
+ private void removeFullyOverlapFile(
+ TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) {
if (!tsFileResource.isClosed()) {
// also remove the TsFileProcessor if the overlapped file is not closed
long timePartition = tsFileResource.getTimePartition();
- Map<Long, TsFileProcessor> fileProcessorMap = isSeq ? workSequenceTsFileProcessors :
- workUnsequenceTsFileProcessors;
+ Map<Long, TsFileProcessor> fileProcessorMap =
+ isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition);
if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) {
- //have to take some time to close the tsFileProcessor
+ // have to take some time to close the tsFileProcessor
tsFileProcessor.syncClose();
fileProcessorMap.remove(timePartition);
}
@@ -2132,35 +2271,34 @@ public class StorageGroupProcessor {
/**
* Get an appropriate filename to ensure the order between files. The tsfile is named after
* ({systemTime}-{versionNum}-{mergeNum}.tsfile).
- * <p>
- * The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the list
- * based on the file name and ensure the correctness of the order, so there are three cases.
- * <p>
- * 1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
- * name is less than the timestamp in the file name of the first tsfile in the list, then the
- * file name is legal and the file name is returned directly. Otherwise, its timestamp can be set
- * to half of the timestamp value in the file name of the first tsfile in the list , and the
- * version number is the version number in the file name of the first tsfile in the list.
- * <p>
- * 2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
- * name is lager than the timestamp in the file name of the last tsfile in the list, then the
- * file name is legal and the file name is returned directly. Otherwise, the file name is
- * generated by the system according to the naming rules and returned.
- * <p>
- * 3. This file is inserted between two files. If the timestamp in the name of the file satisfies
- * the timestamp between the timestamps in the name of the two files, then it is a legal name and
- * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
- * version number is the version number in the tsfile with a larger timestamp.
+ *
+ * <p>The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the
+ * list based on the file name and ensure the correctness of the order, so there are three cases.
+ *
+ * <p>1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
+ * name is less than the timestamp in the file name of the first tsfile in the list, then the file
+ * name is legal and the file name is returned directly. Otherwise, its timestamp can be set to
+ * half of the timestamp value in the file name of the first tsfile in the list , and the version
+ * number is the version number in the file name of the first tsfile in the list.
+ *
+ * <p>2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
+ * name is larger than the timestamp in the file name of the last tsfile in the list, then the file
+ * name is legal and the file name is returned directly. Otherwise, the file name is generated by
+ * the system according to the naming rules and returned.
+ *
+ * <p>3. This file is inserted between two files. If the timestamp in the name of the file
+ * satisfies the timestamp between the timestamps in the name of the two files, then it is a legal
+ * name and returns directly; otherwise, the time stamp is the mean of the timestamps of the two
+ * files, the version number is the version number in the tsfile with a larger timestamp.
*
* @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
- private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
- long timePartitionId, List<TsFileResource> sequenceList) {
- long currentTsFileTime = Long
- .parseLong(tsfileName.split(FILE_NAME_SEPARATOR)[0]);
+ private String getFileNameForLoadingFile(
+ String tsfileName, int insertIndex, long timePartitionId, List<TsFileResource> sequenceList) {
+ long currentTsFileTime = Long.parseLong(tsfileName.split(FILE_NAME_SEPARATOR)[0]);
long preTime;
if (insertIndex == -1) {
preTime = 0L;
@@ -2172,23 +2310,20 @@ public class StorageGroupProcessor {
return preTime < currentTsFileTime ? tsfileName : getNewTsFileName(timePartitionId);
} else {
String subsequenceName = sequenceList.get(insertIndex + 1).getTsFile().getName();
- long subsequenceTime = Long
- .parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[0]);
- long subsequenceVersion = Long
- .parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[1]);
+ long subsequenceTime = Long.parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[0]);
+ long subsequenceVersion = Long.parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[1]);
if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) {
return tsfileName;
} else {
- return getNewTsFileName(preTime + ((subsequenceTime - preTime) >> 1), subsequenceVersion,
- 0);
+ return getNewTsFileName(
+ preTime + ((subsequenceTime - preTime) >> 1), subsequenceVersion, 0);
}
}
}
/**
- * Update latest time in latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
- *
- * @UsedBy sync module, load external tsfile module.
+ * Update latest time in latestTimeForEachDevice and
+ * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module.
*/
private void updateLatestTimeMap(TsFileResource newTsFileResource) {
for (Entry<String, Integer> entry : newTsFileResource.getDeviceToIndexMap().entrySet()) {
@@ -2196,18 +2331,20 @@ public class StorageGroupProcessor {
int index = entry.getValue();
long endTime = newTsFileResource.getEndTime(index);
long timePartitionId = StorageEngine.getTimePartition(endTime);
- if (!latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .containsKey(device)
+ if (!latestTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .containsKey(device)
|| latestTimeForEachDevice.get(timePartitionId).get(device) < endTime) {
latestTimeForEachDevice.get(timePartitionId).put(device, endTime);
}
- Map<String, Long> latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice
- .getOrDefault(timePartitionId, new HashMap<>());
+ Map<String, Long> latestFlushTimeForPartition =
+ partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime);
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .put(device, endTime);
}
if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) {
globalLatestFlushedTimeForEachDevice.put(device, endTime);
@@ -2221,18 +2358,21 @@ public class StorageGroupProcessor {
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
- * @return load the file successfully
- * @UsedBy sync module, load external tsfile module.
+ * @return load the file successfully
+ * @UsedBy sync module, load external tsfile module.
*/
- private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
- TsFileResource tsFileResource, long filePartitionId)
+ private boolean loadTsFileByType(
+ LoadTsFileType type, File syncedTsFile, TsFileResource tsFileResource, long filePartitionId)
throws LoadFileException, DiskSpaceInsufficientException {
File targetFile;
switch (type) {
case LOAD_UNSEQUENCE:
- targetFile = fsFactory
- .getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
+ targetFile =
+ fsFactory.getFile(
+ DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ storageGroupName
+ + File.separatorChar
+ + filePartitionId
+ + File.separator
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManagement.contains(tsFileResource, false)) {
@@ -2240,13 +2380,19 @@ public class StorageGroupProcessor {
return false;
}
tsFileManagement.add(tsFileResource, false);
- logger.info("Load tsfile in unsequence list, move file from {} to {}",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+ logger.info(
+ "Load tsfile in unsequence list, move file from {} to {}",
+ syncedTsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath());
break;
case LOAD_SEQUENCE:
targetFile =
- fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator
+ fsFactory.getFile(
+ DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ storageGroupName
+ + File.separatorChar
+ + filePartitionId
+ + File.separator
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManagement.contains(tsFileResource, true)) {
@@ -2254,12 +2400,13 @@ public class StorageGroupProcessor {
return false;
}
tsFileManagement.add(tsFileResource, true);
- logger.info("Load tsfile in sequence list, move file from {} to {}",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+ logger.info(
+ "Load tsfile in sequence list, move file from {} to {}",
+ syncedTsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath());
break;
default:
- throw new LoadFileException(
- String.format("Unsupported type of loading tsfile : %s", type));
+ throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type));
}
// move file from sync dir to data dir
@@ -2269,46 +2416,55 @@ public class StorageGroupProcessor {
try {
FileUtils.moveFile(syncedTsFile, targetFile);
} catch (IOException e) {
- logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
- throw new LoadFileException(String.format(
- "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+ logger.error(
+ "File renaming failed when loading tsfile. Origin: {}, Target: {}",
+ syncedTsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath(),
+ e);
+ throw new LoadFileException(
+ String.format(
+ "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
+ syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
}
- File syncedResourceFile = fsFactory.getFile(
- syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
- File targetResourceFile = fsFactory.getFile(
- targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ File syncedResourceFile =
+ fsFactory.getFile(syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ File targetResourceFile =
+ fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
try {
FileUtils.moveFile(syncedResourceFile, targetResourceFile);
} catch (IOException e) {
- logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}",
- syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
- throw new LoadFileException(String.format(
- "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
- syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
- e.getMessage()));
- }
- partitionDirectFileVersions.computeIfAbsent(filePartitionId,
- p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
+ logger.error(
+ "File renaming failed when loading .resource file. Origin: {}, Target: {}",
+ syncedResourceFile.getAbsolutePath(),
+ targetResourceFile.getAbsolutePath(),
+ e);
+ throw new LoadFileException(
+ String.format(
+ "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
+ syncedResourceFile.getAbsolutePath(),
+ targetResourceFile.getAbsolutePath(),
+ e.getMessage()));
+ }
+ partitionDirectFileVersions
+ .computeIfAbsent(filePartitionId, p -> new HashSet<>())
+ .addAll(tsFileResource.getHistoricalVersions());
if (!tsFileResource.getHistoricalVersions().isEmpty()) {
- updatePartitionFileVersion(filePartitionId,
- tsFileResource.getMaxVersion());
+ updatePartitionFileVersion(filePartitionId, tsFileResource.getMaxVersion());
}
return true;
}
/**
* Delete tsfile if it exists.
- * <p>
- * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
- * <p>
- * Secondly, delete the tsfile and .resource file.
+ *
+ * <p>Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+ *
+ * <p>Secondly, delete the tsfile and .resource file.
*
* @param tsfieToBeDeleted tsfile to be deleted
- * @return whether the file to be deleted exists.
- * @UsedBy sync module, load external tsfile module.
+ * @return whether the file to be deleted exists. @UsedBy sync module, load external tsfile
+ * module.
*/
public boolean deleteTsfile(File tsfieToBeDeleted) {
tsFileManagement.writeLock();
@@ -2352,21 +2508,19 @@ public class StorageGroupProcessor {
return true;
}
-
public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() {
return workSequenceTsFileProcessors.values();
}
/**
* Move tsfile to the target directory if it exists.
- * <p>
- * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
- * <p>
- * Secondly, move the tsfile and .resource file to the target directory.
+ *
+ * <p>Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+ *
+ * <p>Secondly, move the tsfile and .resource file to the target directory.
*
* @param fileToBeMoved tsfile to be moved
- * @return whether the file to be moved exists.
- * @UsedBy load external tsfile module.
+ * @return whether the file to be moved exists. @UsedBy load external tsfile module.
*/
public boolean moveTsfile(File fileToBeMoved, File targetDir) {
tsFileManagement.writeLock();
@@ -2403,17 +2557,16 @@ public class StorageGroupProcessor {
tsFileResourceToBeMoved.writeLock();
try {
tsFileResourceToBeMoved.moveTo(targetDir);
- logger
- .info("Move tsfile {} to target dir {} successfully.",
- tsFileResourceToBeMoved.getTsFile(),
- targetDir.getPath());
+ logger.info(
+ "Move tsfile {} to target dir {} successfully.",
+ tsFileResourceToBeMoved.getTsFile(),
+ targetDir.getPath());
} finally {
tsFileResourceToBeMoved.writeUnlock();
}
return true;
}
-
public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
return workUnsequenceTsFileProcessors.values();
}
@@ -2447,8 +2600,8 @@ public class StorageGroupProcessor {
* "tsFileResource" may have unwritten data of that file.
*
* @return true if the historicalVersions of "tsFileResource" is a subset of
- * partitionDirectFileVersions, or false if it is not a subset and it contains any version of a
- * working file USED by cluster module
+ * partitionDirectFileVersions, or false if it is not a subset and it contains any version of
+ * a working file USED by cluster module
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
// consider the case: The local node crashes when it is writing TsFile no.5.
@@ -2468,16 +2621,16 @@ public class StorageGroupProcessor {
return false;
}
}
- Set<Long> partitionFileVersions = partitionDirectFileVersions
- .getOrDefault(partitionNum, Collections.emptySet());
- logger.debug("FileVersions/PartitionVersions: {}/{}", tsFileResource.getHistoricalVersions(),
+ Set<Long> partitionFileVersions =
+ partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet());
+ logger.debug(
+ "FileVersions/PartitionVersions: {}/{}",
+ tsFileResource.getHistoricalVersions(),
partitionFileVersions);
return partitionFileVersions.containsAll(tsFileResource.getHistoricalVersions());
}
- /**
- * remove all partitions that satisfy a filter.
- */
+ /** remove all partitions that satisfy a filter. */
public void removePartitions(TimePartitionFilter filter) {
// this requires blocking all other activities
tsFileManagement.writeLock();
@@ -2499,9 +2652,9 @@ public class StorageGroupProcessor {
}
}
- //may remove the processorEntrys
- private void removePartitions(TimePartitionFilter filter,
- Set<Entry<Long, TsFileProcessor>> processorEntrys) {
+ // may remove the processorEntrys
+ private void removePartitions(
+ TimePartitionFilter filter, Set<Entry<Long, TsFileProcessor>> processorEntrys) {
for (Iterator<Entry<Long, TsFileProcessor>> iterator = processorEntrys.iterator();
iterator.hasNext(); ) {
Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
@@ -2511,13 +2664,14 @@ public class StorageGroupProcessor {
processor.syncClose();
iterator.remove();
updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
- logger.debug("{} is removed during deleting partitions",
+ logger.debug(
+ "{} is removed during deleting partitions",
processor.getTsFileResource().getTsFilePath());
}
}
}
- //may remove the iterator's data
+ // may remove the iterator's data
private void removePartitions(TimePartitionFilter filter, Iterator<TsFileResource> iterator) {
while (iterator.hasNext()) {
TsFileResource tsFileResource = iterator.next();
@@ -2545,25 +2699,27 @@ public class StorageGroupProcessor {
boolean isSequence = false;
for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
if (!isAlive(plan.getTime())) {
- //we do not need to write these part of data, as they can not be queried
+ // we do not need to write these part of data, as they can not be queried
continue;
}
// init map
long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>());
- //as the plans have been ordered, and we have get the write lock,
- //So, if a plan is sequenced, then all the rest plans are sequenced.
+ partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+ timePartitionId, id -> new HashMap<>());
+ // as the plans have been ordered, and we have get the write lock,
+ // So, if a plan is sequenced, then all the rest plans are sequenced.
//
if (!isSequence) {
isSequence =
- plan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
- .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ plan.getTime()
+ > partitionLatestFlushedTimeForEachDevice
+ .get(timePartitionId)
+ .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
}
- //is unsequence and user set config to discard out of order data
- if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
- .isEnableDiscardOutOfOrderData()) {
+ // is unsequence and user set config to discard out of order data
+ if (!isSequence
+ && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
return;
}
@@ -2574,11 +2730,11 @@ public class StorageGroupProcessor {
} finally {
writeUnlock();
}
-
}
private enum LoadTsFileType {
- LOAD_SEQUENCE, LOAD_UNSEQUENCE
+ LOAD_SEQUENCE,
+ LOAD_UNSEQUENCE
}
@FunctionalInterface
@@ -2611,13 +2767,11 @@ public class StorageGroupProcessor {
boolean satisfy(String storageGroupName, long timePartitionId);
}
- public void setCustomCloseFileListeners(
- List<CloseFileListener> customCloseFileListeners) {
+ public void setCustomCloseFileListeners(List<CloseFileListener> customCloseFileListeners) {
this.customCloseFileListeners = customCloseFileListeners;
}
- public void setCustomFlushListeners(
- List<FlushListener> customFlushListeners) {
+ public void setCustomFlushListeners(List<FlushListener> customFlushListeners) {
this.customFlushListeners = customFlushListeners;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 8cd8d1a..4a9f513 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,11 +26,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -65,7 +65,8 @@ public class StorageGroupProcessorTest {
@Before
public void setUp() throws Exception {
- IoTDBDescriptor.getInstance().getConfig()
+ IoTDBDescriptor.getInstance()
+ .getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
@@ -80,7 +81,8 @@ public class StorageGroupProcessorTest {
EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
MergeManager.getINSTANCE().stop();
EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance().getConfig()
+ IoTDBDescriptor.getInstance()
+ .getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@@ -118,9 +120,14 @@ public class StorageGroupProcessorTest {
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
- tsfileProcessor
- .query(deviceId, measurementId, TSDataType.INT32, TSEncoding.RLE, Collections.emptyMap(),
- new QueryContext(), tsfileResourcesForQuery);
+ tsfileProcessor.query(
+ deviceId,
+ measurementId,
+ TSDataType.INT32,
+ TSEncoding.RLE,
+ Collections.emptyMap(),
+ new QueryContext(),
+ tsfileResourcesForQuery);
}
Assert.assertEquals(1, tsfileResourcesForQuery.size());
@@ -152,9 +159,8 @@ public class StorageGroupProcessorTest {
e.printStackTrace();
}
processor.syncCloseAllWorkingTsFileProcessors();
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
@@ -163,7 +169,7 @@ public class StorageGroupProcessorTest {
@Test
public void testInsertDataAndRemovePartitionAndInsert()
- throws WriteProcessException, QueryProcessException, IllegalPathException {
+ throws WriteProcessException, QueryProcessException, IllegalPathException {
for (int j = 0; j < 10; j++) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -182,9 +188,8 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
}
@@ -200,14 +205,15 @@ public class StorageGroupProcessorTest {
dataTypes.add(TSDataType.INT64.ordinal());
MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
- measurementMNodes[0] = new MeasurementMNode(null, "s0",
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
- measurementMNodes[1] = new MeasurementMNode(null, "s1",
- new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
-
- InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ measurementMNodes[0] =
+ new MeasurementMNode(
+ null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+ measurementMNodes[1] =
+ new MeasurementMNode(
+ null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+
+ InsertTabletPlan insertTabletPlan1 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
insertTabletPlan1.setMeasurementMNodes(measurementMNodes);
long[] times = new long[100];
@@ -227,9 +233,8 @@ public class StorageGroupProcessorTest {
processor.insertTablet(insertTabletPlan1);
processor.asyncCloseAllWorkingTsFileProcessors();
- InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan2 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
insertTabletPlan2.setMeasurementMNodes(measurementMNodes);
for (int r = 50; r < 149; r++) {
@@ -245,9 +250,8 @@ public class StorageGroupProcessorTest {
processor.asyncCloseAllWorkingTsFileProcessors();
processor.syncCloseAllWorkingTsFileProcessors();
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
@@ -256,7 +260,6 @@ public class StorageGroupProcessorTest {
}
}
-
@Test
public void testSeqAndUnSeqSyncClose()
throws WriteProcessException, QueryProcessException, IllegalPathException {
@@ -278,9 +281,8 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -319,9 +321,8 @@ public class StorageGroupProcessorTest {
tsfileProcessor.syncFlush();
}
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -353,14 +354,15 @@ public class StorageGroupProcessorTest {
dataTypes.add(TSDataType.INT64.ordinal());
MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
- measurementMNodes[0] = new MeasurementMNode(null, "s0",
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
- measurementMNodes[1] = new MeasurementMNode(null, "s1",
- new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+ measurementMNodes[0] =
+ new MeasurementMNode(
+ null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+ measurementMNodes[1] =
+ new MeasurementMNode(
+ null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
- InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan1 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
long[] times = new long[100];
Object[] columns = new Object[2];
@@ -380,9 +382,8 @@ public class StorageGroupProcessorTest {
processor.insertTablet(insertTabletPlan1);
processor.asyncCloseAllWorkingTsFileProcessors();
- InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan2 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
for (int r = 149; r >= 50; r--) {
times[r - 50] = r;
@@ -402,9 +403,8 @@ public class StorageGroupProcessorTest {
tsfileProcessor.syncFlush();
}
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -436,14 +436,15 @@ public class StorageGroupProcessorTest {
dataTypes.add(TSDataType.INT64.ordinal());
MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
- measurementMNodes[0] = new MeasurementMNode(null, "s0",
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
- measurementMNodes[1] = new MeasurementMNode(null, "s1",
- new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+ measurementMNodes[0] =
+ new MeasurementMNode(
+ null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+ measurementMNodes[1] =
+ new MeasurementMNode(
+ null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
- InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan1 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
long[] times = new long[1200];
Object[] columns = new Object[2];
@@ -463,9 +464,8 @@ public class StorageGroupProcessorTest {
processor.insertTablet(insertTabletPlan1);
processor.asyncCloseAllWorkingTsFileProcessors();
- InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan2 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
for (int r = 1249; r >= 50; r--) {
times[r - 50] = r;
@@ -485,9 +485,8 @@ public class StorageGroupProcessorTest {
tsfileProcessor.syncFlush();
}
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -519,14 +518,15 @@ public class StorageGroupProcessorTest {
dataTypes.add(TSDataType.INT64.ordinal());
MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
- measurementMNodes[0] = new MeasurementMNode(null, "s0",
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
- measurementMNodes[1] = new MeasurementMNode(null, "s1",
- new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+ measurementMNodes[0] =
+ new MeasurementMNode(
+ null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+ measurementMNodes[1] =
+ new MeasurementMNode(
+ null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
- InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan1 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
long[] times = new long[1200];
Object[] columns = new Object[2];
@@ -546,9 +546,8 @@ public class StorageGroupProcessorTest {
processor.insertTablet(insertTabletPlan1);
processor.asyncCloseAllWorkingTsFileProcessors();
- InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
- measurements,
- dataTypes);
+ InsertTabletPlan insertTabletPlan2 =
+ new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
for (int r = 1249; r >= 50; r--) {
times[r - 50] = r;
@@ -568,9 +567,8 @@ public class StorageGroupProcessorTest {
tsfileProcessor.syncFlush();
}
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -607,11 +605,10 @@ public class StorageGroupProcessorTest {
// wait
}
- QueryDataSource queryDataSource = processor
- .query(new PartialPath(deviceId), measurementId, context,
- null, null);
+ QueryDataSource queryDataSource =
+ processor.query(new PartialPath(deviceId), measurementId, context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
- Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
}
@@ -625,6 +622,5 @@ public class StorageGroupProcessorTest {
DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
}
-
}
-}
\ No newline at end of file
+}