You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/14 06:42:03 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-5869] Load strategy: load all files to unsequence dir (#9837)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 3a7023eda65 [IOTDB-5869] Load strategy: load all files to unsequence dir (#9837)
3a7023eda65 is described below

commit 3a7023eda65c0643794043f7dcda3380576060b9
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun May 14 14:41:57 2023 +0800

    [IOTDB-5869] Load strategy: load all files to unsequence dir (#9837)
---
 .../org/apache/iotdb/it/utils/TsFileGenerator.java |  42 ++++
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 268 +++------------------
 2 files changed, 75 insertions(+), 235 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
index c790e0b92fc..3d399862c23 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
@@ -129,6 +129,48 @@ public class TsFileGenerator implements AutoCloseable {
     logger.info(String.format("Write %d points into device %s", number, device));
   }
 
+  public void generateData(
+      String device, int number, long timeGap, boolean isAligned, long startTimestamp)
+      throws IOException, WriteProcessException {
+    List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
+    TreeSet<Long> timeSet = device2TimeSet.get(device);
+    Tablet tablet = new Tablet(device, schemas); // rows are buffered here and written in batches
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    long sensorNum = schemas.size();
+    long startTime = startTimestamp; // advanced before use: first row is startTimestamp + timeGap
+
+    for (long r = 0; r < number; r++) {
+      int row = tablet.rowSize++; // claim the next free row slot in the tablet
+      startTime += timeGap;
+      timestamps[row] = startTime;
+      timeSet.add(startTime); // also record the timestamp in this device's time set
+      for (int i = 0; i < sensorNum; i++) {
+        generateDataPoint(values[i], row, schemas.get(i)); // one value per measurement column
+      }
+      // flush the tablet as soon as it reaches its maximum row number
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        if (!isAligned) {
+          writer.write(tablet);
+        } else {
+          writer.writeAligned(tablet);
+        }
+        tablet.reset(); // presumably resets rowSize to 0 so the buffer can be refilled - confirm
+      }
+    }
+    // flush whatever rows remain in a partially filled tablet
+    if (tablet.rowSize != 0) {
+      if (!isAligned) {
+        writer.write(tablet);
+      } else {
+        writer.writeAligned(tablet);
+      }
+      tablet.reset();
+    }
+
+    logger.info(String.format("Write %d points into device %s", number, device));
+  }
+
   private void generateDataPoint(Object obj, int row, MeasurementSchema schema) {
     switch (schema.getType()) {
       case INT32:
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e3023a131ad..de3612fd5cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2305,16 +2305,9 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /**
-   * Load a new tsfile to database processor. Tne file may have overlap with other files.
+   * Load a new tsfile to unsequence dir.
    *
-   * <p>that there has no file which is overlapping with the new file.
-   *
-   * <p>Firstly, determine the loading type of the file, whether it needs to be loaded in sequence
-   * list or unsequence list.
-   *
-   * <p>Secondly, execute the loading process by the type.
-   *
-   * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+   * <p>Then, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
    *
    * @param newTsFileResource tsfile resource @UsedBy load external tsfile module
    * @param deleteOriginFile whether to delete origin tsfile
@@ -2325,43 +2318,31 @@ public class DataRegion implements IDataRegionForQuery {
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
     writeLock("loadNewTsFile");
     try {
-      List<TsFileResource> sequenceList =
-          tsFileManager.getOrCreateSequenceListByTimePartition(newFilePartitionId);
-
-      int insertPos = findInsertionPosition(newTsFileResource, sequenceList);
-      LoadTsFileType tsFileType = getLoadingTsFileType(insertPos, sequenceList);
-      String renameInfo =
-          (tsFileType == LoadTsFileType.LOAD_SEQUENCE)
-              ? IoTDBConstant.SEQUENCE_FLODER_NAME
-              : IoTDBConstant.UNSEQUENCE_FLODER_NAME;
-      newTsFileResource.setSeq(tsFileType == LoadTsFileType.LOAD_SEQUENCE);
+      newTsFileResource.setSeq(false);
       String newFileName =
-          getLoadingTsFileName(tsFileType, insertPos, newTsFileResource, sequenceList);
+          getNewTsFileName(
+              System.currentTimeMillis(),
+              getAndSetNewVersion(newFilePartitionId, newTsFileResource),
+              0,
+              0);
 
       if (!newFileName.equals(tsfileToBeInserted.getName())) {
         logger.info(
-            "TsFile {} must be renamed to {} for loading into the " + renameInfo + " list.",
+            "TsFile {} must be renamed to {} for loading into the unsequence list.",
             tsfileToBeInserted.getName(),
             newFileName);
         newTsFileResource.setFile(
             fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
       }
-      loadTsFileByType(
-          tsFileType,
-          tsfileToBeInserted,
-          newTsFileResource,
-          newFilePartitionId,
-          insertPos,
-          deleteOriginFile);
-      TsFileMetricManager.getInstance()
-          .addFile(
-              newTsFileResource.getTsFile().length(), tsFileType == LoadTsFileType.LOAD_SEQUENCE);
+      loadTsFileToUnSequence(
+          tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
+      TsFileMetricManager.getInstance().addFile(newTsFileResource.getTsFile().length(), false);
 
       resetLastCacheWhenLoadingTsFile(); // update last cache
       updateLastFlushTime(newTsFileResource); // update last flush time
       long partitionNum = newTsFileResource.getTimePartition();
       updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
-      logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo);
+      logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName);
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to database processor {} because the disk space is insufficient.",
@@ -2392,104 +2373,6 @@ public class DataRegion implements IDataRegionForQuery {
     return Math.max(oldVersion, newVersion);
   }
 
-  private Long getTsFileResourceEstablishTime(TsFileResource tsFileResource) {
-    String tsFileName = tsFileResource.getTsFile().getName();
-    return Long.parseLong(tsFileName.split(FILE_NAME_SEPARATOR)[0]);
-  }
-
-  private LoadTsFileType getLoadingTsFileType(int insertPos, List<TsFileResource> sequenceList) {
-    if (insertPos == POS_OVERLAP) {
-      return LoadTsFileType.LOAD_UNSEQUENCE;
-    }
-    if (insertPos == sequenceList.size() - 1) {
-      return LoadTsFileType.LOAD_SEQUENCE;
-    }
-    long preTime =
-        (insertPos == -1) ? 0 : getTsFileResourceEstablishTime(sequenceList.get(insertPos));
-    long subsequenceTime = getTsFileResourceEstablishTime(sequenceList.get(insertPos + 1));
-    return preTime == subsequenceTime
-        ? LoadTsFileType.LOAD_UNSEQUENCE
-        : LoadTsFileType.LOAD_SEQUENCE;
-  }
-
-  /**
-   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
-   *
-   * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
-   *     POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
-   *     file can be inserted between [i, i+1]
-   */
-  private int findInsertionPosition(
-      TsFileResource newTsFileResource, List<TsFileResource> sequenceList) {
-
-    int insertPos = -1;
-
-    // find the position where the new file should be inserted
-    for (int i = 0; i < sequenceList.size(); i++) {
-      TsFileResource localFile = sequenceList.get(i);
-
-      if (!localFile.isClosed() && localFile.getProcessor() != null) {
-        // we cannot compare two files by TsFileResource unless they are both closed
-        syncCloseOneTsFileProcessor(true, localFile.getProcessor());
-      }
-      int fileComparison = compareTsFileDevices(newTsFileResource, localFile);
-      switch (fileComparison) {
-        case 0:
-          // some devices are newer but some devices are older, the two files overlap in general
-          return POS_OVERLAP;
-        case -1:
-          // all devices in localFile are newer than the new file, the new file can be
-          // inserted before localFile
-          return i - 1;
-        default:
-          // all devices in the local file are older than the new file, proceed to the next file
-          insertPos = i;
-      }
-    }
-    return insertPos;
-  }
-
-  /**
-   * Compare each device in the two files to find the time relation of them.
-   *
-   * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
-   *     fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
-   */
-  private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
-    boolean hasPre = false, hasSubsequence = false;
-    Set<String> fileADevices = fileA.getDevices();
-    Set<String> fileBDevices = fileB.getDevices();
-    for (String device : fileADevices) {
-      if (!fileBDevices.contains(device)) {
-        continue;
-      }
-      long startTimeA = fileA.getStartTime(device);
-      long endTimeA = fileA.getEndTime(device);
-      long startTimeB = fileB.getStartTime(device);
-      long endTimeB = fileB.getEndTime(device);
-      if (startTimeA > endTimeB) {
-        // A's data of the device is later than to the B's data
-        hasPre = true;
-      } else if (startTimeB > endTimeA) {
-        // A's data of the device is previous to the B's data
-        hasSubsequence = true;
-      } else {
-        // the two files overlap in the device
-        return 0;
-      }
-    }
-    if (hasPre && hasSubsequence) {
-      // some devices are newer but some devices are older, the two files overlap in general
-      return 0;
-    }
-    if (!hasPre && hasSubsequence) {
-      // all devices in B are newer than those in A
-      return -1;
-    }
-    // all devices in B are older than those in A
-    return 1;
-  }
-
   /**
    * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
    * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
@@ -2569,50 +2452,6 @@ public class DataRegion implements IDataRegionForQuery {
     tsFileResource.remove();
   }
 
-  /**
-   * Get an appropriate filename to ensure the order between files. The tsfile is named after
-   * ({systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile).
-   *
-   * <p>The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the
-   * list based on the file name and ensure the correctness of the order, so there are three cases.
-   *
-   * <p>1. The tsfile is to be inserted in the first place of the list. Timestamp can be set to half
-   * of the timestamp value in the file name of the first tsfile in the list , and the version
-   * number will be updated to the largest number in this time partition.
-   *
-   * <p>2. The tsfile is to be inserted in the last place of the list. The file name is generated by
-   * the system according to the naming rules and returned.
-   *
-   * <p>3. This file is inserted between two files. The time stamp is the mean of the timestamps of
-   * the two files, the version number will be updated to the largest number in this time partition.
-   *
-   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
-   *     1]
-   * @return appropriate filename
-   */
-  private String getLoadingTsFileName(
-      LoadTsFileType tsFileType,
-      int insertIndex,
-      TsFileResource newTsFileResource,
-      List<TsFileResource> sequenceList) {
-    long timePartitionId = newTsFileResource.getTimePartition();
-    if (tsFileType == LoadTsFileType.LOAD_UNSEQUENCE || insertIndex == sequenceList.size() - 1) {
-      return getNewTsFileName(
-          System.currentTimeMillis(),
-          getAndSetNewVersion(timePartitionId, newTsFileResource),
-          0,
-          0);
-    }
-
-    long preTime =
-        (insertIndex == -1) ? 0 : getTsFileResourceEstablishTime(sequenceList.get(insertIndex));
-    long subsequenceTime = getTsFileResourceEstablishTime(sequenceList.get(insertIndex + 1));
-    long meanTime = preTime + ((subsequenceTime - preTime) >> 1);
-
-    return getNewTsFileName(
-        meanTime, getAndSetNewVersion(timePartitionId, newTsFileResource), 0, 0);
-  }
-
   private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) {
     long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
     partitionMaxFileVersions.put(timePartitionId, version);
@@ -2636,74 +2475,38 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * Execute the loading process by the type.
    *
-   * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @param deleteOriginFile whether to delete the original file
    * @return load the file successfully @UsedBy sync module, load external tsfile module.
    */
-  private boolean loadTsFileByType(
-      LoadTsFileType type,
+  private boolean loadTsFileToUnSequence(
       File tsFileToLoad,
       TsFileResource tsFileResource,
       long filePartitionId,
-      int insertPos,
       boolean deleteOriginFile)
       throws LoadFileException, DiskSpaceInsufficientException {
     File targetFile;
-    switch (type) {
-      case LOAD_UNSEQUENCE:
-        targetFile =
-            fsFactory.getFile(
-                DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-                databaseName
-                    + File.separatorChar
-                    + dataRegionId
-                    + File.separatorChar
-                    + filePartitionId
-                    + File.separator
-                    + tsFileResource.getTsFile().getName());
-        tsFileResource.setFile(targetFile);
-        if (tsFileManager.contains(tsFileResource, false)) {
-          logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
-          return false;
-        }
-        tsFileManager.add(tsFileResource, false);
-        logger.info(
-            "Load tsfile in unsequence list, move file from {} to {}",
-            tsFileToLoad.getAbsolutePath(),
-            targetFile.getAbsolutePath());
-        break;
-      case LOAD_SEQUENCE:
-        targetFile =
-            fsFactory.getFile(
-                DirectoryManager.getInstance().getNextFolderForSequenceFile(),
-                databaseName
-                    + File.separatorChar
-                    + dataRegionId
-                    + File.separatorChar
-                    + filePartitionId
-                    + File.separator
-                    + tsFileResource.getTsFile().getName());
-        tsFileResource.setFile(targetFile);
-        if (tsFileManager.contains(tsFileResource, true)) {
-          logger.error("The file {} has already been loaded in sequence list", tsFileResource);
-          return false;
-        }
-        if (insertPos == -1) {
-          tsFileManager.insertToPartitionFileList(tsFileResource, filePartitionId, true, 0);
-        } else {
-          tsFileManager.insertToPartitionFileList(
-              tsFileResource, filePartitionId, true, insertPos + 1);
-        }
-        logger.info(
-            "Load tsfile in sequence list, move file from {} to {}",
-            tsFileToLoad.getAbsolutePath(),
-            targetFile.getAbsolutePath());
-        break;
-      default:
-        throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type));
+    targetFile =
+        fsFactory.getFile(
+            DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+            databaseName
+                + File.separatorChar
+                + dataRegionId
+                + File.separatorChar
+                + filePartitionId
+                + File.separator
+                + tsFileResource.getTsFile().getName());
+    tsFileResource.setFile(targetFile);
+    if (tsFileManager.contains(tsFileResource, false)) {
+      logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
+      return false;
     }
+    tsFileManager.add(tsFileResource, false);
+    logger.info(
+        "Load tsfile in unsequence list, move file from {} to {}",
+        tsFileToLoad.getAbsolutePath(),
+        targetFile.getAbsolutePath());
 
     // move file from sync dir to data dir
     if (!targetFile.getParentFile().exists()) {
@@ -3300,11 +3103,6 @@ public class DataRegion implements IDataRegionForQuery {
     this.tsFileManager.setAllowCompaction(allowCompaction);
   }
 
-  private enum LoadTsFileType {
-    LOAD_SEQUENCE,
-    LOAD_UNSEQUENCE
-  }
-
   @FunctionalInterface
   public interface CloseTsFileCallBack {