You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2021/03/24 06:04:41 UTC

[iotdb] branch fix_time_partition_load_tsfile created (now 47637bf)

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a change to branch fix_time_partition_load_tsfile
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 47637bf  add it

This branch includes the following new commits:

     new 47637bf  add it

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: add it

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch fix_time_partition_load_tsfile
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 47637bf3a3fb09753bcfbf77730f6780f0f1466a
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Mar 24 10:54:35 2021 +0800

    add it
---
 .../db/engine/storagegroup/TsFileResource.java     |  16 +-
 .../IoTDBLoadExternalTsfileWithTimePartition.java  | 201 +++++++++++++++++++++
 2 files changed, 206 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2cca8d7..2433259 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -717,8 +717,8 @@ public class TsFileResource {
    */
   public long getTimePartitionWithCheck() throws PartitionViolationException {
     long partitionId = -1;
-    for (Long startTime : startTimes) {
-      long p = StorageEngine.getTimePartition(startTime);
+    for (int index : deviceToIndex.values()) {
+      long p = StorageEngine.getTimePartition(startTimes[index]);
       if (partitionId == -1) {
         partitionId = p;
       } else {
@@ -726,15 +726,9 @@ public class TsFileResource {
           throw new PartitionViolationException(this);
         }
       }
-    }
-    for (Long endTime : endTimes) {
-      long p = StorageEngine.getTimePartition(endTime);
-      if (partitionId == -1) {
-        partitionId = p;
-      } else {
-        if (partitionId != p) {
-          throw new PartitionViolationException(this);
-        }
+      p = StorageEngine.getTimePartition(endTimes[index]);
+      if (partitionId != p) {
+        throw new PartitionViolationException(this);
       }
     }
     if (partitionId == -1) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithTimePartition.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithTimePartition.java
new file mode 100644
index 0000000..ecdbc97
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithTimePartition.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBLoadExternalTsfileWithTimePartition {
+
+  String DOT = ".";
+  String tempDir = "temp";
+
+  String STORAGE_GROUP = "root.ln";
+  String[] devices = new String[]{"d1", "d2"};
+  String[] measurements = new String[]{"s1", "s2"};
+
+  // generate several tsfiles with timestamps from startTime (inclusive) to endTime (exclusive)
+  long startTime = 0;
+  long endTime = 1000_000;
+
+  long timePartition = 100;// unit s
+
+  boolean originalIsEnablePartition;
+  long originalPartitionInterval;
+
+  IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  @Before
+  public void setUp() throws Exception {
+    originalIsEnablePartition = config.isEnablePartition();
+    originalPartitionInterval = config.getPartitionInterval();
+    config.setEnablePartition(true);
+    config.setPartitionInterval(timePartition);
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+    config.setEnablePartition(originalIsEnablePartition);
+    config.setPartitionInterval(originalPartitionInterval);
+    File f = new File(tempDir);
+    if (f.exists()) {
+      FileUtils.deleteDirectory(f);
+    }
+  }
+
+  /**
+   * get the name of tsfile given counter
+   */
+  String getName(int counter) {
+    return tempDir + File.separator + System.currentTimeMillis() + "-" + counter
+        + "-0.tsfile";
+  }
+
+  /**
+   * Write one record per device, each holding every measurement, at the given timestamp.
+   */
+  void writeData(TsFileWriter tsFileWriter, long timestamp)
+      throws IOException, WriteProcessException {
+    for (String deviceId : devices) {
+      TSRecord tsRecord = new TSRecord(timestamp,
+          STORAGE_GROUP + DOT + deviceId);
+      for (String measurement : measurements) {
+        DataPoint dPoint = new LongDataPoint(measurement, 10000);
+        tsRecord.addTuple(dPoint);
+      }
+      tsFileWriter.write(tsRecord);
+    }
+  }
+
+  /**
+   * register all timeseries in tsfiles
+   */
+  void register(TsFileWriter tsFileWriter) {
+    try {
+      for (String deviceId : devices) {
+        for (String measurement : measurements) {
+          tsFileWriter.registerTimeseries(
+              new Path(STORAGE_GROUP + DOT + deviceId, measurement),
+              new MeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
+        }
+      }
+    } catch (WriteProcessException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Create multiple tsfiles, each corresponding to one time partition.
+   */
+  private void prepareData() throws IOException {
+    File dir = new File(tempDir);
+    if (dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdir();
+    try {
+      File f;
+      TsFileWriter tsFileWriter = null;
+      int counter = 0;
+      for (long timestamp = startTime; timestamp < endTime; timestamp += 1000) {
+        if (timestamp % (timePartition*1000) == 0) {
+          if (tsFileWriter != null) {
+            tsFileWriter.flushAllChunkGroups();
+            tsFileWriter.close();
+            counter++;
+          }
+          String path = getName(counter);
+          f = FSFactoryProducer.getFSFactory().getFile(path);
+          tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
+          register(tsFileWriter);
+        }
+        writeData(tsFileWriter, timestamp);
+      }
+      tsFileWriter.flushAllChunkGroups();
+      tsFileWriter.close();
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+
+  }
+
+
+  @Test
+  public void loadTsfileWithTimePartition() {
+    try (Connection connection =
+        DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
+
+      String dataDir = config.getDataDirs()[0];
+      File f = new File(dataDir,
+          new PartialPath("sequence") + File.separator + "root.ln");
+      System.out.println(Arrays.toString(f.list()));
+      Assert.assertEquals((endTime - startTime) / (timePartition * 1000), f.list().length);
+
+      for (int i = 0; i < (endTime - startTime) / (timePartition * 1000); i++) {
+        Assert.assertEquals("" + i, f.list()[i]);
+      }
+      // each time partition folder should contain 2 files, the tsfile and the resource file
+      for (int i = 0; i < (endTime - startTime) / (timePartition * 1000); i++) {
+        Assert.assertEquals(2, new File(f.getAbsolutePath(), "" + i).list().length);
+      }
+    } catch (SQLException | IllegalPathException throwables) {
+      throwables.printStackTrace();
+    }
+  }
+
+}
\ No newline at end of file