Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/11 05:21:16 UTC

[iotdb] branch master updated: [IOTDB-3656] mpp load supports tsfile with non-standard name (#7527)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1210e4da15 [IOTDB-3656] mpp load supports tsfile with non-standard name (#7527)
1210e4da15 is described below

commit 1210e4da1529b43bfb408cab75637669d9b5fd04
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue Oct 11 13:21:09 2022 +0800

    [IOTDB-3656] mpp load supports tsfile with non-standard name (#7527)
---
 docs/UserGuide/Write-Data/Load-External-Tsfile.md  |  4 +-
 .../UserGuide/Write-Data/Load-External-Tsfile.md   |  4 +-
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  | 60 ++++++++++++++++++++++
 .../db/engine/storagegroup/TsFileResource.java     | 46 +++++++++++++++++
 .../plan/statement/crud/LoadTsFileStatement.java   | 27 ++++------
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 52 ++++++++-----------
 6 files changed, 139 insertions(+), 54 deletions(-)

diff --git a/docs/UserGuide/Write-Data/Load-External-Tsfile.md b/docs/UserGuide/Write-Data/Load-External-Tsfile.md
index 1c262d54a5..b89f72a3ef 100644
--- a/docs/UserGuide/Write-Data/Load-External-Tsfile.md
+++ b/docs/UserGuide/Write-Data/Load-External-Tsfile.md
@@ -37,7 +37,7 @@ This command has two usages:
 
 1. Load a single tsfile by specifying a file path (absolute path). 
 
-The second parameter indicates the path of the tsfile to be loaded and the name of the tsfile needs to conform to the tsfile naming convention, that is, `{systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile`. This command has three options: sglevel, verify, onSuccess.
+The first parameter indicates the path of the tsfile to be loaded. This command has three options: sglevel, verify, onSuccess.
 
 SGLEVEL option. If the storage group corresponding to the tsfile does not exist, the user can set the level of the storage group through the fourth parameter. By default, it uses the storage group level which is set in `iotdb-datanode.properties`.
 
@@ -61,7 +61,7 @@ Examples:
 
 2. Load a batch of files by specifying a folder path (absolute path). 
 
-The second parameter indicates the path of the tsfile to be loaded and the name of the tsfiles need to conform to the tsfile naming convention, that is, `{systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile`. The options above also works for this command.
+The first parameter indicates the path of the folder containing the tsfiles to be loaded. The options above also work for this command.
 
 Examples:
 
diff --git a/docs/zh/UserGuide/Write-Data/Load-External-Tsfile.md b/docs/zh/UserGuide/Write-Data/Load-External-Tsfile.md
index 4c9b74092c..21d66d93a0 100644
--- a/docs/zh/UserGuide/Write-Data/Load-External-Tsfile.md
+++ b/docs/zh/UserGuide/Write-Data/Load-External-Tsfile.md
@@ -35,7 +35,7 @@
 
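 1. Load a single tsfile by specifying a file path (absolute path).

-The second parameter indicates the path of the tsfile to be loaded, and the file name must conform to the tsfile naming convention, that is, `{systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile`. The load command has three options: sglevel, whose value is an integer; verify, whose value is true/false; and onSuccess, whose value is delete/none. Options are separated by spaces and may appear in any order.
+The first parameter indicates the path of the tsfile to be loaded. The load command has three options: sglevel, whose value is an integer; verify, whose value is true/false; and onSuccess, whose value is delete/none. Options are separated by spaces and may appear in any order.

 SGLEVEL option: when the storage group corresponding to the tsfile does not exist, the user can specify the level of the storage group through the value of the sglevel parameter; the default is the level set in `iotdb-datanode.properties`. For example, setting the level parameter to 1 means that, for all time series in this tsfile, the prefix path at level 1 is the storage group; that is, if the device root.sg.d1.s1 exists, root.sg is designated as the storage group.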
 
@@ -60,7 +60,7 @@ ONSUCCESS选项表示对于成功载入的tsfile的处置方式,默认为delet
 
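 2. Load a batch of files by specifying a folder path (absolute path).

-The second parameter indicates the path of the tsfile folder to be loaded, and the names of all files in the folder must conform to the tsfile naming convention, that is, `{systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile`. The options have the same meaning as when loading a single tsfile.
+The first parameter indicates the path of the tsfile folder to be loaded. The options have the same meaning as when loading a single tsfile.

 Examples: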
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index 823727e63c..565bb7c6da 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -453,6 +453,66 @@ public class IOTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testLoadWithOnNonStandardTsFileName() throws Exception {
+    File file1 = new File(tmpDir, "1-0-0-0.tsfile");
+    File file2 = new File(tmpDir, "a-1.tsfile");
+    long writtenPoint1 = 0;
+    // device 0, device 1, sg 0
+    try (TsFileGenerator generator = new TsFileGenerator(file1)) {
+      generator.registerTimeseries(
+          new Path(SchemaConfig.DEVICE_0),
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_00,
+              SchemaConfig.MEASUREMENT_01,
+              SchemaConfig.MEASUREMENT_02,
+              SchemaConfig.MEASUREMENT_03));
+      generator.registerAlignedTimeseries(
+          new Path(SchemaConfig.DEVICE_1),
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_10,
+              SchemaConfig.MEASUREMENT_11,
+              SchemaConfig.MEASUREMENT_12,
+              SchemaConfig.MEASUREMENT_13));
+      generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
+      generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+      writtenPoint1 = generator.getTotalNumber();
+    }
+
+    long writtenPoint2 = 0;
+    // device 2, device 3, device4, sg 1
+    try (TsFileGenerator generator = new TsFileGenerator(file2)) {
+      generator.registerTimeseries(
+          new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+      generator.registerTimeseries(
+          new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+      generator.registerAlignedTimeseries(
+          new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
+      generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
+      generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
+      generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+      writtenPoint2 = generator.getTotalNumber();
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select count(*) from root.** group by level=1,2")) {
+        if (resultSet.next()) {
+          long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+          Assert.assertEquals(writtenPoint1, sg1Count);
+          long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
+          Assert.assertEquals(writtenPoint2, sg2Count);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+  }
+
   private static class SchemaConfig {
     private static final String STORAGE_GROUP_0 = "root.sg.test_0";
     private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 289a88eccd..5fc6d4a06a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -929,6 +929,16 @@ public class TsFileResource {
     this.timeIndex = timeIndex;
   }
 
+  /**
+   * Compare the names of the TsFiles corresponding to two {@link TsFileResource}s. Both names
+   * must meet the naming specification. The generation time is the first key, the version
+   * number the second key, the inner merge count the third key, and the cross merge count the
+   * fourth key.
+   *
+   * @param o1 a {@link TsFileResource}
+   * @param o2 a {@link TsFileResource}
+   * @return -1 if o1 is smaller than o2, 1 if it is bigger, 0 if o1 equals o2
+   */
   // ({systemTime}-{versionNum}-{innerMergeNum}-{crossMergeNum}.tsfile)
   public static int compareFileName(TsFileResource o1, TsFileResource o2) {
     String[] items1 =
@@ -953,6 +963,42 @@ public class TsFileResource {
     }
   }
 
+  /**
+   * Compare two TsFile names. This method first checks whether both names meet the standard
+   * naming specification, then compares them using the generation time as the first key and the
+   * version number as the second key. Note that this method does not compare the merge
+   * counts.
+   *
+   * @param fileName1 a name of TsFile
+   * @param fileName2 a name of TsFile
+   * @return -1 if fileName1 is smaller than fileName2, 1 if it is bigger, 0 if fileName1 equals
+   *     fileName2
+   * @throws IOException if fileName1 or fileName2 does not meet the standard naming specification.
+   */
+  public static int checkAndCompareFileName(String fileName1, String fileName2) throws IOException {
+    TsFileNameGenerator.TsFileName tsFileName1 = TsFileNameGenerator.getTsFileName(fileName1);
+    TsFileNameGenerator.TsFileName tsFileName2 = TsFileNameGenerator.getTsFileName(fileName2);
+    long timeDiff = tsFileName1.getTime() - tsFileName2.getTime();
+    if (timeDiff != 0) {
+      return timeDiff < 0 ? -1 : 1;
+    }
+    long versionDiff = tsFileName1.getVersion() - tsFileName2.getVersion();
+    if (versionDiff != 0) {
+      return versionDiff < 0 ? -1 : 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Compare the names of the TsFiles corresponding to two {@link TsFileResource}s. This method
+   * first checks whether both names meet the standard naming specification, and then compares
+   * the versions of the two names.
+   *
+   * @param o1 a {@link TsFileResource}
+   * @param o2 a {@link TsFileResource}
+   * @return -1 if o1 is smaller than o2, 1 if it is bigger, 0 if o1 equals o2 or either name does
+   *     not meet the naming specification
+   */
   public static int compareFileNameByDesc(TsFileResource o1, TsFileResource o2) {
     try {
       TsFileNameGenerator.TsFileName n1 =
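The new `checkAndCompareFileName` orders two standard names by generation time and then by version, and reports the sign explicitly instead of returning a raw difference. A minimal sketch of that ordering follows, under stated assumptions: the class `TsFileNameOrder`, its regex for a "standard" name, and the `narrowingCompare` helper are hypothetical illustrations, not IoTDB code. `narrowingCompare` reproduces the `(int) (timestampDiff)` pattern from the removed `sortTsFiles` body, which can return the wrong sign (or zero) once two system times differ by 2^31 ms or more.

```java
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of a time-then-version comparison of standard TsFile names. */
final class TsFileNameOrder {
  // Assumed shape of a standard name:
  // {systemTime}-{versionNum}-{innerCompactionCount}-{crossCompactionCount}.tsfile
  private static final Pattern STANDARD_NAME =
      Pattern.compile("(\\d+)-(\\d+)-(\\d+)-(\\d+)\\.tsfile");

  /** Returns {systemTime, versionNum}; throws IOException for a non-standard name. */
  static long[] parseTimeAndVersion(String fileName) throws IOException {
    Matcher m = STANDARD_NAME.matcher(fileName); // matches() requires the whole name to fit
    if (!m.matches()) {
      throw new IOException("Non-standard TsFile name: " + fileName);
    }
    try {
      return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    } catch (NumberFormatException e) { // too many digits to fit in a long
      throw new IOException("Non-standard TsFile name: " + fileName, e);
    }
  }

  /** Negative, zero or positive; compaction counts are deliberately ignored. */
  static int checkAndCompare(String name1, String name2) throws IOException {
    long[] k1 = parseTimeAndVersion(name1);
    long[] k2 = parseTimeAndVersion(name2);
    int byTime = Long.compare(k1[0], k2[0]); // no subtraction, so no overflow
    return byTime != 0 ? byTime : Long.compare(k1[1], k2[1]);
  }

  /** The removed pattern: narrowing a long difference to int can flip its sign. */
  static int narrowingCompare(long t1, long t2) {
    return (int) (t1 - t2);
  }

  public static void main(String[] args) throws IOException {
    // 2^31 ms (about 24.9 days) between the two system times.
    String later = "2147483648-0-0-0.tsfile";
    String earlier = "0-0-0-0.tsfile";
    System.out.println(checkAndCompare(later, earlier) > 0); // true
    System.out.println(narrowingCompare(2147483648L, 0L) > 0); // false
  }
}
```

`Long.compare` never subtracts, so it is safe for any pair of `long` values; the committed method reaches the same result by testing the sign of the difference (`timeDiff < 0 ? -1 : 1`) rather than casting it.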
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
index 95b9de179f..b127df23df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
@@ -25,15 +25,13 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.utils.FilePathUtils;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class LoadTsFileStatement extends Statement {
   private File file;
@@ -59,7 +57,7 @@ public class LoadTsFileStatement extends Statement {
     } else {
       if (file.listFiles() == null) {
         throw new FileNotFoundException(
-            "Can not find %s on this machine, note that load can only handle files on this machine.");
+            "Can not find %s on this machine, notice that load can only handle files on this machine.");
       }
       findAllTsFile(file);
     }
@@ -77,22 +75,15 @@ public class LoadTsFileStatement extends Statement {
   }
 
   private void sortTsFiles(List<File> files) {
-    Map<File, Long> file2Timestamp = new HashMap<>();
-    Map<File, Long> file2Version = new HashMap<>();
-    for (File file : files) {
-      String[] splitStrings = file.getName().split(FilePathUtils.FILE_NAME_SEPARATOR);
-      file2Timestamp.put(file, Long.parseLong(splitStrings[0]));
-      file2Version.put(file, Long.parseLong(splitStrings[1]));
-    }
-
-    Collections.sort(
-        files,
+    files.sort(
         (o1, o2) -> {
-          long timestampDiff = file2Timestamp.get(o1) - file2Timestamp.get(o2);
-          if (timestampDiff != 0) {
-            return (int) (timestampDiff);
+          String file1Name = o1.getName();
+          String file2Name = o2.getName();
+          try {
+            return TsFileResource.checkAndCompareFileName(file1Name, file2Name);
+          } catch (IOException e) {
+            return file1Name.compareTo(file2Name);
           }
-          return (int) (file2Version.get(o1) - file2Version.get(o2));
         });
   }
 
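Both `sortTsFiles` here and the `PlanExecutor` change below sort with a comparator that uses `checkAndCompareFileName` when both names are standard and falls back to `String.compareTo` otherwise. With a mix of names, that fallback can break transitivity: `9-0-0-0.tsfile` sorts before `10-0-0-0.tsfile` numerically, yet `10-0-0-0.tsfile` < `1a.tsfile` < `9-0-0-0.tsfile` lexicographically, and `List.sort` is documented as possibly throwing `IllegalArgumentException` when it detects such a contract violation. The sketch below is one hedged alternative, not what this commit does: it ranks every name by a single composite key, placing standard names first by (time, version, name) and then the remaining names lexicographically. `LoadOrderSketch` and its regex are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical load-order comparator that stays transitive for mixed file names. */
final class LoadOrderSketch {
  // Assumed shape of a standard name, following the naming convention in the docs.
  private static final Pattern STANDARD_NAME =
      Pattern.compile("(\\d+)-(\\d+)-(\\d+)-(\\d+)\\.tsfile");

  /** {systemTime, versionNum} for a standard name, or null for any other name. */
  private static long[] key(String name) {
    Matcher m = STANDARD_NAME.matcher(name);
    if (!m.matches()) {
      return null;
    }
    try {
      return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    } catch (NumberFormatException e) {
      return null; // too many digits: treat as non-standard
    }
  }

  /** Standard names first by (time, version, name); all other names after, lexicographically. */
  static final Comparator<String> LOAD_ORDER =
      (a, b) -> {
        long[] ka = key(a);
        long[] kb = key(b);
        if (ka != null && kb != null) {
          int c = Long.compare(ka[0], kb[0]);
          if (c == 0) {
            c = Long.compare(ka[1], kb[1]);
          }
          return c != 0 ? c : a.compareTo(b); // name breaks remaining ties deterministically
        }
        if (ka != null) {
          return -1; // a is standard, b is not
        }
        if (kb != null) {
          return 1; // b is standard, a is not
        }
        return a.compareTo(b);
      };

  public static void main(String[] args) {
    List<String> names =
        Arrays.asList(
            "1a.tsfile", "10-0-0-0.tsfile", "a-1.tsfile", "9-0-0-0.tsfile", "1-0-0-0.tsfile");
    names.sort(LOAD_ORDER);
    // prints [1-0-0-0.tsfile, 9-0-0-0.tsfile, 10-0-0-0.tsfile, 1a.tsfile, a-1.tsfile]
    System.out.println(names);
  }
}
```

Because every name maps to one position in a single (group, time, version, name) ordering, the three names that form a cycle under a per-pair fallback are always ranked the same way here.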
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 3a972a728d..00a275f1e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -242,7 +242,6 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCO
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TTL;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_USER;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
@@ -1338,43 +1337,32 @@ public class PlanExecutor implements IPlanExecutor {
       throw new QueryProcessException(
           String.format("File path '%s' doesn't exists.", file.getPath()));
     }
-    if (file.isDirectory()) {
-      loadDir(file, plan);
-    } else {
-      loadFile(file, plan);
-    }
-  }
 
-  private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
-    File[] files = curFile.listFiles();
-    long[] establishTime = new long[files.length];
-    List<Integer> tsfiles = new ArrayList<>();
-
-    for (int i = 0; i < files.length; i++) {
-      File file = files[i];
-      if (!file.isDirectory()) {
-        String fileName = file.getName();
-        if (fileName.endsWith(TSFILE_SUFFIX)) {
-          establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
-          tsfiles.add(i);
-        }
-      }
-    }
-    Collections.sort(
-        tsfiles,
+    List<File> tsFiles = new ArrayList<>();
+    findAllTsFiles(file, tsFiles);
+    tsFiles.sort(
         (o1, o2) -> {
-          if (establishTime[o1] == establishTime[o2]) {
-            return 0;
+          String file1Name = o1.getName();
+          String file2Name = o2.getName();
+          try {
+            return TsFileResource.checkAndCompareFileName(file1Name, file2Name);
+          } catch (IOException e) {
+            return file1Name.compareTo(file2Name);
           }
-          return establishTime[o1] < establishTime[o2] ? -1 : 1;
         });
-    for (Integer i : tsfiles) {
-      loadFile(files[i], plan);
+    for (File tsFile : tsFiles) {
+      loadFile(tsFile, plan);
     }
+  }
 
-    for (File file : files) {
-      if (file.isDirectory()) {
-        loadDir(file, plan);
+  private void findAllTsFiles(File curFile, List<File> files) {
+    if (curFile.isFile()) {
+      if (curFile.getName().endsWith(TSFILE_SUFFIX)) {
+        files.add(curFile);
+      }
+    } else {
+      for (File file : curFile.listFiles()) {
+        findAllTsFiles(file, files);
       }
     }
   }
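The new `findAllTsFiles` walks a directory tree recursively and collects every file whose name ends in `TSFILE_SUFFIX`, so a single file and a folder now go through the same sort-and-load path. The sketch below mirrors that traversal with one hedge added: `File.listFiles()` returns `null` when the path is not a directory or an I/O error occurs, and the committed loop iterates over the result without a null check. `TsFileCollector` is hypothetical, and `.tsfile` is assumed as the suffix value.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical recursive collector mirroring findAllTsFiles, with a null guard. */
final class TsFileCollector {
  // Assumed value; the committed code uses the TSFILE_SUFFIX constant.
  static final String TSFILE_SUFFIX = ".tsfile";

  /** Adds curFile itself, or every regular file below it, whose name ends with the suffix. */
  static void findAllTsFiles(File curFile, List<File> out) {
    if (curFile.isFile()) {
      if (curFile.getName().endsWith(TSFILE_SUFFIX)) {
        out.add(curFile);
      }
      return;
    }
    // listFiles() returns null if curFile is not a directory or an I/O error occurs.
    File[] children = curFile.listFiles();
    if (children == null) {
      return;
    }
    for (File child : children) {
      findAllTsFiles(child, out);
    }
  }

  public static void main(String[] args) {
    List<File> found = new ArrayList<>();
    findAllTsFiles(new File(args.length > 0 ? args[0] : "."), found);
    found.forEach(f -> System.out.println(f.getAbsolutePath()));
  }
}
```

`java.nio.file.Files.walk` would be an alternative for the traversal; it reports I/O problems as exceptions rather than as a `null` array.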