You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/03/23 10:20:37 UTC
[incubator-iotdb] branch partitioned_file_version_management
updated: code refinements
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch partitioned_file_version_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/partitioned_file_version_management by this push:
new 790d2a7 code refinements
790d2a7 is described below
commit 790d2a7a5330e31594f0e12ae0b7abed8b398568
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Mar 23 18:20:23 2020 +0800
code refinements
---
.../engine/storagegroup/StorageGroupProcessor.java | 162 ++++++++++++++-------
1 file changed, 108 insertions(+), 54 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index c490723..a5f627f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -126,6 +126,15 @@ public class StorageGroupProcessor {
private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
private static final int MAX_CACHE_SENSORS = 5000;
+
+ /**
+ * indicating the file to be loaded already exists locally.
+ */
+ private static final int POS_ALREADY_EXIST = -2;
+ /**
+ * indicating the file to be loaded overlaps with some files.
+ */
+ private static final int POS_OVERLAP = -3;
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -1592,65 +1601,24 @@ public class StorageGroupProcessor {
public void loadNewTsFile(TsFileResource newTsFileResource)
throws TsFileProcessorException {
File tsfileToBeInserted = newTsFileResource.getFile();
- long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParent());
+
writeLock();
mergeLock.writeLock().lock();
try {
- boolean isOverlap = false;
- int preIndex = -1, subsequentIndex = sequenceFileTreeSet.size();
-
- List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
- // check new tsfile
- outer:
- for (int i = 0; i < sequenceList.size(); i++) {
- TsFileResource localFile = sequenceList.get(i);
- if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
- return;
- }
- long localPartitionId = Long.parseLong(localFile.getFile().getParent());
- if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
- || newFilePartitionId != localPartitionId) {
- continue;
- }
- boolean hasPre = false, hasSubsequence = false;
- for (String device : newTsFileResource.getStartTimeMap().keySet()) {
- if (localFile.getStartTimeMap().containsKey(device)) {
- long startTime1 = localFile.getStartTimeMap().get(device);
- long endTime1 = localFile.getEndTimeMap().get(device);
- long startTime2 = newTsFileResource.getStartTimeMap().get(device);
- long endTime2 = newTsFileResource.getEndTimeMap().get(device);
- if (startTime1 > endTime2) {
- hasSubsequence = true;
- } else if (startTime2 > endTime1) {
- hasPre = true;
- } else {
- isOverlap = true;
- break outer;
- }
- }
- }
- if (hasPre && hasSubsequence) {
- isOverlap = true;
- break;
- }
- if (!hasPre && hasSubsequence) {
- subsequentIndex = i;
- break;
- }
- if (hasPre) {
- preIndex = i;
- }
+ int insertPos = findInsertionPosition(newTsFileResource);
+ if (insertPos == POS_ALREADY_EXIST) {
+ return;
}
// loading tsfile by type
- if (isOverlap) {
+ if (insertPos == POS_OVERLAP) {
loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource);
} else {
// check whether the file name needs to be renamed.
- if (subsequentIndex != sequenceFileTreeSet.size() || preIndex != -1) {
- String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
- subsequentIndex, getTimePartitionFromTsFileResource(newTsFileResource));
+ if (insertPos != sequenceFileTreeSet.size() - 1 || insertPos != -1) {
+ String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
+ getTimePartitionFromTsFileResource(newTsFileResource));
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
@@ -1679,6 +1647,90 @@ public class StorageGroupProcessor {
}
/**
+ * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
+ * @param newTsFileResource the resource of the TsFile that is being loaded
+ * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file
+ * an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+ */
+ private int findInsertionPosition(TsFileResource newTsFileResource) {
+ File tsfileToBeInserted = newTsFileResource.getFile();
+ // the partition id is the parent directory's name; getParent() would return the whole path
+ long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParentFile().getName());
+ int insertPos = -1;
+ List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
+ // find the position where the new file should be inserted
+ for (int i = 0; i < sequenceList.size(); i++) {
+ TsFileResource localFile = sequenceList.get(i);
+ if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
+ return POS_ALREADY_EXIST;
+ }
+ long localPartitionId = Long.parseLong(localFile.getFile().getParentFile().getName());
+ if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
+ || newFilePartitionId != localPartitionId) {
+ // skip files that are not in the same partition as the new file and the last empty file
+ continue;
+ }
+
+ int fileComparison = compareTsFileDevices(newTsFileResource, localFile);
+ switch (fileComparison) {
+ case 0:
+ // the files overlap in a device, or the new file is older in some devices and newer in others
+ return POS_OVERLAP;
+ case -1:
+ // all devices in the local file are newer than the new file, the new file can be
+ // inserted before the local file
+ return i - 1;
+ default:
+ // all devices in the local file are older than the new file, proceed to the next file
+ insertPos = i;
+ }
+ }
+ return insertPos;
+ }
+
+ /**
+ * Compare each device shared by the two files to find the time relation of the files.
+ * @param fileA the first file, e.g. the file being loaded
+ * @param fileB the second file, e.g. a local sequence file
+ * @return -1 if fileA is totally older than fileB (A < B)
+ * 0 if they overlap in a device, or A is older in some devices but newer in others (A X B)
+ * 1 if fileA is totally newer than fileB (B < A), or the two files share no device
+ */
+ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
+ boolean aIsNewer = false, aIsOlder = false;
+ for (String device : fileA.getStartTimeMap().keySet()) {
+ if (!fileB.getStartTimeMap().containsKey(device)) {
+ continue;
+ }
+ long startTimeA = fileA.getStartTimeMap().get(device);
+ long endTimeA = fileA.getEndTimeMap().get(device);
+ long startTimeB = fileB.getStartTimeMap().get(device);
+ long endTimeB = fileB.getEndTimeMap().get(device);
+ if (startTimeA > endTimeB) {
+ // A's data of the device is later than B's data
+ aIsNewer = true;
+ } else if (startTimeB > endTimeA) {
+ // A's data of the device is previous to B's data
+ aIsOlder = true;
+ } else {
+ // the two files overlap in the device
+ return 0;
+ }
+ }
+ if (aIsNewer && aIsOlder) {
+ // some devices are newer but some devices are older, the two files overlap in general
+ return 0;
+ }
+ if (aIsOlder) {
+ // all shared devices in A are older than those in B
+ return -1;
+ }
+ // all shared devices in A are newer than those in B, or no device is shared
+ return 1;
+ }
+
+ /**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
* unnecessary merge. Only used when the file sender and the receiver share the same file
* close policy.
@@ -1744,24 +1796,26 @@ public class StorageGroupProcessor {
* version number is the version number in the tsfile with a larger timestamp.
*
* @param tsfileName origin tsfile name
+ * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
+ * + 1]
* @return appropriate filename
*/
- private String getFileNameForLoadingFile(String tsfileName, int preIndex, int subsequentIndex,
+ private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
long timePartitionId) {
long currentTsFileTime = Long
.parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
long preTime;
List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
- if (preIndex == -1) {
+ if (insertIndex == -1) {
preTime = 0L;
} else {
- String preName = sequenceList.get(preIndex).getFile().getName();
+ String preName = sequenceList.get(insertIndex).getFile().getName();
preTime = Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
}
- if (subsequentIndex == sequenceFileTreeSet.size()) {
+ if (insertIndex == sequenceFileTreeSet.size() - 1) {
return preTime < currentTsFileTime ? tsfileName : getNewTsFileName(timePartitionId);
} else {
- String subsequenceName = sequenceList.get(subsequentIndex).getFile().getName();
+ String subsequenceName = sequenceList.get(insertIndex + 1).getFile().getName();
long subsequenceTime = Long
.parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
long subsequenceVersion = Long