You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/11/28 12:54:56 UTC

[incubator-iotdb] branch add_load_tsfile_feature updated: complete load tsfile and fix some bugs of corner cases

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch add_load_tsfile_feature
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/add_load_tsfile_feature by this push:
     new b68c549  complete load tsfile and fix some bugs of corner cases
b68c549 is described below

commit b68c5499adbe1370c33a1196045ac415ec23cbdc
Author: lta <li...@163.com>
AuthorDate: Thu Nov 28 20:54:39 2019 +0800

    complete load tsfile and fix some bugs of corner cases
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  30 +++-
 .../engine/storagegroup/StorageGroupProcessor.java | 158 ++++++++++++++++++++-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |  16 ++-
 .../iotdb/db/sync/receiver/load/FileLoader.java    |   2 +-
 .../{MemEst => memestimation}/MemEstTool.java      |   2 +-
 .../{MemEst => memestimation}/MemEstToolCmd.java   |   2 +-
 6 files changed, 195 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1fa1ced..e8c4c34 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -428,6 +428,12 @@ public class StorageEngine implements IService {
     }
   }
 
+  public void loadNewTsFileForSync(TsFileResource newTsFileResource)
+      throws TsFileProcessorException, StorageEngineException {
+    getProcessor(newTsFileResource.getFile().getParentFile().getName())
+        .loadNewTsFileForSync(newTsFileResource);
+  }
+
   public void loadNewTsFile(TsFileResource newTsFileResource)
       throws TsFileProcessorException, StorageEngineException {
     getProcessor(newTsFileResource.getFile().getParentFile().getName())
@@ -435,13 +441,29 @@ public class StorageEngine implements IService {
   }
 
   public boolean deleteTsfile(File deletedTsfile) throws StorageEngineException {
-    return getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+    if (deletedTsfile.getParentFile() != null) {
+      return getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+    }
+    List<String> storageGroupNames = MManager.getInstance().getAllStorageGroupNames();
+    boolean hasDeleted = false;
+    for (String storageGroupName : storageGroupNames) {
+      hasDeleted |= getProcessor(storageGroupName).deleteTsfile(deletedTsfile);
+    }
+    return hasDeleted;
   }
 
-  public boolean moveTsfile(File deletedTsfile, File targetDir)
+  public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
       throws StorageEngineException, IOException {
-    return getProcessor(deletedTsfile.getParentFile().getName())
-        .moveTsfile(deletedTsfile, targetDir);
+    if(tsfileToBeMoved.getParentFile() != null){
+      return getProcessor(tsfileToBeMoved.getParentFile().getName())
+          .moveTsfile(tsfileToBeMoved, targetDir);
+    }
+    List<String> storageGroupNames = MManager.getInstance().getAllStorageGroupNames();
+    boolean hasMoved = false;
+    for (String storageGroupName : storageGroupNames) {
+      hasMoved |= getProcessor(storageGroupName).moveTsfile(tsfileToBeMoved, targetDir);
+    }
+    return hasMoved;
   }
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a21e6d9..d47645f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1144,7 +1144,8 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Load a new tsfile to storage group processor
+   * Load a new tsfile to storage group processor. The mechanism of the sync module ensures that
+   * no existing file overlaps with the new file.
    *
    * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
    * or unsequence list.
@@ -1156,7 +1157,7 @@ public class StorageGroupProcessor {
    * @param newTsFileResource tsfile resource
    * @UsedBy sync module.
    */
-  public void loadNewTsFile(TsFileResource newTsFileResource)
+  public void loadNewTsFileForSync(TsFileResource newTsFileResource)
       throws TsFileProcessorException {
     File tsfileToBeInserted = newTsFileResource.getFile();
     writeLock();
@@ -1177,6 +1178,155 @@ public class StorageGroupProcessor {
   }
 
   /**
+   * Load a new tsfile to storage group processor. The file may overlap with other files.
+   *
+   * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
+   * or unsequence list.
+   *
+   * Secondly, execute the loading process by the type.
+   *
+   * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+   *
+   * @param newTsFileResource tsfile resource
+   * @UsedBy load external tsfile module
+   */
+  public void loadNewTsFile(TsFileResource newTsFileResource)
+      throws TsFileProcessorException {
+    File tsfileToBeInserted = newTsFileResource.getFile();
+    writeLock();
+    mergeLock.writeLock().lock();
+    try {
+      boolean isOverlap = false;
+      int preIndex = -1, subsequentIndex = sequenceFileList.size();
+
+      // check new tsfile
+      outer:
+      for (int i = 0; i < sequenceFileList.size(); i++) {
+        if (sequenceFileList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
+          return;
+        }
+        if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+          continue;
+        }
+        boolean hasPre = false, hasSubsequence = false;
+        for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+          if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+            long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
+            long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
+            long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+            long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+            if (startTime1 > endTime2) {
+              hasSubsequence = true;
+            } else if (startTime2 > endTime1) {
+              hasPre = true;
+            } else {
+              isOverlap = true;
+              break outer;
+            }
+          }
+        }
+        if (hasPre && hasSubsequence) {
+          isOverlap = true;
+          break;
+        }
+        if (!hasPre && hasSubsequence) {
+          subsequentIndex = i;
+          break;
+        }
+        if (hasPre) {
+          preIndex = i;
+        }
+      }
+
+      // loading tsfile by type
+      if (isOverlap) {
+        loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
+            unSequenceFileList.size());
+      } else {
+
+        // check whether the file name needs to be renamed.
+        if (subsequentIndex != sequenceFileList.size() || preIndex == -1) {
+          String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
+              subsequentIndex);
+          if (!newFileName.equals(tsfileToBeInserted.getName())) {
+            logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
+                tsfileToBeInserted.getName(), newFileName);
+            newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+          }
+        }
+        loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+            getBinarySearchIndex(newTsFileResource));
+      }
+
+      // update latest time map
+      updateLatestTimeMap(newTsFileResource);
+    } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
+      logger.error("Failed to append the tsfile {} to storage group processor {}.",
+          tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+      throw new TsFileProcessorException(e);
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Get an appropriate filename to ensure the order between files. The tsfile is named after
+   * ({systemTime}-{versionNum}-{mergeNum}.tsfile).
+   *
+   * For the sorting rules of tsfile names, see {@link #compareFileName}. The list can be restored
+   * from the file names alone, so the name must preserve the correct order. There are three cases.
+   *
+   * 1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
+   * name is less than the timestamp in the file name of the first tsfile in the list, then the
+   * file name is legal and the file name is returned directly. Otherwise, its timestamp can be set
+   * to half of the timestamp value in the file name of the first tsfile in the list, and the
+   * version number is the version number in the file name of the first tsfile in the list.
+   *
+   * 2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
+   * name is larger than the timestamp in the file name of the last tsfile in the list, then the
+   * file name is legal and the file name is returned directly. Otherwise, the file name is
+   * generated by the system according to the naming rules and returned.
+   *
+   * 3. The tsfile is to be inserted between two files. If the timestamp in the file name lies
+   * between the timestamps in the names of the two files, then it is a legal name and is
+   * returned directly; otherwise, the timestamp is the mean of the timestamps of the two files, and
+   * the version number is the version number in the name of the tsfile with the larger timestamp.
+   *
+   * @param tsfileName origin tsfile name
+   * @return appropriate filename
+   */
+  private String getFileNameForLoadingFile(String tsfileName, int preIndex, int subsequentIndex) {
+    long currentTsFileTime = Long
+        .parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+    long preTime;
+    if (preIndex == -1) {
+      preTime = 0L;
+    } else {
+      String preName = sequenceFileList.get(preIndex).getFile().getName();
+      preTime = Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+    }
+    if (subsequentIndex == sequenceFileList.size()) {
+      return preTime < currentTsFileTime ? tsfileName
+          : System.currentTimeMillis() + IoTDBConstant.TSFILE_NAME_SEPARATOR + versionController
+              .nextVersion() + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+    } else {
+      String subsequenceName = sequenceFileList.get(subsequentIndex).getFile().getName();
+      long subsequenceTime = Long
+          .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
+      long subsequenceVersion = Long
+          .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
+      if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) {
+        return tsfileName;
+      } else {
+        return (preTime + ((subsequenceTime - preTime) >> 1)) + IoTDBConstant.TSFILE_NAME_SEPARATOR
+            + subsequenceVersion + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+      }
+    }
+  }
+
+  /**
    * Get binary search index in @code{sequenceFileList}
    *
    * @return right index to insert
@@ -1205,7 +1355,7 @@ public class StorageGroupProcessor {
   /**
    * Update latest time in latestTimeForEachDevice and latestFlushedTimeForEachDevice.
    *
-   * @UsedBy sync module
+   * @UsedBy sync module, load external tsfile module.
    */
   private void updateLatestTimeMap(TsFileResource newTsFileResource) {
     for (Entry<String, Long> entry : newTsFileResource.getEndTimeMap().entrySet()) {
@@ -1228,7 +1378,7 @@ public class StorageGroupProcessor {
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @param index the index in sequenceFileList/unSequenceFileList
-   * @UsedBy sync module
+   * @UsedBy sync module, load external tsfile module.
    */
   private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource, int index)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 81e6f5a..146e27f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.path.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -71,6 +72,7 @@ import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.utils.AuthUtils;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.cache.CacheException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -187,8 +189,14 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
     TsFileResource tsFileResource = new TsFileResource(file);
     try {
       FileLoaderUtils.checkTsFileResource(tsFileResource);
-      //TODO check version and metadata
-    } catch (IOException e) {
+      if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file's version is old which needs to be upgraded.",
+                file.getAbsolutePath()));
+      }
+      StorageEngine.getInstance().loadNewTsFile(tsFileResource);
+    } catch (IOException | TsFileProcessorException | StorageEngineException e) {
       throw new QueryProcessException(
           String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
     }
@@ -198,7 +206,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
     try {
       if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
         throw new QueryProcessException(
-            String.format("File %s doesn't exists.", plan.getFile().getName()));
+            String.format("File %s doesn't exist.", plan.getFile().getName()));
       }
     } catch (StorageEngineException e) {
       throw new QueryProcessException(
@@ -214,7 +222,7 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
     try {
       if (!StorageEngine.getInstance().moveTsfile(plan.getFile(), plan.getTargetDir())) {
         throw new QueryProcessException(
-            String.format("File %s doesn't exists.", plan.getFile().getName()));
+            String.format("File %s doesn't exist.", plan.getFile().getName()));
       }
     } catch (StorageEngineException | IOException e) {
       throw new QueryProcessException(
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index be2cb61..f6551e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -136,7 +136,7 @@ public class FileLoader implements IFileLoader {
     FileLoaderUtils.checkTsFileResource(tsFileResource);
     try {
       FileLoaderManager.getInstance().checkAndUpdateDeviceOwner(tsFileResource);
-      StorageEngine.getInstance().loadNewTsFile(tsFileResource);
+      StorageEngine.getInstance().loadNewTsFileForSync(tsFileResource);
     } catch (SyncDeviceOwnerConflictException e) {
       LOGGER.error("Device owner has conflicts, so skip the loading file", e);
     } catch (TsFileProcessorException | StorageEngineException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstTool.java b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstTool.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstTool.java
rename to server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstTool.java
index c623c80..8e9e222 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstTool.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.tools.MemEst;
+package org.apache.iotdb.db.tools.memestimation;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
rename to server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java
index 0d6c5f4..2df8e9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.tools.MemEst;
+package org.apache.iotdb.db.tools.memestimation;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;