You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/14 06:42:03 UTC
[iotdb] branch rel/1.1 updated: [IOTDB-5869] Load strategy: load all files to unsequence dir (#9837)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 3a7023eda65 [IOTDB-5869] Load strategy: load all files to unsequence dir (#9837)
3a7023eda65 is described below
commit 3a7023eda65c0643794043f7dcda3380576060b9
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun May 14 14:41:57 2023 +0800
[IOTDB-5869] Load strategy: load all files to unsequence dir (#9837)
---
.../org/apache/iotdb/it/utils/TsFileGenerator.java | 42 ++++
.../iotdb/db/engine/storagegroup/DataRegion.java | 268 +++------------------
2 files changed, 75 insertions(+), 235 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
index c790e0b92fc..3d399862c23 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
@@ -129,6 +129,48 @@ public class TsFileGenerator implements AutoCloseable {
logger.info(String.format("Write %d points into device %s", number, device));
}
+ public void generateData(
+ String device, int number, long timeGap, boolean isAligned, long startTimestamp)
+ throws IOException, WriteProcessException {
+ List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
+ TreeSet<Long> timeSet = device2TimeSet.get(device);
+ Tablet tablet = new Tablet(device, schemas);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ long sensorNum = schemas.size();
+ long startTime = startTimestamp;
+
+ for (long r = 0; r < number; r++) {
+ int row = tablet.rowSize++;
+ startTime += timeGap;
+ timestamps[row] = startTime;
+ timeSet.add(startTime);
+ for (int i = 0; i < sensorNum; i++) {
+ generateDataPoint(values[i], row, schemas.get(i));
+ }
+ // write
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ if (!isAligned) {
+ writer.write(tablet);
+ } else {
+ writer.writeAligned(tablet);
+ }
+ tablet.reset();
+ }
+ }
+ // write
+ if (tablet.rowSize != 0) {
+ if (!isAligned) {
+ writer.write(tablet);
+ } else {
+ writer.writeAligned(tablet);
+ }
+ tablet.reset();
+ }
+
+ logger.info(String.format("Write %d points into device %s", number, device));
+ }
+
private void generateDataPoint(Object obj, int row, MeasurementSchema schema) {
switch (schema.getType()) {
case INT32:
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e3023a131ad..de3612fd5cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2305,16 +2305,9 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * Load a new tsfile to database processor. Tne file may have overlap with other files.
+ * Load a new tsfile to unsequence dir.
*
- * <p>that there has no file which is overlapping with the new file.
- *
- * <p>Firstly, determine the loading type of the file, whether it needs to be loaded in sequence
- * list or unsequence list.
- *
- * <p>Secondly, execute the loading process by the type.
- *
- * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+ * <p>Then, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
*
* @param newTsFileResource tsfile resource @UsedBy load external tsfile module
* @param deleteOriginFile whether to delete origin tsfile
@@ -2325,43 +2318,31 @@ public class DataRegion implements IDataRegionForQuery {
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock("loadNewTsFile");
try {
- List<TsFileResource> sequenceList =
- tsFileManager.getOrCreateSequenceListByTimePartition(newFilePartitionId);
-
- int insertPos = findInsertionPosition(newTsFileResource, sequenceList);
- LoadTsFileType tsFileType = getLoadingTsFileType(insertPos, sequenceList);
- String renameInfo =
- (tsFileType == LoadTsFileType.LOAD_SEQUENCE)
- ? IoTDBConstant.SEQUENCE_FLODER_NAME
- : IoTDBConstant.UNSEQUENCE_FLODER_NAME;
- newTsFileResource.setSeq(tsFileType == LoadTsFileType.LOAD_SEQUENCE);
+ newTsFileResource.setSeq(false);
String newFileName =
- getLoadingTsFileName(tsFileType, insertPos, newTsFileResource, sequenceList);
+ getNewTsFileName(
+ System.currentTimeMillis(),
+ getAndSetNewVersion(newFilePartitionId, newTsFileResource),
+ 0,
+ 0);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info(
- "TsFile {} must be renamed to {} for loading into the " + renameInfo + " list.",
+ "TsFile {} must be renamed to {} for loading into the unsequence list.",
tsfileToBeInserted.getName(),
newFileName);
newTsFileResource.setFile(
fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
}
- loadTsFileByType(
- tsFileType,
- tsfileToBeInserted,
- newTsFileResource,
- newFilePartitionId,
- insertPos,
- deleteOriginFile);
- TsFileMetricManager.getInstance()
- .addFile(
- newTsFileResource.getTsFile().length(), tsFileType == LoadTsFileType.LOAD_SEQUENCE);
+ loadTsFileToUnSequence(
+ tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
+ TsFileMetricManager.getInstance().addFile(newTsFileResource.getTsFile().length(), false);
resetLastCacheWhenLoadingTsFile(); // update last cache
updateLastFlushTime(newTsFileResource); // update last flush time
long partitionNum = newTsFileResource.getTimePartition();
updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
- logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo);
+ logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName);
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to database processor {} because the disk space is insufficient.",
@@ -2392,104 +2373,6 @@ public class DataRegion implements IDataRegionForQuery {
return Math.max(oldVersion, newVersion);
}
- private Long getTsFileResourceEstablishTime(TsFileResource tsFileResource) {
- String tsFileName = tsFileResource.getTsFile().getName();
- return Long.parseLong(tsFileName.split(FILE_NAME_SEPARATOR)[0]);
- }
-
- private LoadTsFileType getLoadingTsFileType(int insertPos, List<TsFileResource> sequenceList) {
- if (insertPos == POS_OVERLAP) {
- return LoadTsFileType.LOAD_UNSEQUENCE;
- }
- if (insertPos == sequenceList.size() - 1) {
- return LoadTsFileType.LOAD_SEQUENCE;
- }
- long preTime =
- (insertPos == -1) ? 0 : getTsFileResourceEstablishTime(sequenceList.get(insertPos));
- long subsequenceTime = getTsFileResourceEstablishTime(sequenceList.get(insertPos + 1));
- return preTime == subsequenceTime
- ? LoadTsFileType.LOAD_UNSEQUENCE
- : LoadTsFileType.LOAD_SEQUENCE;
- }
-
- /**
- * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
- *
- * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
- * file can be inserted between [i, i+1]
- */
- private int findInsertionPosition(
- TsFileResource newTsFileResource, List<TsFileResource> sequenceList) {
-
- int insertPos = -1;
-
- // find the position where the new file should be inserted
- for (int i = 0; i < sequenceList.size(); i++) {
- TsFileResource localFile = sequenceList.get(i);
-
- if (!localFile.isClosed() && localFile.getProcessor() != null) {
- // we cannot compare two files by TsFileResource unless they are both closed
- syncCloseOneTsFileProcessor(true, localFile.getProcessor());
- }
- int fileComparison = compareTsFileDevices(newTsFileResource, localFile);
- switch (fileComparison) {
- case 0:
- // some devices are newer but some devices are older, the two files overlap in general
- return POS_OVERLAP;
- case -1:
- // all devices in localFile are newer than the new file, the new file can be
- // inserted before localFile
- return i - 1;
- default:
- // all devices in the local file are older than the new file, proceed to the next file
- insertPos = i;
- }
- }
- return insertPos;
- }
-
- /**
- * Compare each device in the two files to find the time relation of them.
- *
- * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
- * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
- */
- private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
- boolean hasPre = false, hasSubsequence = false;
- Set<String> fileADevices = fileA.getDevices();
- Set<String> fileBDevices = fileB.getDevices();
- for (String device : fileADevices) {
- if (!fileBDevices.contains(device)) {
- continue;
- }
- long startTimeA = fileA.getStartTime(device);
- long endTimeA = fileA.getEndTime(device);
- long startTimeB = fileB.getStartTime(device);
- long endTimeB = fileB.getEndTime(device);
- if (startTimeA > endTimeB) {
- // A's data of the device is later than to the B's data
- hasPre = true;
- } else if (startTimeB > endTimeA) {
- // A's data of the device is previous to the B's data
- hasSubsequence = true;
- } else {
- // the two files overlap in the device
- return 0;
- }
- }
- if (hasPre && hasSubsequence) {
- // some devices are newer but some devices are older, the two files overlap in general
- return 0;
- }
- if (!hasPre && hasSubsequence) {
- // all devices in B are newer than those in A
- return -1;
- }
- // all devices in B are older than those in A
- return 1;
- }
-
/**
* If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
* reduce unnecessary merge. Only used when the file sender and the receiver share the same file
@@ -2569,50 +2452,6 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResource.remove();
}
- /**
- * Get an appropriate filename to ensure the order between files. The tsfile is named after
- * ({systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile).
- *
- * <p>The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the
- * list based on the file name and ensure the correctness of the order, so there are three cases.
- *
- * <p>1. The tsfile is to be inserted in the first place of the list. Timestamp can be set to half
- * of the timestamp value in the file name of the first tsfile in the list , and the version
- * number will be updated to the largest number in this time partition.
- *
- * <p>2. The tsfile is to be inserted in the last place of the list. The file name is generated by
- * the system according to the naming rules and returned.
- *
- * <p>3. This file is inserted between two files. The time stamp is the mean of the timestamps of
- * the two files, the version number will be updated to the largest number in this time partition.
- *
- * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
- * @return appropriate filename
- */
- private String getLoadingTsFileName(
- LoadTsFileType tsFileType,
- int insertIndex,
- TsFileResource newTsFileResource,
- List<TsFileResource> sequenceList) {
- long timePartitionId = newTsFileResource.getTimePartition();
- if (tsFileType == LoadTsFileType.LOAD_UNSEQUENCE || insertIndex == sequenceList.size() - 1) {
- return getNewTsFileName(
- System.currentTimeMillis(),
- getAndSetNewVersion(timePartitionId, newTsFileResource),
- 0,
- 0);
- }
-
- long preTime =
- (insertIndex == -1) ? 0 : getTsFileResourceEstablishTime(sequenceList.get(insertIndex));
- long subsequenceTime = getTsFileResourceEstablishTime(sequenceList.get(insertIndex + 1));
- long meanTime = preTime + ((subsequenceTime - preTime) >> 1);
-
- return getNewTsFileName(
- meanTime, getAndSetNewVersion(timePartitionId, newTsFileResource), 0, 0);
- }
-
private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) {
long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
partitionMaxFileVersions.put(timePartitionId, version);
@@ -2636,74 +2475,38 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Execute the loading process by the type.
*
- * @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @param deleteOriginFile whether to delete the original file
* @return load the file successfully @UsedBy sync module, load external tsfile module.
*/
- private boolean loadTsFileByType(
- LoadTsFileType type,
+ private boolean loadTsFileToUnSequence(
File tsFileToLoad,
TsFileResource tsFileResource,
long filePartitionId,
- int insertPos,
boolean deleteOriginFile)
throws LoadFileException, DiskSpaceInsufficientException {
File targetFile;
- switch (type) {
- case LOAD_UNSEQUENCE:
- targetFile =
- fsFactory.getFile(
- DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- databaseName
- + File.separatorChar
- + dataRegionId
- + File.separatorChar
- + filePartitionId
- + File.separator
- + tsFileResource.getTsFile().getName());
- tsFileResource.setFile(targetFile);
- if (tsFileManager.contains(tsFileResource, false)) {
- logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
- return false;
- }
- tsFileManager.add(tsFileResource, false);
- logger.info(
- "Load tsfile in unsequence list, move file from {} to {}",
- tsFileToLoad.getAbsolutePath(),
- targetFile.getAbsolutePath());
- break;
- case LOAD_SEQUENCE:
- targetFile =
- fsFactory.getFile(
- DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- databaseName
- + File.separatorChar
- + dataRegionId
- + File.separatorChar
- + filePartitionId
- + File.separator
- + tsFileResource.getTsFile().getName());
- tsFileResource.setFile(targetFile);
- if (tsFileManager.contains(tsFileResource, true)) {
- logger.error("The file {} has already been loaded in sequence list", tsFileResource);
- return false;
- }
- if (insertPos == -1) {
- tsFileManager.insertToPartitionFileList(tsFileResource, filePartitionId, true, 0);
- } else {
- tsFileManager.insertToPartitionFileList(
- tsFileResource, filePartitionId, true, insertPos + 1);
- }
- logger.info(
- "Load tsfile in sequence list, move file from {} to {}",
- tsFileToLoad.getAbsolutePath(),
- targetFile.getAbsolutePath());
- break;
- default:
- throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type));
+ targetFile =
+ fsFactory.getFile(
+ DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ databaseName
+ + File.separatorChar
+ + dataRegionId
+ + File.separatorChar
+ + filePartitionId
+ + File.separator
+ + tsFileResource.getTsFile().getName());
+ tsFileResource.setFile(targetFile);
+ if (tsFileManager.contains(tsFileResource, false)) {
+ logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
+ return false;
}
+ tsFileManager.add(tsFileResource, false);
+ logger.info(
+ "Load tsfile in unsequence list, move file from {} to {}",
+ tsFileToLoad.getAbsolutePath(),
+ targetFile.getAbsolutePath());
// move file from sync dir to data dir
if (!targetFile.getParentFile().exists()) {
@@ -3300,11 +3103,6 @@ public class DataRegion implements IDataRegionForQuery {
this.tsFileManager.setAllowCompaction(allowCompaction);
}
- private enum LoadTsFileType {
- LOAD_SEQUENCE,
- LOAD_UNSEQUENCE
- }
-
@FunctionalInterface
public interface CloseTsFileCallBack {