You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/23 08:14:54 UTC

[iotdb] branch master updated: [IOTDB-1694] change name for loading dir and add an IT for loading dir (#3985)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ceafab1  [IOTDB-1694] change name for loading dir and add an IT for loading dir (#3985)
ceafab1 is described below

commit ceafab1436c4aef888ac49d1160caad0eea7b03d
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Thu Sep 23 16:14:30 2021 +0800

    [IOTDB-1694] change name for loading dir and add an IT for loading dir (#3985)
---
 .../level/LevelCompactionTsFileManagement.java     |   2 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  25 ++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  32 ++++-
 ...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 137 +++++++++++++++++++--
 4 files changed, 166 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index cc4566c..03c14ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -193,7 +193,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       List<TsFileResource> result = new ArrayList<>();
       if (sequence) {
         List<SortedSet<TsFileResource>> sequenceTsFileList =
-            sequenceTsFileResources.get(timePartition);
+            sequenceTsFileResources.getOrDefault(timePartition, new ArrayList<>());
         for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
           result.addAll(sequenceTsFileList.get(i));
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 88c296d..54cb225 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2407,9 +2407,10 @@ public class StorageGroupProcessor {
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
     writeLock("loadNewTsFile");
     try {
-      List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
+      List<TsFileResource> sequenceList =
+          tsFileManagement.getTsFileListByTimePartition(true, newFilePartitionId);
 
-      int insertPos = findInsertionPosition(newTsFileResource, newFilePartitionId, sequenceList);
+      int insertPos = findInsertionPosition(newTsFileResource, sequenceList);
       String newFileName, renameInfo;
       LoadTsFileType tsFileType;
 
@@ -2447,6 +2448,7 @@ public class StorageGroupProcessor {
       updateLatestTimeMap(newTsFileResource);
       long partitionNum = newTsFileResource.getTimePartition();
       updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
+      logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo);
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -2486,21 +2488,13 @@ public class StorageGroupProcessor {
    *     file can be inserted between [i, i+1]
    */
   private int findInsertionPosition(
-      TsFileResource newTsFileResource,
-      long newFilePartitionId,
-      List<TsFileResource> sequenceList) {
+      TsFileResource newTsFileResource, List<TsFileResource> sequenceList) {
 
     int insertPos = -1;
 
     // find the position where the new file should be inserted
     for (int i = 0; i < sequenceList.size(); i++) {
       TsFileResource localFile = sequenceList.get(i);
-      long localPartitionId = Long.parseLong(localFile.getTsFile().getParentFile().getName());
-
-      if (newFilePartitionId > localPartitionId) {
-        insertPos = i;
-        continue;
-      }
 
       if (!localFile.isClosed() && localFile.getProcessor() != null) {
         // we cannot compare two files by TsFileResource unless they are both closed
@@ -2531,8 +2525,10 @@ public class StorageGroupProcessor {
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
     boolean hasPre = false, hasSubsequence = false;
-    for (String device : fileA.getDevices()) {
-      if (!fileB.getDevices().contains(device)) {
+    Set<String> fileADevices = fileA.getDevices();
+    Set<String> fileBDevices = fileB.getDevices();
+    for (String device : fileADevices) {
+      if (!fileBDevices.contains(device)) {
         continue;
       }
       long startTimeA = fileA.getStartTime(device);
@@ -2665,6 +2661,7 @@ public class StorageGroupProcessor {
   private String getFileNameForSequenceLoadingFile(
       int insertIndex, TsFileResource newTsFileResource, List<TsFileResource> sequenceList)
       throws LoadFileException {
+    int sequenceListLength = sequenceList.size();
     long timePartitionId = newTsFileResource.getTimePartition();
     long preTime, subsequenceTime;
 
@@ -2674,7 +2671,7 @@ public class StorageGroupProcessor {
       String preName = sequenceList.get(insertIndex).getTsFile().getName();
       preTime = Long.parseLong(preName.split(FILE_NAME_SEPARATOR)[0]);
     }
-    if (insertIndex == tsFileManagement.size(true) - 1) {
+    if (insertIndex == sequenceListLength - 1) {
       subsequenceTime = preTime + ((System.currentTimeMillis() - preTime) << 1);
     } else {
       String subsequenceName = sequenceList.get(insertIndex + 1).getTsFile().getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 162dda6..4933329 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -198,6 +198,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
@@ -1062,19 +1063,40 @@ public class PlanExecutor implements IPlanExecutor {
           String.format("File path %s doesn't exists.", file.getPath()));
     }
     if (file.isDirectory()) {
-      recursionFileDir(file, plan);
+      loadDir(file, plan);
     } else {
       loadFile(file, plan);
     }
   }
 
-  private void recursionFileDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+  private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
     File[] files = curFile.listFiles();
+    long[] establishTime = new long[files.length];
+    List<Integer> tsfiles = new ArrayList<>();
+
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      if (!file.isDirectory()) {
+        String fileName = file.getName();
+        if (fileName.endsWith(TSFILE_SUFFIX)) {
+          establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
+          tsfiles.add(i);
+        }
+      }
+    }
+    Collections.sort(
+        tsfiles,
+        (o1, o2) -> {
+          if (establishTime[o1] == establishTime[o2]) return 0;
+          return establishTime[o1] < establishTime[o2] ? -1 : 1;
+        });
+    for (Integer i : tsfiles) {
+      loadFile(files[i], plan);
+    }
+
     for (File file : files) {
       if (file.isDirectory()) {
-        recursionFileDir(file, plan);
-      } else {
-        loadFile(file, plan);
+        loadDir(file, plan);
       }
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
index d991123..80e0642 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
@@ -46,10 +46,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.Arrays;
 
 public class IoTDBLoadExternalTsFileWithTimePartitionIT {
@@ -58,8 +55,8 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
   String tempDir = "temp";
 
   String STORAGE_GROUP = "root.ln";
-  String[] devices = new String[] {"d1", "d2"};
-  String[] measurements = new String[] {"s1", "s2"};
+  String[] devices = new String[] {"d1", "d2", "d3"};
+  String[] measurements = new String[] {"s1", "s2", "s3"};
 
   // generate several tsFiles, with timestamp from startTime(inclusive) to endTime(exclusive)
   long startTime = 0;
@@ -69,6 +66,8 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
 
   long recordTimeGap = 1000;
 
+  long[] deviceDataPointNumber = new long[devices.length];
+
   int originalTsFileNum = 0;
 
   boolean originalIsEnablePartition;
@@ -86,8 +85,6 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
 
     StorageEngine.setEnablePartition(true);
     StorageEngine.setTimePartitionInterval(timePartition);
-
-    prepareData();
   }
 
   @After
@@ -106,7 +103,7 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
 
   /** get the name of tsfile given counter */
   String getName(int counter) {
-    return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0.tsfile";
+    return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0-0.tsfile";
   }
 
   /** write a record, given timestamp */
@@ -171,10 +168,11 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
   }
 
   @Test
-  public void loadTsFileWithTimePartition() {
+  public void loadTsFileWithTimePartitionTest() {
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
+      prepareData();
 
       statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
 
@@ -205,6 +203,125 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
       }
     } catch (SQLException | IllegalPathException throwables) {
       throwables.printStackTrace();
+      Assert.fail();
+    }
+  }
+
+  void writeDataWithDifferentDevice(TsFileWriter tsFileWriter, long timestamp, int counter)
+      throws IOException, WriteProcessException {
+    int mod = (counter % 6);
+    if (mod < 3) {
+      TSRecord tsRecord = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[mod]);
+      deviceDataPointNumber[mod] += 1;
+      for (String measurement : measurements) {
+        DataPoint dPoint = new LongDataPoint(measurement, 10000);
+        tsRecord.addTuple(dPoint);
+      }
+      tsFileWriter.write(tsRecord);
+    } else {
+      for (int i = 2; i <= devices.length; i++) {
+        for (int j = 1; j < i; j++) {
+          if (i + j == mod) {
+            TSRecord tsRecord1 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[i - 1]);
+            TSRecord tsRecord2 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[j - 1]);
+            deviceDataPointNumber[i - 1] += 1;
+            deviceDataPointNumber[j - 1] += 1;
+            for (String measurement : measurements) {
+              DataPoint dataPoint1 = new LongDataPoint(measurement, 100);
+              DataPoint dataPoint2 = new LongDataPoint(measurement, 10000);
+              tsRecord1.addTuple(dataPoint1);
+              tsRecord2.addTuple(dataPoint2);
+            }
+            tsFileWriter.write(tsRecord1);
+            tsFileWriter.write(tsRecord2);
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  void prepareDataWithDifferentDevice() {
+    startTime = 0;
+    endTime = 100_000;
+    recordTimeGap = 10;
+
+    long tsfileMaxTime = 1000;
+    File dir = new File(tempDir);
+    if (dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdir();
+    try {
+      File f;
+      TsFileWriter tsFileWriter = null;
+      int counter = 0;
+      for (long timestamp = startTime; timestamp < endTime; timestamp += recordTimeGap) {
+        if (timestamp % tsfileMaxTime == 0) {
+          if (tsFileWriter != null) {
+            tsFileWriter.flushAllChunkGroups();
+            tsFileWriter.close();
+            counter++;
+          }
+          String path = getName(counter);
+          f = FSFactoryProducer.getFSFactory().getFile(path);
+          tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
+          register(tsFileWriter);
+        }
+        writeDataWithDifferentDevice(tsFileWriter, timestamp, counter);
+      }
+      tsFileWriter.flushAllChunkGroups();
+      tsFileWriter.close();
+      originalTsFileNum = counter + 1;
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void loadTsFileWithDifferentDevice() {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      prepareDataWithDifferentDevice();
+
+      statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
+
+      String dataDir = config.getDataDirs()[0];
+      // sequence/logical_sg/virtual_sg/time_partitions
+      File f =
+          new File(
+              dataDir,
+              new PartialPath("sequence") + File.separator + "root.ln" + File.separator + "0");
+      Assert.assertEquals((endTime - startTime) / (timePartition), f.list().length);
+
+      int totalPartitionsNum = (int) ((endTime - startTime) / (timePartition));
+      int[] splitTsFilePartitions = new int[totalPartitionsNum];
+      for (int i = 0; i < splitTsFilePartitions.length; i++) {
+        splitTsFilePartitions[i] = Integer.parseInt(f.list()[i]);
+      }
+      Arrays.sort(splitTsFilePartitions);
+
+      for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
+        Assert.assertEquals(i, splitTsFilePartitions[i]);
+      }
+
+      // each time partition folder should contain 2 files, the tsfile and the resource file
+      for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
+        Assert.assertEquals(2, new File(f.getAbsolutePath(), "" + i).list().length);
+      }
+
+      for (int i = 0; i < devices.length; i++) {
+        statement.executeQuery(
+            "select count(" + measurements[0] + ") from " + STORAGE_GROUP + DOT + devices[i]);
+        ResultSet set = statement.getResultSet();
+        set.next();
+        long number = set.getLong(1);
+        Assert.assertEquals(deviceDataPointNumber[i], number);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
     }
   }
 }