You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/31 10:41:19 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] change name for loading dir and add an IT for loading dir (#3873)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new bc540ef  [To rel/0.12] change name for loading dir and add an IT for loading dir (#3873)
bc540ef is described below

commit bc540ef85aa65e36354287c47380e45bbd69608d
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue Aug 31 18:40:57 2021 +0800

    [To rel/0.12] change name for loading dir and add an IT for loading dir (#3873)
---
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   6 +-
 ...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 137 +++++++++++++++++++--
 2 files changed, 130 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a159990..d4363ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1018,13 +1018,13 @@ public class PlanExecutor implements IPlanExecutor {
           String.format("File path %s doesn't exists.", file.getPath()));
     }
     if (file.isDirectory()) {
-      recursionFileDir(file, plan);
+      loadDir(file, plan);
     } else {
       loadFile(file, plan);
     }
   }
 
-  private void recursionFileDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+  private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
     File[] files = curFile.listFiles();
     long[] establishTime = new long[files.length];
     List<Integer> tsfiles = new ArrayList<>();
@@ -1051,7 +1051,7 @@ public class PlanExecutor implements IPlanExecutor {
 
     for (File file : files) {
       if (file.isDirectory()) {
-        recursionFileDir(file, plan);
+        loadDir(file, plan);
       }
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
index d991123..80e0642 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
@@ -46,10 +46,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.Arrays;
 
 public class IoTDBLoadExternalTsFileWithTimePartitionIT {
@@ -58,8 +55,8 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
   String tempDir = "temp";
 
   String STORAGE_GROUP = "root.ln";
-  String[] devices = new String[] {"d1", "d2"};
-  String[] measurements = new String[] {"s1", "s2"};
+  String[] devices = new String[] {"d1", "d2", "d3"};
+  String[] measurements = new String[] {"s1", "s2", "s3"};
 
   // generate several tsFiles, with timestamp from startTime(inclusive) to endTime(exclusive)
   long startTime = 0;
@@ -69,6 +66,8 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
 
   long recordTimeGap = 1000;
 
+  long[] deviceDataPointNumber = new long[devices.length];
+
   int originalTsFileNum = 0;
 
   boolean originalIsEnablePartition;
@@ -86,8 +85,6 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
 
     StorageEngine.setEnablePartition(true);
     StorageEngine.setTimePartitionInterval(timePartition);
-
-    prepareData();
   }
 
   @After
@@ -106,7 +103,7 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
 
   /** get the name of tsfile given counter */
   String getName(int counter) {
-    return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0.tsfile";
+    return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0-0.tsfile";
   }
 
   /** write a record, given timestamp */
@@ -171,10 +168,11 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
   }
 
   @Test
-  public void loadTsFileWithTimePartition() {
+  public void loadTsFileWithTimePartitionTest() {
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
+      prepareData();
 
       statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
 
@@ -205,6 +203,125 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
       }
     } catch (SQLException | IllegalPathException throwables) {
       throwables.printStackTrace();
+      Assert.fail();
+    }
+  }
+
+  void writeDataWithDifferentDevice(TsFileWriter tsFileWriter, long timestamp, int counter)
+      throws IOException, WriteProcessException {
+    int mod = (counter % 6);
+    if (mod < 3) {
+      TSRecord tsRecord = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[mod]);
+      deviceDataPointNumber[mod] += 1;
+      for (String measurement : measurements) {
+        DataPoint dPoint = new LongDataPoint(measurement, 10000);
+        tsRecord.addTuple(dPoint);
+      }
+      tsFileWriter.write(tsRecord);
+    } else {
+      for (int i = 2; i <= devices.length; i++) {
+        for (int j = 1; j < i; j++) {
+          if (i + j == mod) {
+            TSRecord tsRecord1 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[i - 1]);
+            TSRecord tsRecord2 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[j - 1]);
+            deviceDataPointNumber[i - 1] += 1;
+            deviceDataPointNumber[j - 1] += 1;
+            for (String measurement : measurements) {
+              DataPoint dataPoint1 = new LongDataPoint(measurement, 100);
+              DataPoint dataPoint2 = new LongDataPoint(measurement, 10000);
+              tsRecord1.addTuple(dataPoint1);
+              tsRecord2.addTuple(dataPoint2);
+            }
+            tsFileWriter.write(tsRecord1);
+            tsFileWriter.write(tsRecord2);
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  void prepareDataWithDifferentDevice() {
+    startTime = 0;
+    endTime = 100_000;
+    recordTimeGap = 10;
+
+    long tsfileMaxTime = 1000;
+    File dir = new File(tempDir);
+    if (dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdir();
+    try {
+      File f;
+      TsFileWriter tsFileWriter = null;
+      int counter = 0;
+      for (long timestamp = startTime; timestamp < endTime; timestamp += recordTimeGap) {
+        if (timestamp % tsfileMaxTime == 0) {
+          if (tsFileWriter != null) {
+            tsFileWriter.flushAllChunkGroups();
+            tsFileWriter.close();
+            counter++;
+          }
+          String path = getName(counter);
+          f = FSFactoryProducer.getFSFactory().getFile(path);
+          tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
+          register(tsFileWriter);
+        }
+        writeDataWithDifferentDevice(tsFileWriter, timestamp, counter);
+      }
+      tsFileWriter.flushAllChunkGroups();
+      tsFileWriter.close();
+      originalTsFileNum = counter + 1;
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void loadTsFileWithDifferentDevice() {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      prepareDataWithDifferentDevice();
+
+      statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
+
+      String dataDir = config.getDataDirs()[0];
+      // sequence/logical_sg/virtual_sg/time_partitions
+      File f =
+          new File(
+              dataDir,
+              new PartialPath("sequence") + File.separator + "root.ln" + File.separator + "0");
+      Assert.assertEquals((endTime - startTime) / (timePartition), f.list().length);
+
+      int totalPartitionsNum = (int) ((endTime - startTime) / (timePartition));
+      int[] splitTsFilePartitions = new int[totalPartitionsNum];
+      for (int i = 0; i < splitTsFilePartitions.length; i++) {
+        splitTsFilePartitions[i] = Integer.parseInt(f.list()[i]);
+      }
+      Arrays.sort(splitTsFilePartitions);
+
+      for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
+        Assert.assertEquals(i, splitTsFilePartitions[i]);
+      }
+
+      // each time partition folder should contain 2 files, the tsfile and the resource file
+      for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
+        Assert.assertEquals(2, new File(f.getAbsolutePath(), "" + i).list().length);
+      }
+
+      for (int i = 0; i < devices.length; i++) {
+        statement.executeQuery(
+            "select count(" + measurements[0] + ") from " + STORAGE_GROUP + DOT + devices[i]);
+        ResultSet set = statement.getResultSet();
+        set.next();
+        long number = set.getLong(1);
+        Assert.assertEquals(deviceDataPointNumber[i], number);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
     }
   }
 }