You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/03/23 10:20:37 UTC

[incubator-iotdb] branch partitioned_file_version_management updated: code refinements

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch partitioned_file_version_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/partitioned_file_version_management by this push:
     new 790d2a7  code refinements
790d2a7 is described below

commit 790d2a7a5330e31594f0e12ae0b7abed8b398568
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Mar 23 18:20:23 2020 +0800

    code refinements
---
 .../engine/storagegroup/StorageGroupProcessor.java | 162 ++++++++++++++-------
 1 file changed, 108 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index c490723..a5f627f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -126,6 +126,15 @@ public class StorageGroupProcessor {
   private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
   private static final int MAX_CACHE_SENSORS = 5000;
+
+  /**
+   * indicating the file to be loaded already exists locally.
+   */
+  private static final int POS_ALREADY_EXIST = -2;
+  /**
+   * indicating the file to be loaded overlaps with some files.
+   */
+  private static final int POS_OVERLAP = -3;
   /**
    * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
    * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -1592,65 +1601,24 @@ public class StorageGroupProcessor {
   public void loadNewTsFile(TsFileResource newTsFileResource)
       throws TsFileProcessorException {
     File tsfileToBeInserted = newTsFileResource.getFile();
-    long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParent());
+
     writeLock();
     mergeLock.writeLock().lock();
     try {
-      boolean isOverlap = false;
-      int preIndex = -1, subsequentIndex = sequenceFileTreeSet.size();
-
-      List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
-      // check new tsfile
-      outer:
-      for (int i = 0; i < sequenceList.size(); i++) {
-        TsFileResource localFile = sequenceList.get(i);
-        if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
-          return;
-        }
-        long localPartitionId = Long.parseLong(localFile.getFile().getParent());
-        if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
-            || newFilePartitionId != localPartitionId) {
-          continue;
-        }
-        boolean hasPre = false, hasSubsequence = false;
-        for (String device : newTsFileResource.getStartTimeMap().keySet()) {
-          if (localFile.getStartTimeMap().containsKey(device)) {
-            long startTime1 = localFile.getStartTimeMap().get(device);
-            long endTime1 = localFile.getEndTimeMap().get(device);
-            long startTime2 = newTsFileResource.getStartTimeMap().get(device);
-            long endTime2 = newTsFileResource.getEndTimeMap().get(device);
-            if (startTime1 > endTime2) {
-              hasSubsequence = true;
-            } else if (startTime2 > endTime1) {
-              hasPre = true;
-            } else {
-              isOverlap = true;
-              break outer;
-            }
-          }
-        }
-        if (hasPre && hasSubsequence) {
-          isOverlap = true;
-          break;
-        }
-        if (!hasPre && hasSubsequence) {
-          subsequentIndex = i;
-          break;
-        }
-        if (hasPre) {
-          preIndex = i;
-        }
+      int insertPos = findInsertionPosition(newTsFileResource);
+      if (insertPos == POS_ALREADY_EXIST) {
+        return;
       }
 
       // loading tsfile by type
-      if (isOverlap) {
+      if (insertPos == POS_OVERLAP) {
         loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource);
       } else {
 
         // check whether the file name needs to be renamed.
-        if (subsequentIndex != sequenceFileTreeSet.size() || preIndex != -1) {
-          String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
-              subsequentIndex, getTimePartitionFromTsFileResource(newTsFileResource));
+        if (insertPos != sequenceFileTreeSet.size() - 1 || insertPos != -1) {
+          String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
+              getTimePartitionFromTsFileResource(newTsFileResource));
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
@@ -1679,6 +1647,90 @@ public class StorageGroupProcessor {
   }
 
   /**
+   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
+   * @param newTsFileResource
+   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
+   *         POS_OVERLAP(-3) if some file overlaps the new file
+   *         an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+   */
+  private int findInsertionPosition(TsFileResource newTsFileResource) {
+    File tsfileToBeInserted = newTsFileResource.getFile();
+    long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParentFile().getName());
+    int insertPos = -1;
+
+    List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
+    // find the position where the new file should be inserted
+    for (int i = 0; i < sequenceList.size(); i++) {
+      TsFileResource localFile = sequenceList.get(i);
+      if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
+        return POS_ALREADY_EXIST;
+      }
+      long localPartitionId = Long.parseLong(localFile.getFile().getParentFile().getName());
+      if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
+          || newFilePartitionId != localPartitionId) {
+        // skip files that are not in the same partition as the new file and the last empty file
+        continue;
+      }
+
+      int fileComparison = compareTsFileDevices(newTsFileResource, localFile);
+      switch (fileComparison) {
+        case 0:
+          // some devices are newer but some devices are older, the two files overlap in general
+          return POS_OVERLAP;
+        case -1:
+          // all devices in the local file are newer than the new file, the new file can be
+          // inserted before the local file
+          return i - 1;
+        default:
+          // all devices in the local file are older than the new file, proceed to the next file
+          insertPos = i;
+      }
+    }
+    return insertPos;
+  }
+
+  /**
+   * Compare each device in the two files to find the time relation of them.
+   * @param fileA
+   * @param fileB
+   * @return -1 if fileA is totally older than fileB (A < B)
+   *          0 if fileA is partially older than fileB and partially newer than fileB (A X B)
+   *          1 if fileA is totally newer than fileB (B < A)
+   */
+  private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
+    boolean hasPre = false, hasSubsequence = false;
+    for (String device : fileA.getStartTimeMap().keySet()) {
+      if (!fileB.getStartTimeMap().containsKey(device)) {
+        continue;
+      }
+      long startTimeA = fileA.getStartTimeMap().get(device);
+      long endTimeA = fileA.getEndTimeMap().get(device);
+      long startTimeB = fileB.getStartTimeMap().get(device);
+      long endTimeB = fileB.getEndTimeMap().get(device);
+      if (startTimeA > endTimeB) {
+        // A's data of the device is later than B's data, i.e., B precedes A in this device
+        hasPre = true;
+      } else if (startTimeB > endTimeA) {
+        // A's data of the device is previous to B's data, i.e., B is subsequent to A
+        hasSubsequence = true;
+      } else {
+        // the two files overlap in the device
+        return 0;
+      }
+    }
+    if (hasPre && hasSubsequence) {
+      // some devices are newer but some devices are older, the two files overlap in general
+      return 0;
+    }
+    if (!hasPre && hasSubsequence) {
+      // all shared devices in B are newer than those in A, so A is totally older than B
+      return -1;
+    }
+    // all shared devices in B are older than those in A (or the files share no device)
+    return 1;
+  }
+
+  /**
    * If the historical versions of a file is a sub-set of the given file's, remove it to reduce
    * unnecessary merge. Only used when the file sender and the receiver share the same file
    * close policy.
@@ -1744,24 +1796,26 @@ public class StorageGroupProcessor {
    * version number is the version number in the tsfile with a larger timestamp.
    *
    * @param tsfileName origin tsfile name
+   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
+   *                   + 1]
    * @return appropriate filename
    */
-  private String getFileNameForLoadingFile(String tsfileName, int preIndex, int subsequentIndex,
+  private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
       long timePartitionId) {
     long currentTsFileTime = Long
         .parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
     long preTime;
     List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
-    if (preIndex == -1) {
+    if (insertIndex == -1) {
       preTime = 0L;
     } else {
-      String preName = sequenceList.get(preIndex).getFile().getName();
+      String preName = sequenceList.get(insertIndex).getFile().getName();
       preTime = Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
     }
-    if (subsequentIndex == sequenceFileTreeSet.size()) {
+    if (insertIndex == sequenceFileTreeSet.size() - 1) {
       return preTime < currentTsFileTime ? tsfileName : getNewTsFileName(timePartitionId);
     } else {
-      String subsequenceName = sequenceList.get(subsequentIndex).getFile().getName();
+      String subsequenceName = sequenceList.get(insertIndex + 1).getFile().getName();
       long subsequenceTime = Long
           .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
       long subsequenceVersion = Long