You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2021/03/24 06:04:42 UTC
[iotdb] 01/01: add it
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch fix_time_partition_load_tsfile
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 47637bf3a3fb09753bcfbf77730f6780f0f1466a
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Mar 24 10:54:35 2021 +0800
add it
---
.../db/engine/storagegroup/TsFileResource.java | 16 +-
.../IoTDBLoadExternalTsfileWithTimePartition.java | 201 +++++++++++++++++++++
2 files changed, 206 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2cca8d7..2433259 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -717,8 +717,8 @@ public class TsFileResource {
*/
public long getTimePartitionWithCheck() throws PartitionViolationException {
long partitionId = -1;
- for (Long startTime : startTimes) {
- long p = StorageEngine.getTimePartition(startTime);
+ for (int index : deviceToIndex.values()) {
+ long p = StorageEngine.getTimePartition(startTimes[index]);
if (partitionId == -1) {
partitionId = p;
} else {
@@ -726,15 +726,9 @@ public class TsFileResource {
throw new PartitionViolationException(this);
}
}
- }
- for (Long endTime : endTimes) {
- long p = StorageEngine.getTimePartition(endTime);
- if (partitionId == -1) {
- partitionId = p;
- } else {
- if (partitionId != p) {
- throw new PartitionViolationException(this);
- }
+ p = StorageEngine.getTimePartition(endTimes[index]);
+ if (partitionId != p) {
+ throw new PartitionViolationException(this);
}
}
if (partitionId == -1) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithTimePartition.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithTimePartition.java
new file mode 100644
index 0000000..ecdbc97
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithTimePartition.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBLoadExternalTsfileWithTimePartition {
+
+  String DOT = ".";
+  String tempDir = "temp";
+
+  String STORAGE_GROUP = "root.ln";
+  String[] devices = new String[] {"d1", "d2"};
+  String[] measurements = new String[] {"s1", "s2"};
+
+  // generate several tsfiles, with timestamp from startTime(inclusive) to endTime(exclusive)
+  long startTime = 0;
+  long endTime = 1000_000;
+
+  long timePartition = 100; // unit s
+
+  boolean originalIsEnablePartition;
+  long originalPartitionInterval;
+
+  IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** enable time partitioning, set up the environment and generate the tsfiles to load */
+  @Before
+  public void setUp() throws Exception {
+    originalIsEnablePartition = config.isEnablePartition();
+    originalPartitionInterval = config.getPartitionInterval();
+    config.setEnablePartition(true);
+    config.setPartitionInterval(timePartition);
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  /** clean the environment; config and temp directory are restored even if cleaning fails */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvironmentUtils.cleanEnv();
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig()
+          .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+      config.setEnablePartition(originalIsEnablePartition);
+      config.setPartitionInterval(originalPartitionInterval);
+      File f = new File(tempDir);
+      if (f.exists()) {
+        FileUtils.deleteDirectory(f);
+      }
+    }
+  }
+
+  /** get the name of tsfile given counter */
+  String getName(int counter) {
+    return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0.tsfile";
+  }
+
+  /** write a record for every device, given timestamp */
+  void writeData(TsFileWriter tsFileWriter, long timestamp)
+      throws IOException, WriteProcessException {
+    for (String deviceId : devices) {
+      TSRecord tsRecord = new TSRecord(timestamp, STORAGE_GROUP + DOT + deviceId);
+      for (String measurement : measurements) {
+        DataPoint dPoint = new LongDataPoint(measurement, 10000);
+        tsRecord.addTuple(dPoint);
+      }
+      tsFileWriter.write(tsRecord);
+    }
+  }
+
+  /** register all timeseries in tsfiles; a failed registration aborts the test */
+  void register(TsFileWriter tsFileWriter) {
+    try {
+      for (String deviceId : devices) {
+        for (String measurement : measurements) {
+          tsFileWriter.registerTimeseries(
+              new Path(STORAGE_GROUP + DOT + deviceId, measurement),
+              new MeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
+        }
+      }
+    } catch (WriteProcessException e) {
+      // do not go on with a file that lacks timeseries, keep the cause for diagnosis
+      throw new IllegalStateException("cannot register timeseries", e);
+    }
+  }
+
+  /**
+   * create multiple tsfiles, each corresponding to a time partition. Any failure is propagated so
+   * that the test cannot run against missing or partially written files.
+   */
+  private void prepareData() throws IOException, WriteProcessException {
+    File dir = new File(tempDir);
+    if (dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    if (!dir.mkdir() && !dir.isDirectory()) {
+      throw new IOException("cannot create directory " + dir.getAbsolutePath());
+    }
+    TsFileWriter tsFileWriter = null;
+    try {
+      int counter = 0;
+      for (long timestamp = startTime; timestamp < endTime; timestamp += 1000) {
+        // open a new file for the first record and at every time partition boundary
+        if (tsFileWriter == null || timestamp % (timePartition * 1000) == 0) {
+          if (tsFileWriter != null) {
+            tsFileWriter.flushAllChunkGroups();
+            tsFileWriter.close();
+            tsFileWriter = null;
+            counter++;
+          }
+          File f = FSFactoryProducer.getFSFactory().getFile(getName(counter));
+          tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
+          register(tsFileWriter);
+        }
+        writeData(tsFileWriter, timestamp);
+      }
+      if (tsFileWriter != null) {
+        tsFileWriter.flushAllChunkGroups();
+      }
+    } finally {
+      if (tsFileWriter != null) {
+        tsFileWriter.close();
+      }
+    }
+  }
+
+  /** every time partition folder must exist and hold the loaded tsfile plus its resource file */
+  @Test
+  public void loadTsfileWithTimePartition() throws SQLException, IllegalPathException {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
+
+      String dataDir = config.getDataDirs()[0];
+      File f = new File(dataDir, new PartialPath("sequence") + File.separator + "root.ln");
+      String[] partitions = f.list();
+      Assert.assertNotNull("missing directory " + f.getAbsolutePath(), partitions);
+      long expected = (endTime - startTime) / (timePartition * 1000);
+      Assert.assertEquals(Arrays.toString(partitions), expected, partitions.length);
+
+      // File.list() guarantees no order, so each partition folder is looked up by its name
+      for (long i = 0; i < expected; i++) {
+        String[] files = new File(f, String.valueOf(i)).list();
+        Assert.assertNotNull("missing time partition folder " + i, files);
+        // each time partition folder should contain 2 files, the tsfile and the resource file
+        Assert.assertEquals(2, files.length);
+      }
+    }
+  }
+
+}
\ No newline at end of file