You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/06/23 10:02:19 UTC
[incubator-iotdb] branch hot_compaction updated: update StorageGroupProcessor
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch hot_compaction
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/hot_compaction by this push:
new a10b404 update StorageGroupProcessor
new d08b086 Merge pull request #1410 from zhanglingzhe0820/add_vm_merge
a10b404 is described below
commit a10b404edce4c5d5d85a16b225df1e47eb7a1ecb
Author: 张凌哲 <zh...@bytedance.com>
AuthorDate: Tue Jun 23 17:57:37 2020 +0800
update StorageGroupProcessor
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 10 -
.../engine/storagegroup/StorageGroupProcessor.java | 230 ++++++++++++---------
2 files changed, 137 insertions(+), 103 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 0ead37f..cf84a6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -23,14 +23,11 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_UPGRADE;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SEPARATOR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.VM_SUFFIX;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -41,14 +38,9 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.metadata.mnode.InternalMNode;
-import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -57,8 +49,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 009acac..ae12100 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.engine.storagegroup;
import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SEPARATOR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.VM_SUFFIX;
import java.io.File;
import java.io.IOException;
@@ -229,22 +231,20 @@ public class StorageGroupProcessor {
private TsFileFlushPolicy fileFlushPolicy;
/**
- * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
- * not including the files generated by merge) of each partition.
- * As data file close is managed by the leader in the distributed version, the files with the
- * same version(s) have the same data, despite that the inner structure (the size and
- * organization of chunks) may be different, so we can easily find what remote files we do not
- * have locally.
- * partition number -> version number set
+ * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+ * including the files generated by merge) of each partition. As data file close is managed by the
+ * leader in the distributed version, the files with the same version(s) have the same data,
+ * despite that the inner structure (the size and organization of chunks) may be different, so we
+ * can easily find what remote files we do not have locally. partition number -> version number
+ * set
*/
private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
/**
- * The max file versions in each partition. By recording this, if several IoTDB instances have
- * the same policy of closing file and their ingestion is identical, then files of the same
- * version in different IoTDB instance will have identical data, providing convenience for data
- * comparison across different instances.
- * partition number -> max version number
+ * The max file versions in each partition. By recording this, if several IoTDB instances have the
+ * same policy of closing file and their ingestion is identical, then files of the same version in
+ * different IoTDB instance will have identical data, providing convenience for data comparison
+ * across different instances. partition number -> max version number
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
@@ -272,27 +272,33 @@ public class StorageGroupProcessor {
try {
// collect candidate TsFiles from sequential and unsequential data directory
Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles(
- DirectoryManager.getInstance().getAllSequenceFileFolders());
+ DirectoryManager.getInstance().getAllSequenceFileFolders());
List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
upgradeSeqFileList.addAll(oldSeqTsFiles);
Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles(
- DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+ DirectoryManager.getInstance().getAllUnSequenceFileFolders());
List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
upgradeUnseqFileList.addAll(oldUnseqTsFiles);
+ Map<String, List<TsFileResource>> vmSeqFiles = getAllVms(
+ DirectoryManager.getInstance().getAllSequenceFileFolders());
+ Map<String, List<TsFileResource>> vmUnseqFiles = getAllVms(
+ DirectoryManager.getInstance().getAllUnSequenceFileFolders());
- recoverSeqFiles(tmpSeqTsFiles);
- recoverUnseqFiles(tmpUnseqTsFiles);
+ recoverSeqFiles(tmpSeqTsFiles, vmSeqFiles);
+ recoverUnseqFiles(tmpUnseqTsFiles, vmUnseqFiles);
for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
for (TsFileResource resource : unSequenceFileList) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
@@ -317,7 +323,6 @@ public class StorageGroupProcessor {
throw new StorageGroupProcessorException(e);
}
-
for (TsFileResource resource : sequenceFileTreeSet) {
long timePartitionId = resource.getTimePartition();
Map<String, Long> endTimeMap = new HashMap<>();
@@ -346,11 +351,11 @@ public class StorageGroupProcessor {
/**
* use old seq file to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice,
* partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap
- *
*/
private void updateLastestFlushedTime() throws IOException {
- VersionController versionController = new SimpleFileVersionController(storageGroupSysDir.getPath());
+ VersionController versionController = new SimpleFileVersionController(
+ storageGroupSysDir.getPath());
long currentVersion = versionController.currVersion();
for (TsFileResource resource : upgradeSeqFileList) {
for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
@@ -359,24 +364,27 @@ public class StorageGroupProcessor {
long endTime = resource.getEndTime(index);
long endTimePartitionId = StorageEngine.getTimePartition(endTime);
latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
- .put(deviceId, endTime);
+ .put(deviceId, endTime);
globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
// set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
while (partitionId <= endTimePartitionId) {
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
- .put(deviceId, Long.MAX_VALUE);
+ .put(deviceId, Long.MAX_VALUE);
if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
- File directory = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
- if(!directory.exists()){
+ File directory = SystemFileFactory.INSTANCE
+ .getFile(storageGroupSysDir, String.valueOf(partitionId));
+ if (!directory.exists()) {
directory.mkdirs();
}
- File versionFile = SystemFileFactory.INSTANCE.getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
+ File versionFile = SystemFileFactory.INSTANCE
+ .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
if (!versionFile.createNewFile()) {
logger.warn("Version file {} has already been created ", versionFile);
}
- timePartitionIdVersionControllerMap.put(partitionId, new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
+ timePartitionIdVersionControllerMap.put(partitionId,
+ new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
}
partitionId++;
}
@@ -402,7 +410,8 @@ public class StorageGroupProcessor {
});
}
- private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException {
+ private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
+ throws IOException {
List<File> tsFiles = new ArrayList<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
@@ -420,9 +429,12 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(fileFolder, MERGE_SUFFIX);
- File[] oldTsfileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
- File[] oldResourceFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
- File[] oldModificationFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
+ File[] oldTsfileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+ File[] oldResourceFileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+ File[] oldModificationFileArray = fsFactory
+ .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
// move the old files to upgrade folder if exists
if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
@@ -498,6 +510,34 @@ public class StorageGroupProcessor {
return new Pair<>(ret, upgradeRet);
}
+ private Map<String, List<TsFileResource>> getAllVms(List<String> folders) throws IOException {
+ List<File> vmFiles = new ArrayList<>();
+ for (String baseDir : folders) {
+ File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
+ if (!fileFolder.exists()) {
+ continue;
+ }
+ Collections
+ .addAll(vmFiles, fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), VM_SUFFIX));
+ }
+
+ Map<String, List<TsFileResource>> vmTsFileResourceMap = new HashMap<>();
+ for (File f : vmFiles) {
+ TsFileResource fileResource = new TsFileResource(f);
+ fileResource.setClosed(false);
+ // make sure the flush command is called before IoTDB is down.
+ fileResource.deserialize();
+ String tsfilePrefix = f.getName().split(TSFILE_SEPARATOR)[0];
+ List<TsFileResource> vmTsFileResource = new ArrayList<>();
+ if (vmTsFileResourceMap.containsKey(tsfilePrefix)) {
+ vmTsFileResource = vmTsFileResourceMap.get(tsfilePrefix);
+ }
+ vmTsFileResource.add(fileResource);
+ vmTsFileResourceMap.put(tsfilePrefix, vmTsFileResource);
+ }
+ return vmTsFileResourceMap;
+ }
+
private void continueFailedRenames(File fileFolder, String suffix) {
File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
if (files != null) {
@@ -512,7 +552,8 @@ public class StorageGroupProcessor {
}
}
- private void recoverSeqFiles(List<TsFileResource> tsFiles) {
+ private void recoverSeqFiles(List<TsFileResource> tsFiles,
+ Map<String, List<TsFileResource>> vmFiles) {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
@@ -533,7 +574,9 @@ public class StorageGroupProcessor {
tsFileResource.setClosed(true);
} else if (writer.canWrite()) {
// the last file is not closed, continue writing to in
+ String tsfilePrefix = tsFileResource.getFile().getName().split(TSFILE_SEPARATOR)[0];
TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+ vmFiles.get(tsfilePrefix),
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, writer);
@@ -548,7 +591,8 @@ public class StorageGroupProcessor {
}
}
- private void recoverUnseqFiles(List<TsFileResource> tsFiles) {
+ private void recoverUnseqFiles(List<TsFileResource> tsFiles,
+ Map<String, List<TsFileResource>> vmFiles) {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
@@ -568,7 +612,9 @@ public class StorageGroupProcessor {
tsFileResource.setClosed(true);
} else if (writer.canWrite()) {
// the last file is not closed, continue writing to in
+ String tsfilePrefix = tsFileResource.getFile().getName().split(TSFILE_SEPARATOR)[0];
TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+ vmFiles.get(tsfilePrefix),
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, writer);
@@ -611,7 +657,8 @@ public class StorageGroupProcessor {
long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>());
// insert to sequence or unSequence file
insertToTsFileProcessor(insertPlan,
@@ -625,7 +672,7 @@ public class StorageGroupProcessor {
/**
* Insert a tablet (rows belonging to the same devices) into this storage group.
- * @param insertTabletPlan
+ *
* @throws WriteProcessException when update last cache failed
* @throws BatchInsertionException if some of the rows failed to be inserted
*/
@@ -660,7 +707,8 @@ public class StorageGroupProcessor {
// before is first start point
int before = loc;
// before time partition
- long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+ long beforeTimePartition = StorageEngine
+ .getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -723,8 +771,8 @@ public class StorageGroupProcessor {
}
/**
- * insert batch to tsfile processor thread-safety that the caller need to guarantee
- * The rows to be inserted are in the range [start, end)
+ * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
+ * inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
* @param sequence whether is sequence
@@ -948,12 +996,12 @@ public class StorageGroupProcessor {
VersionController versionController = getVersionControllerByTimePartitionId(timePartitionId);
if (sequence) {
tsFileProcessor = new TsFileProcessor(storageGroupName,
- fsFactory.getFileWithParent(filePath),
+ fsFactory.getFileWithParent(filePath), new ArrayList<>(),
versionController, this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true);
} else {
tsFileProcessor = new TsFileProcessor(storageGroupName,
- fsFactory.getFileWithParent(filePath),
+ fsFactory.getFileWithParent(filePath), new ArrayList<>(),
versionController, this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false);
}
@@ -1321,7 +1369,8 @@ public class StorageGroupProcessor {
int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
long startTime = tsFileResource.getStartTime(deviceIndex);
- long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE;
+ long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
+ : Long.MAX_VALUE;
if (!isAlive(endTime)) {
return false;
@@ -1395,7 +1444,8 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+ private void logDeletion(long timestamp, String deviceId, String measurementId,
+ long timePartitionId)
throws IOException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
@@ -1477,7 +1527,8 @@ public class StorageGroupProcessor {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(), entry.getKey(), entry.getValue());
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
if (globalLatestFlushedTimeForEachDevice
.getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
@@ -1490,10 +1541,11 @@ public class StorageGroupProcessor {
/**
* used for upgrading
*/
- public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId, String deviceId, long time) {
+ public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
+ String deviceId, long time) {
newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+ .computeIfAbsent(partitionId, id -> new HashMap<>())
+ .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
/**
@@ -1547,8 +1599,8 @@ public class StorageGroupProcessor {
for (TsFileResource resource : upgradedResources) {
long partitionId = resource.getTimePartition();
resource.getDeviceToIndexMap().forEach((device, index) ->
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
- resource.getEndTime(index))
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
+ resource.getEndTime(index))
);
}
insertLock.writeLock().lock();
@@ -1747,7 +1799,6 @@ public class StorageGroupProcessor {
/**
* acquire the write locks of the resource and the merge lock
- * @param seqFile
*/
private void doubleWriteLock(TsFileResource seqFile) {
boolean fileLockGot;
@@ -1763,7 +1814,7 @@ public class StorageGroupProcessor {
if (fileLockGot) {
seqFile.writeUnlock();
}
- if(mergeLockGot) {
+ if (mergeLockGot) {
mergeLock.writeLock().unlock();
}
}
@@ -1772,7 +1823,6 @@ public class StorageGroupProcessor {
/**
* release the write locks of the resource and the merge lock
- * @param seqFile
*/
private void doubleWriteUnlock(TsFileResource seqFile) {
mergeLock.writeLock().unlock();
@@ -1798,7 +1848,7 @@ public class StorageGroupProcessor {
mergeLock.writeLock().lock();
try {
if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
- newFilePartitionId)){
+ newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
@@ -1865,7 +1915,8 @@ public class StorageGroupProcessor {
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(partitionNum,
+ Collections.max(newTsFileResource.getHistoricalVersions()));
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1880,11 +1931,10 @@ public class StorageGroupProcessor {
/**
* Set the version in "partition" to "version" if "version" is larger than the current version.
- * @param partition
- * @param version
*/
public void setPartitionFileVersionToMax(long partition, long version) {
- partitionMaxFileVersions.compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
+ partitionMaxFileVersions
+ .compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
}
private long computeMaxVersion(Long oldVersion, Long newVersion) {
@@ -1895,12 +1945,12 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
- * @param newTsFileResource
- * @param newFilePartitionId
- * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file
- * an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+ * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+ * them.
+ *
+ * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
List<TsFileResource> sequenceList) {
@@ -1941,11 +1991,9 @@ public class StorageGroupProcessor {
/**
* Compare each device in the two files to find the time relation of them.
- * @param fileA
- * @param fileB
- * @return -1 if fileA is totally older than fileB (A < B)
- * 0 if fileA is partially older than fileB and partially newer than fileB (A X B)
- * 1 if fileA is totally newer than fileB (B < A)
+ *
+ * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
@@ -1981,11 +2029,9 @@ public class StorageGroupProcessor {
}
/**
- * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to reduce
- * unnecessary merge. Only used when the file sender and the receiver share the same file
- * close policy.
- * Warning: DO NOT REMOVE
- * @param resource
+ * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
+ * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
+ * close policy. Warning: DO NOT REMOVE
*/
@SuppressWarnings("unused")
public void removeFullyOverlapFiles(TsFileResource resource) {
@@ -2026,13 +2072,10 @@ public class StorageGroupProcessor {
/**
* remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status,
- * close it before remove the related resource files.
- * maybe time-consuming for closing a tsfile.
- * @param tsFileResource
- * @param iterator
- * @param isSeq
+ * close it before remove the related resource files. maybe time-consuming for closing a tsfile.
*/
- private void removeFullyOverlapFile(TsFileResource tsFileResource, Iterator<TsFileResource> iterator
+ private void removeFullyOverlapFile(TsFileResource tsFileResource,
+ Iterator<TsFileResource> iterator
, boolean isSeq) {
if (!tsFileResource.isClosed()) {
// also remove the TsFileProcessor if the overlapped file is not closed
@@ -2074,8 +2117,8 @@ public class StorageGroupProcessor {
* version number is the version number in the tsfile with a larger timestamp.
*
* @param tsfileName origin tsfile name
- * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
- * + 1]
+ * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2142,9 +2185,9 @@ public class StorageGroupProcessor {
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
- * @UsedBy sync module, load external tsfile module.
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
+ * @UsedBy sync module, load external tsfile module.
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource, long filePartitionId)
@@ -2153,7 +2196,8 @@ public class StorageGroupProcessor {
switch (type) {
case LOAD_UNSEQUENCE:
targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
+ storageGroupName + File.separatorChar + filePartitionId + File.separator
+ + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
if (unSequenceFileList.contains(tsFileResource)) {
@@ -2213,7 +2257,8 @@ public class StorageGroupProcessor {
}
partitionDirectFileVersions.computeIfAbsent(filePartitionId,
p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(filePartitionId,
+ Collections.max(tsFileResource.getHistoricalVersions()));
return true;
}
@@ -2354,6 +2399,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface CloseTsFileCallBack {
+
void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
}
@@ -2362,17 +2408,15 @@ public class StorageGroupProcessor {
}
/**
- * Check if the data of "tsFileResource" all exist locally by comparing the historical versions
- * in the partition of "partitionNumber". This is available only when the IoTDB instances which generated
- * "tsFileResource" have the same close file policy as the local one.
- * If one of the version in "tsFileResource" equals to a version of a working file, false is
- * returned because "tsFileResource" may have unwritten data of that file.
- * @param tsFileResource
- * @param partitionNum
+ * Check if the data of "tsFileResource" all exist locally by comparing the historical versions in
+ * the partition of "partitionNumber". This is available only when the IoTDB instances which
+ * generated "tsFileResource" have the same close file policy as the local one. If one of the
+ * version in "tsFileResource" equals to a version of a working file, false is returned because
+ * "tsFileResource" may have unwritten data of that file.
+ *
* @return true if the historicalVersions of "tsFileResource" is a subset of
- * partitionDirectFileVersions, or false if it is not a subset and it contains any
- * version of a working file
- * USED by cluster module
+ * partitionDirectFileVersions, or false if it is not a subset and it contains any version of a
+ * working file USED by cluster module
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
// consider the case: The local node crashes when it is writing TsFile no.5.