You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/18 07:04:55 UTC
[iotdb] branch master updated: [IOTDB-3656] mpp load supports modification (#7354)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aac98fec73 [IOTDB-3656] mpp load supports modification (#7354)
aac98fec73 is described below
commit aac98fec73e3f34ef84115423c4dd9e09cfd9d5c
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue Oct 18 15:04:48 2022 +0800
[IOTDB-3656] mpp load supports modification (#7354)
---
.../java/org/apache/iotdb/it/env/MppConfig.java | 11 +-
.../org/apache/iotdb/it/utils/TsFileGenerator.java | 232 ++++++++++++++
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +-
.../confignode/it/IoTDBClusterPartitionIT.java | 4 +-
.../confignode/it/IoTDBConfigNodeSnapshotIT.java | 4 +-
.../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 340 ++++++++-------------
...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 329 --------------------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../iotdb/db/engine/load/AlignedChunkData.java | 9 +-
.../org/apache/iotdb/db/engine/load/ChunkData.java | 14 +-
.../apache/iotdb/db/engine/load/DeletionData.java | 72 +++++
.../iotdb/db/engine/load/LoadTsFileManager.java | 29 +-
.../iotdb/db/engine/load/NonAlignedChunkData.java | 1 +
.../load/{ChunkData.java => TsFileData.java} | 42 +--
.../iotdb/db/engine/storagegroup/DataRegion.java | 11 +-
.../db/engine/storagegroup/TsFileManager.java | 5 +-
.../storagegroup/timeindex/DeviceTimeIndex.java | 5 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 11 +-
.../plan/node/load/LoadSingleTsFileNode.java | 69 ++++-
.../planner/plan/node/load/LoadTsFileNode.java | 3 +
.../plan/node/load/LoadTsFilePieceNode.java | 35 +--
.../plan/scheduler/load/LoadTsFileScheduler.java | 7 +
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 12 +
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 4 +
24 files changed, 622 insertions(+), 639 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 933150ccc4..a1c1b8dcfd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -84,6 +84,13 @@ public class MppConfig implements BaseConfig {
return this;
}
+ /**
+ * Stores the given interval, converted to a string, under the engine property key
+ * {@code time_partition_interval_for_storage}.
+ *
+ * @param partitionInterval the interval value to record
+ * @return this config, so that setter calls can be chained
+ */
+ @Override
+ public BaseConfig setTimePartitionIntervalForStorage(long partitionInterval) {
+ engineProperties.setProperty(
+ "time_partition_interval_for_storage", String.valueOf(partitionInterval));
+ return this;
+ }
+
@Override
public BaseConfig setCompressor(String compressor) {
engineProperties.setProperty("compressor", compressor);
@@ -242,9 +249,9 @@ public class MppConfig implements BaseConfig {
}
@Override
- public BaseConfig setTimePartitionInterval(long timePartitionInterval) {
+ public BaseConfig setTimePartitionIntervalForRouting(long timePartitionInterval) {
confignodeProperties.setProperty(
- "time_partition_interval", String.valueOf(timePartitionInterval));
+ "time_partition_interval_for_routing", String.valueOf(timePartitionInterval));
return this;
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
new file mode 100644
index 0000000000..d04d1558ec
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.it.utils;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;
+
+/**
+ * Test utility that fills a TsFile with randomly generated values and can record deletions for
+ * it in a companion modification file.
+ *
+ * <p>For every registered device the generator tracks the timestamps that were written and not
+ * deleted afterwards, so {@link #getTotalNumber()} reports how many points are still expected.
+ */
+public class TsFileGenerator implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(TsFileGenerator.class);
+
+  private final File tsFile;
+  private final TsFileWriter writer;
+  /** Timestamps written and not yet deleted, keyed by device path. */
+  private final Map<String, TreeSet<Long>> device2TimeSet;
+  /** Measurement schemas registered for each device path. */
+  private final Map<String, List<MeasurementSchema>> device2MeasurementSchema;
+  /** Source of generated values and deletion ranges; replaceable through resetRandom. */
+  private Random random;
+
+  /**
+   * Creates a generator that writes to the given file.
+   *
+   * @param tsFile the TsFile to write
+   * @throws IOException if the underlying writer cannot be created
+   */
+  public TsFileGenerator(File tsFile) throws IOException {
+    this.tsFile = tsFile;
+    this.writer = new TsFileWriter(tsFile);
+    this.device2TimeSet = new HashMap<>();
+    this.device2MeasurementSchema = new HashMap<>();
+    this.random = new Random();
+  }
+
+  /** Replaces the random source with a freshly created, unseeded one. */
+  public void resetRandom() {
+    random = new Random();
+  }
+
+  /**
+   * Replaces the random source with one created from the given seed.
+   *
+   * @param seed the seed for the new random source
+   */
+  public void resetRandom(long seed) {
+    random = new Random(seed);
+  }
+
+  /**
+   * Registers a non-aligned device. A device that is already registered is logged as an error and
+   * left unchanged.
+   *
+   * @param path the device path
+   * @param measurementSchemaList the measurements of the device
+   */
+  public void registerTimeseries(String path, List<MeasurementSchema> measurementSchemaList) {
+    if (device2MeasurementSchema.containsKey(path)) {
+      logger.error("Register same device {}.", path);
+      return;
+    }
+    writer.registerTimeseries(new Path(path), measurementSchemaList);
+    trackDevice(path, measurementSchemaList);
+  }
+
+  /**
+   * Registers an aligned device. A device that is already registered is logged as an error and
+   * left unchanged.
+   *
+   * @param path the device path
+   * @param measurementSchemaList the measurements of the device
+   * @throws WriteProcessException if the writer rejects the registration
+   */
+  public void registerAlignedTimeseries(String path, List<MeasurementSchema> measurementSchemaList)
+      throws WriteProcessException {
+    if (device2MeasurementSchema.containsKey(path)) {
+      logger.error("Register same device {}.", path);
+      return;
+    }
+    writer.registerAlignedTimeseries(new Path(path), measurementSchemaList);
+    trackDevice(path, measurementSchemaList);
+  }
+
+  /** Starts bookkeeping for a newly registered device. */
+  private void trackDevice(String path, List<MeasurementSchema> measurementSchemaList) {
+    device2TimeSet.put(path, new TreeSet<>());
+    device2MeasurementSchema.put(path, measurementSchemaList);
+  }
+
+  /**
+   * Writes {@code number} rows of random values for a registered device. Timestamps continue
+   * from the largest timestamp currently tracked for the device, or start at 1 if none is
+   * tracked.
+   *
+   * @param device the device path; it must have been registered before
+   * @param number the number of rows to write
+   * @param isAligned whether the rows are written through the aligned write path
+   * @throws IOException if writing fails
+   * @throws WriteProcessException if the writer rejects the data
+   */
+  public void generateData(String device, int number, boolean isAligned)
+      throws IOException, WriteProcessException {
+    List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
+    TreeSet<Long> timeSet = device2TimeSet.get(device);
+    Tablet tablet = new Tablet(device, schemas);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    int sensorNum = schemas.size();
+    long startTime = timeSet.isEmpty() ? 0L : timeSet.last();
+
+    for (int r = 0; r < number; r++) {
+      int row = tablet.rowSize++;
+      timestamps[row] = ++startTime;
+      timeSet.add(startTime);
+      for (int i = 0; i < sensorNum; i++) {
+        generateDataPoint(values[i], row, schemas.get(i));
+      }
+      // Flush whenever the tablet is full so the next row starts in an empty tablet.
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        writeTablet(tablet, isAligned);
+      }
+    }
+    // Flush the rows left over from the last, partially filled tablet.
+    if (tablet.rowSize != 0) {
+      writeTablet(tablet, isAligned);
+    }
+
+    logger.info("Write {} points into device {}", number, device);
+  }
+
+  /** Writes the tablet through the aligned or non-aligned path and resets it for reuse. */
+  private void writeTablet(Tablet tablet, boolean isAligned)
+      throws IOException, WriteProcessException {
+    if (isAligned) {
+      writer.writeAligned(tablet);
+    } else {
+      writer.write(tablet);
+    }
+    tablet.reset();
+  }
+
+  /**
+   * Fills one cell of a tablet column with a random value matching the data type of the schema.
+   * Types without a generator are logged as an error and the cell is left untouched.
+   */
+  private void generateDataPoint(Object obj, int row, MeasurementSchema schema) {
+    switch (schema.getType()) {
+      case INT32:
+        generateINT32(obj, row);
+        break;
+      case INT64:
+        generateINT64(obj, row);
+        break;
+      case FLOAT:
+        generateFLOAT(obj, row);
+        break;
+      case DOUBLE:
+        generateDOUBLE(obj, row);
+        break;
+      case BOOLEAN:
+        generateBOOLEAN(obj, row);
+        break;
+      case TEXT:
+        generateTEXT(obj, row);
+        break;
+      default:
+        logger.error("Wrong data type {}.", schema.getType());
+    }
+  }
+
+  private void generateINT32(Object obj, int row) {
+    int[] ints = (int[]) obj;
+    ints[row] = random.nextInt();
+  }
+
+  private void generateINT64(Object obj, int row) {
+    long[] longs = (long[]) obj;
+    longs[row] = random.nextLong();
+  }
+
+  private void generateFLOAT(Object obj, int row) {
+    float[] floats = (float[]) obj;
+    floats[row] = random.nextFloat();
+  }
+
+  private void generateDOUBLE(Object obj, int row) {
+    double[] doubles = (double[]) obj;
+    doubles[row] = random.nextDouble();
+  }
+
+  private void generateBOOLEAN(Object obj, int row) {
+    boolean[] booleans = (boolean[]) obj;
+    booleans[row] = random.nextBoolean();
+  }
+
+  private void generateTEXT(Object obj, int row) {
+    Binary[] binaries = (Binary[]) obj;
+    binaries[row] = new Binary(String.format("test point %d", random.nextInt()));
+  }
+
+  /**
+   * Writes {@code number} random deletion ranges for every measurement of a device into the
+   * modification file that sits next to the TsFile, and removes the deleted timestamps from the
+   * bookkeeping.
+   *
+   * <p>Each range ends below the largest tracked timestamp of the device, so that timestamp is
+   * never deleted. Nothing is written when the device has no tracked timestamp or when its
+   * largest tracked timestamp leaves no room for a range.
+   *
+   * @param device the device path; it must have been registered before
+   * @param number the number of deletion ranges to generate
+   * @throws IOException if flushing the TsFile or writing the modification file fails
+   * @throws IllegalPathException if a series path built from the device is illegal
+   */
+  public void generateDeletion(String device, int number) throws IOException, IllegalPathException {
+    try (ModificationFile modificationFile =
+        new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
+      writer.flushAllChunkGroups();
+      TreeSet<Long> timeSet = device2TimeSet.get(device);
+      if (timeSet.isEmpty()) {
+        return;
+      }
+
+      // The TsFile length after flushing is the file offset handed to every Deletion.
+      long fileOffset = tsFile.length();
+      long maxTime = timeSet.last() - 1;
+      if (maxTime < 1) {
+        // Random.nextInt(bound) requires a positive bound; there is no range to delete.
+        return;
+      }
+      for (int i = 0; i < number; i++) {
+        // Keep the two nextInt calls in this order so seeded runs stay reproducible.
+        int endTime = random.nextInt((int) (maxTime)) + 1;
+        int startTime = random.nextInt(endTime);
+        for (MeasurementSchema measurementSchema : device2MeasurementSchema.get(device)) {
+          Deletion deletion =
+              new Deletion(
+                  new PartialPath(
+                      device
+                          + TsFileConstant.PATH_SEPARATOR
+                          + measurementSchema.getMeasurementId()),
+                  fileOffset,
+                  startTime,
+                  endTime);
+          modificationFile.write(deletion);
+        }
+        for (long j = startTime; j <= endTime; j++) {
+          timeSet.remove(j);
+        }
+        logger.info("Delete {} - {} timestamp of device {}", startTime, endTime, device);
+      }
+    }
+  }
+
+  /**
+   * Returns the number of points expected to remain: for every device, the count of tracked
+   * timestamps multiplied by the count of its measurements.
+   *
+   * @return the total number of remaining points over all registered devices
+   */
+  public long getTotalNumber() {
+    // Multiply and sum as long so large data sets cannot overflow an int.
+    return device2TimeSet.entrySet().stream()
+        .mapToLong(
+            entry ->
+                (long) entry.getValue().size()
+                    * device2MeasurementSchema.get(entry.getKey()).size())
+        .sum();
+  }
+
+  /** Closes the underlying TsFile writer. */
+  @Override
+  public void close() throws Exception {
+    writer.close();
+  }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index bbd5e5a753..4c58abed75 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -71,13 +71,17 @@ public interface BaseConfig {
}
default boolean isEnablePartition() {
- return true;
+ return false;
}
default BaseConfig setPartitionInterval(long partitionInterval) {
return this;
}
+ /**
+ * Default implementation: ignores {@code partitionInterval} and returns this config unchanged.
+ *
+ * @param partitionInterval the requested interval (unused by this default)
+ * @return this config, so that setter calls can be chained
+ */
+ default BaseConfig setTimePartitionIntervalForStorage(long partitionInterval) {
+ return this;
+ }
+
default long getPartitionInterval() {
return 604800;
}
@@ -247,7 +251,7 @@ public interface BaseConfig {
return 1;
}
- default BaseConfig setTimePartitionInterval(long timePartitionInterval) {
+ default BaseConfig setTimePartitionIntervalForRouting(long timePartitionInterval) {
return this;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
index 05463076a9..85de4360eb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
@@ -115,7 +115,7 @@ public class IoTDBClusterPartitionIT {
ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
- ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(testTimePartitionInterval);
EnvFactory.getEnv().initBeforeClass();
}
@@ -129,7 +129,7 @@ public class IoTDBClusterPartitionIT {
.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
ConfigFactory.getConfig()
.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
- ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(originalTimePartitionInterval);
}
/** Generate a PatternTree and serialize it into a ByteBuffer */
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index 068da3c45a..b0e4d929bc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -90,7 +90,7 @@ public class IoTDBConfigNodeSnapshotIT {
ConfigFactory.getConfig().setRatisSnapshotTriggerThreshold(testRatisSnapshotTriggerThreshold);
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
- ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(testTimePartitionInterval);
// Init 3C3D cluster environment
EnvFactory.getEnv().initClusterEnvironment(3, 3);
@@ -104,7 +104,7 @@ public class IoTDBConfigNodeSnapshotIT {
.setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
ConfigFactory.getConfig()
.setRatisSnapshotTriggerThreshold(originalRatisSnapshotTriggerThreshold);
- ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(originalTimePartitionInterval);
}
private ByteBuffer generatePatternTreeBuffer(String path)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index 565bb7c6da..e8d7b026b5 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -22,15 +22,12 @@ package org.apache.iotdb.db.it;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.TsFileWriter;
-import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -43,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -53,16 +49,16 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IOTDBLoadTsFileIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IOTDBLoadTsFileIT.class);
- private static final long PARTITION_INTERVAL = 10L;
+ private static final long PARTITION_INTERVAL = 10 * 1000L;
private boolean originEnablePartition;
private long originPartitionInterval;
+ private long originConfigNodePartitionInterval;
private File tmpDir;
@@ -71,7 +67,10 @@ public class IOTDBLoadTsFileIT {
tmpDir = new File(Files.createTempDirectory("load").toUri());
originEnablePartition = ConfigFactory.getConfig().isEnablePartition();
originPartitionInterval = ConfigFactory.getConfig().getPartitionInterval();
- ConfigFactory.getConfig().setPartitionInterval(PARTITION_INTERVAL);
+ originConfigNodePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+ ConfigFactory.getConfig().setEnablePartition(true);
+ ConfigFactory.getConfig().setTimePartitionIntervalForStorage(PARTITION_INTERVAL);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(PARTITION_INTERVAL);
EnvFactory.getEnv().initBeforeTest();
}
@@ -81,7 +80,8 @@ public class IOTDBLoadTsFileIT {
EnvFactory.getEnv().cleanAfterTest();
ConfigFactory.getConfig().setEnablePartition(originEnablePartition);
- ConfigFactory.getConfig().setPartitionInterval(originPartitionInterval);
+ ConfigFactory.getConfig().setTimePartitionIntervalForStorage(originPartitionInterval);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(originConfigNodePartitionInterval);
if (!deleteDir()) {
LOGGER.error("Can not delete tmp dir for loading tsfile.");
@@ -164,21 +164,21 @@ public class IOTDBLoadTsFileIT {
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_0),
+ SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_1),
+ SchemaConfig.DEVICE_1,
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13));
- generator.generateData(new Path(SchemaConfig.DEVICE_0), 100000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_1), 100000, true);
+ generator.generateData(SchemaConfig.DEVICE_0, 100000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 100000, true);
writtenPoint1 = generator.getTotalNumber();
}
@@ -186,14 +186,14 @@ public class IOTDBLoadTsFileIT {
// device 2, device 3, device4, sg 1
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
- generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, true);
writtenPoint2 = generator.getTotalNumber();
}
@@ -222,21 +222,21 @@ public class IOTDBLoadTsFileIT {
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_0),
+ SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_1),
+ SchemaConfig.DEVICE_1,
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13));
- generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+ generator.generateData(SchemaConfig.DEVICE_0, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 10000, true);
writtenPoint1 = generator.getTotalNumber();
}
@@ -244,14 +244,14 @@ public class IOTDBLoadTsFileIT {
// device 2, device 3, device4, sg 1
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
- generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, true);
writtenPoint2 = generator.getTotalNumber();
}
@@ -301,21 +301,21 @@ public class IOTDBLoadTsFileIT {
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(file1)) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_0),
+ SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_1),
+ SchemaConfig.DEVICE_1,
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13));
- generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+ generator.generateData(SchemaConfig.DEVICE_0, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 10000, true);
writtenPoint1 = generator.getTotalNumber();
}
@@ -323,14 +323,14 @@ public class IOTDBLoadTsFileIT {
// device 2, device 3, device4, sg 1
try (TsFileGenerator generator = new TsFileGenerator(file2)) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
- generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, true);
writtenPoint2 = generator.getTotalNumber();
}
@@ -403,21 +403,21 @@ public class IOTDBLoadTsFileIT {
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(file1)) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_0),
+ SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_1),
+ SchemaConfig.DEVICE_1,
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13));
- generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+ generator.generateData(SchemaConfig.DEVICE_0, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 10000, true);
writtenPoint1 = generator.getTotalNumber();
}
@@ -425,14 +425,14 @@ public class IOTDBLoadTsFileIT {
// device 2, device 3, device4, sg 1
try (TsFileGenerator generator = new TsFileGenerator(file2)) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
- generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, true);
writtenPoint2 = generator.getTotalNumber();
}
@@ -445,7 +445,7 @@ public class IOTDBLoadTsFileIT {
statement.executeQuery(String.format("select last %s from %s", measurement, device))) {
if (resultSet.next()) {
String lastTime = resultSet.getString("Time");
- Assert.assertEquals("9999", lastTime);
+ Assert.assertEquals("10000", lastTime);
} else {
Assert.fail("This ResultSet is empty.");
}
@@ -461,21 +461,21 @@ public class IOTDBLoadTsFileIT {
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(file1)) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_0),
+ SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_1),
+ SchemaConfig.DEVICE_1,
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13));
- generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+ generator.generateData(SchemaConfig.DEVICE_0, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 10000, true);
writtenPoint1 = generator.getTotalNumber();
}
@@ -483,14 +483,81 @@ public class IOTDBLoadTsFileIT {
// device 2, device 3, device4, sg 1
try (TsFileGenerator generator = new TsFileGenerator(file2)) {
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
- new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
- new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
- generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
- generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, true);
+ writtenPoint2 = generator.getTotalNumber();
+ }
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
+
+ try (ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.** group by level=1,2")) {
+ if (resultSet.next()) {
+ long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint1, sg1Count);
+ long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
+ Assert.assertEquals(writtenPoint2, sg2Count);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testLoadWithMods() throws Exception {
+ long writtenPoint1 = 0;
+ // device 0, device 1, sg 0
+ try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_0,
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_00,
+ SchemaConfig.MEASUREMENT_01,
+ SchemaConfig.MEASUREMENT_02,
+ SchemaConfig.MEASUREMENT_03));
+ generator.registerAlignedTimeseries(
+ SchemaConfig.DEVICE_1,
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_10,
+ SchemaConfig.MEASUREMENT_11,
+ SchemaConfig.MEASUREMENT_12,
+ SchemaConfig.MEASUREMENT_13));
+ generator.generateData(SchemaConfig.DEVICE_0, 100000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 100000, true);
+ generator.generateDeletion(SchemaConfig.DEVICE_0, 10);
+ generator.generateDeletion(SchemaConfig.DEVICE_1, 10);
+ writtenPoint1 = generator.getTotalNumber();
+ }
+
+ long writtenPoint2 = 0;
+ // device 2, device 3, device4, sg 1
+ try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
+ generator.resetRandom(1000);
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_2, Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_3, Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ generator.registerAlignedTimeseries(
+ SchemaConfig.DEVICE_4, Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 100, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 100, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 100, true);
+ generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
+ generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
+ generator.generateData(SchemaConfig.DEVICE_2, 100, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 100, true);
+ generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
+ generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
writtenPoint2 = generator.getTotalNumber();
}
@@ -554,157 +621,4 @@ public class IOTDBLoadTsFileIT {
private static final MeasurementSchema MEASUREMENT_40 =
new MeasurementSchema("sensor_40", TSDataType.INT32, TSEncoding.RLE);
}
-
- public class TsFileGenerator implements AutoCloseable {
- private final File tsFile;
- private final TsFileWriter writer;
- private final Map<Path, Integer> device2Number;
- private final Map<Path, List<MeasurementSchema>> device2MeasurementSchema;
- private Random random;
- private long totalNumber;
-
- public TsFileGenerator(File tsFile) throws IOException {
- this.tsFile = tsFile;
- this.writer = new TsFileWriter(tsFile);
- this.device2Number = new HashMap<>();
- this.device2MeasurementSchema = new HashMap<>();
- this.random = new Random();
- this.totalNumber = 0;
- }
-
- public void resetRandom() {
- random = new Random();
- }
-
- public void resetRandom(long seed) {
- random = new Random(seed);
- }
-
- public void registerTimeseries(Path path, List<MeasurementSchema> measurementSchemaList) {
- if (device2Number.containsKey(path)) {
- LOGGER.error(String.format("Register same device %s.", path));
- return;
- }
- writer.registerTimeseries(path, measurementSchemaList);
- device2Number.put(path, 0);
- device2MeasurementSchema.put(path, measurementSchemaList);
- }
-
- public void registerAlignedTimeseries(Path path, List<MeasurementSchema> measurementSchemaList)
- throws WriteProcessException {
- if (device2Number.containsKey(path)) {
- LOGGER.error(String.format("Register same device %s.", path));
- return;
- }
- writer.registerAlignedTimeseries(path, measurementSchemaList);
- device2Number.put(path, 0);
- device2MeasurementSchema.put(path, measurementSchemaList);
- }
-
- public void generateData(Path path, int number, boolean isAligned)
- throws IOException, WriteProcessException {
- List<MeasurementSchema> schemas = device2MeasurementSchema.get(path);
- Tablet tablet = new Tablet(path.getFullPath(), schemas);
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
- long sensorNum = schemas.size();
- long startTime = device2Number.get(path);
-
- for (long r = 0; r < number; r++) {
- int row = tablet.rowSize++;
- timestamps[row] = startTime++;
- for (int i = 0; i < sensorNum; i++) {
- generateDataPoint(values[i], row, schemas.get(i));
- totalNumber += 1;
- }
- // write
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- if (!isAligned) {
- writer.write(tablet);
- } else {
- writer.writeAligned(tablet);
- }
- tablet.reset();
- }
- }
- // write
- if (tablet.rowSize != 0) {
- if (!isAligned) {
- writer.write(tablet);
- } else {
- writer.writeAligned(tablet);
- }
- tablet.reset();
- }
- device2Number.compute(path, (k, v) -> v + number);
- }
-
- private void generateDataPoint(Object obj, int row, MeasurementSchema schema) {
- switch (schema.getType()) {
- case INT32:
- generateINT32(obj, row);
- break;
- case INT64:
- generateINT64(obj, row);
- break;
- case FLOAT:
- generateFLOAT(obj, row);
- break;
- case DOUBLE:
- generateDOUBLE(obj, row);
- break;
- case BOOLEAN:
- generateBOOLEAN(obj, row);
- break;
- case TEXT:
- generateTEXT(obj, row);
- break;
- default:
- LOGGER.error(String.format("Wrong data type %s.", schema.getType()));
- }
- }
-
- private void generateINT32(Object obj, int row) {
- int[] ints = (int[]) obj;
- ints[row] = random.nextInt();
- }
-
- private void generateINT64(Object obj, int row) {
- long[] longs = (long[]) obj;
- longs[row] = random.nextLong();
- }
-
- private void generateFLOAT(Object obj, int row) {
- float[] floats = (float[]) obj;
- floats[row] = random.nextFloat();
- }
-
- private void generateDOUBLE(Object obj, int row) {
- double[] doubles = (double[]) obj;
- doubles[row] = random.nextDouble();
- }
-
- private void generateBOOLEAN(Object obj, int row) {
- boolean[] booleans = (boolean[]) obj;
- booleans[row] = random.nextBoolean();
- }
-
- private void generateTEXT(Object obj, int row) {
- Binary[] binaries = (Binary[]) obj;
- binaries[row] = new Binary(String.format("test point %d", random.nextInt()));
- }
-
- public int getNumber(Path path) {
- return device2Number.get(path);
- }
-
- public long getTotalNumber() {
- return totalNumber;
- }
-
- @Override
- public void close() throws Exception {
- writer.close();
- }
- }
}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
deleted file mode 100644
index 6c67563a5e..0000000000
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.integration;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.TsFileWriter;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-
-@Category({LocalStandaloneTest.class})
-public class IoTDBLoadExternalTsFileWithTimePartitionIT {
-
- private String DOT = ".";
- private String tempDir = "temp";
-
- String STORAGE_GROUP = "root.ln";
- String[] devices = new String[] {"d1", "d2", "d3"};
- String[] measurements = new String[] {"s1", "s2", "s3"};
-
- // generate several tsFiles, with timestamp from startTime(inclusive) to endTime(exclusive)
- private long startTime = 0;
- private long endTime = 1000_000;
-
- private long timePartition = 100; // unit s
- long recordTimeGap = 1000;
-
- private int originalTsFileNum = 0;
-
- long[] deviceDataPointNumber = new long[devices.length];
-
- boolean originalIsEnablePartition;
- long originalPartitionInterval;
-
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- @Before
- public void setUp() throws Exception {
- originalIsEnablePartition = config.isEnablePartition();
- originalPartitionInterval = config.getTimePartitionIntervalForStorage();
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
- StorageEngine.setEnablePartition(true);
- StorageEngine.setTimePartitionInterval(timePartition);
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- StorageEngine.setEnablePartition(originalIsEnablePartition);
- StorageEngine.setTimePartitionInterval(originalPartitionInterval);
- File f = new File(tempDir);
- if (f.exists()) {
- FileUtils.deleteDirectory(f);
- }
- }
-
- /** get the name of tsfile given counter */
- String getName(int counter) {
- return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0-0.tsfile";
- }
-
- /** write a record, given timestamp */
- private void writeData(TsFileWriter tsFileWriter, long timestamp)
- throws IOException, WriteProcessException {
- for (String deviceId : devices) {
- TSRecord tsRecord = new TSRecord(timestamp, STORAGE_GROUP + DOT + deviceId);
- for (String measurement : measurements) {
- DataPoint dPoint = new LongDataPoint(measurement, 10000);
- tsRecord.addTuple(dPoint);
- }
- tsFileWriter.write(tsRecord);
- }
- }
-
- /** register all timeseries in tsfiles */
- private void register(TsFileWriter tsFileWriter) {
- try {
- for (String deviceId : devices) {
- for (String measurement : measurements) {
- tsFileWriter.registerTimeseries(
- new Path(STORAGE_GROUP + DOT + deviceId),
- new MeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
- }
- }
- } catch (WriteProcessException e) {
- e.printStackTrace();
- }
- }
-
- /** create multiple tsfiles, each correspond to a time partition. */
- private void prepareData() {
- File dir = new File(tempDir);
- if (dir.exists()) {
- FileUtils.deleteDirectory(dir);
- }
- dir.mkdir();
- try {
- File f;
- TsFileWriter tsFileWriter = null;
- int counter = 0;
- long recordTimeGap = 1000;
- for (long timestamp = startTime; timestamp < endTime; timestamp += recordTimeGap) {
- if (timestamp % (timePartition * recordTimeGap) == 0) {
- if (tsFileWriter != null) {
- tsFileWriter.flushAllChunkGroups();
- tsFileWriter.close();
- counter++;
- }
- String path = getName(counter);
- f = FSFactoryProducer.getFSFactory().getFile(path);
- tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
- register(tsFileWriter);
- }
- writeData(tsFileWriter, timestamp);
- }
- tsFileWriter.flushAllChunkGroups();
- tsFileWriter.close();
- originalTsFileNum = counter + 1;
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void loadTsFileWithTimePartitionTest() {
- try (Connection connection =
- DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- prepareData();
-
- statement.execute(String.format("load '%s'", new File(tempDir).getAbsolutePath()));
-
- String dataDir = config.getDataDirs()[0];
- // sequence/logical_sg/virtual_sg/time_partitions
- File f =
- new File(
- dataDir,
- new PartialPath("sequence") + File.separator + "root.ln" + File.separator + "0");
- Assert.assertEquals(
- (endTime - startTime) / (timePartition), f.list().length * originalTsFileNum);
-
- int totalPartitionsNum = (int) ((endTime - startTime) / (timePartition) / originalTsFileNum);
- int[] splitTsFilePartitions = new int[totalPartitionsNum];
- for (int i = 0; i < splitTsFilePartitions.length; i++) {
- splitTsFilePartitions[i] = Integer.parseInt(f.list()[i]);
- }
- Arrays.sort(splitTsFilePartitions);
-
- for (int i = 0; i < (endTime - startTime) / (timePartition) / originalTsFileNum; i++) {
- Assert.assertEquals((i * originalTsFileNum), splitTsFilePartitions[i]);
- }
-
- // each time partition folder should contain 2 files, the tsfile and the resource file
- for (int i = 0; i < (endTime - startTime) / (timePartition) / originalTsFileNum; i++) {
- Assert.assertEquals(
- 2, new File(f.getAbsolutePath(), "" + i * originalTsFileNum).list().length);
- }
- } catch (SQLException | IllegalPathException throwables) {
- throwables.printStackTrace();
- Assert.fail();
- }
- }
-
- void writeDataWithDifferentDevice(TsFileWriter tsFileWriter, long timestamp, int counter)
- throws IOException, WriteProcessException {
- int mod = (counter % 6);
- if (mod < 3) {
- TSRecord tsRecord = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[mod]);
- deviceDataPointNumber[mod] += 1;
- for (String measurement : measurements) {
- DataPoint dPoint = new LongDataPoint(measurement, 10000);
- tsRecord.addTuple(dPoint);
- }
- tsFileWriter.write(tsRecord);
- } else {
- for (int i = 2; i <= devices.length; i++) {
- for (int j = 1; j < i; j++) {
- if (i + j == mod) {
- TSRecord tsRecord1 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[i - 1]);
- TSRecord tsRecord2 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[j - 1]);
- deviceDataPointNumber[i - 1] += 1;
- deviceDataPointNumber[j - 1] += 1;
- for (String measurement : measurements) {
- DataPoint dataPoint1 = new LongDataPoint(measurement, 100);
- DataPoint dataPoint2 = new LongDataPoint(measurement, 10000);
- tsRecord1.addTuple(dataPoint1);
- tsRecord2.addTuple(dataPoint2);
- }
- tsFileWriter.write(tsRecord1);
- tsFileWriter.write(tsRecord2);
- return;
- }
- }
- }
- }
- }
-
- void prepareDataWithDifferentDevice() {
- startTime = 0;
- endTime = 100_000;
- recordTimeGap = 10;
-
- long tsfileMaxTime = 1000;
- File dir = new File(tempDir);
- if (dir.exists()) {
- FileUtils.deleteDirectory(dir);
- }
- dir.mkdir();
- try {
- File f;
- TsFileWriter tsFileWriter = null;
- int counter = 0;
- for (long timestamp = startTime; timestamp < endTime; timestamp += recordTimeGap) {
- if (timestamp % tsfileMaxTime == 0) {
- if (tsFileWriter != null) {
- tsFileWriter.flushAllChunkGroups();
- tsFileWriter.close();
- counter++;
- }
- String path = getName(counter);
- f = FSFactoryProducer.getFSFactory().getFile(path);
- tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
- register(tsFileWriter);
- }
- writeDataWithDifferentDevice(tsFileWriter, timestamp, counter);
- }
- tsFileWriter.flushAllChunkGroups();
- tsFileWriter.close();
- originalTsFileNum = counter + 1;
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void loadTsFileWithDifferentDevice() {
- try (Connection connection =
- DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- prepareDataWithDifferentDevice();
-
- statement.execute(String.format("load '%s'", new File(tempDir).getAbsolutePath()));
-
- String dataDir = config.getDataDirs()[0];
- // sequence/logical_sg/virtual_sg/time_partitions
- File f =
- new File(
- dataDir,
- new PartialPath("sequence") + File.separator + "root.ln" + File.separator + "0");
- Assert.assertEquals((endTime - startTime) / (timePartition), f.list().length);
-
- int totalPartitionsNum = (int) ((endTime - startTime) / (timePartition));
- int[] splitTsFilePartitions = new int[totalPartitionsNum];
- for (int i = 0; i < splitTsFilePartitions.length; i++) {
- splitTsFilePartitions[i] = Integer.parseInt(f.list()[i]);
- }
- Arrays.sort(splitTsFilePartitions);
-
- for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
- Assert.assertEquals(i, splitTsFilePartitions[i]);
- }
-
- // each time partition folder should contain 2 files, the tsfile and the resource file
- for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
- Assert.assertEquals(2, new File(f.getAbsolutePath(), "" + i).list().length);
- }
-
- for (int i = 0; i < devices.length; i++) {
- statement.executeQuery(
- "select count(" + measurements[0] + ") from " + STORAGE_GROUP + DOT + devices[i]);
- ResultSet set = statement.getResultSet();
- set.next();
- long number = set.getLong(1);
- Assert.assertEquals(deviceDataPointNumber[i], number);
- }
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 19731bc330..90e2eaa4ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1014,6 +1014,10 @@ public class IoTDBDescriptor {
conf.setTimePartitionIntervalForStorage(
DateTimeUtils.convertMilliTimeWithPrecision(
conf.getTimePartitionIntervalForStorage(), conf.getTimestampPrecision()));
+
+ if (!conf.isClusterMode()) {
+ conf.setTimePartitionIntervalForRouting(conf.getTimePartitionIntervalForStorage());
+ }
}
private void loadAuthorCache(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
index 54f4885d41..103503d870 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
@@ -168,6 +168,7 @@ public class AlignedChunkData implements ChunkData {
@Override
public void serialize(DataOutputStream stream, File tsFile) throws IOException {
+ ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
serializeTsFileData(stream, tsFile);
@@ -403,7 +404,7 @@ public class AlignedChunkData implements ChunkData {
if (isTimeChunk) {
long time = ReadWriteIOUtils.readLong(stream);
timePageBatch[i] = time;
- chunkWriter.write(time);
+ chunkWriter.writeTime(time);
} else {
boolean isNull = ReadWriteIOUtils.readBool(stream);
switch (chunkHeader.getDataType()) {
@@ -441,6 +442,12 @@ public class AlignedChunkData implements ChunkData {
timeBatch.add(timePageBatch);
}
decodePageIndex += 1;
+
+ if (isTimeChunk) {
+ chunkWriter.sealCurrentTimePage();
+ } else {
+ chunkWriter.sealCurrentValuePage(valueChunkIndex);
+ }
} else {
PageHeader pageHeader = PageHeader.deserializeFrom(stream, chunkHeader.getDataType(), true);
if (isTimeChunk) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
index 36ab1e87fa..bf5dae6256 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
@@ -24,20 +24,15 @@ import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-public interface ChunkData {
+public interface ChunkData extends TsFileData {
String getDevice();
TTimePartitionSlot getTimePartitionSlot();
- long getDataSize();
-
void addDataSize(long pageSize);
void setNotDecode(IChunkMetadata chunkMetadata);
@@ -52,9 +47,10 @@ public interface ChunkData {
boolean isAligned();
- void writeToFileWriter(TsFileIOWriter writer) throws IOException;
-
- void serialize(DataOutputStream stream, File tsFile) throws IOException;
+ @Override
+ default boolean isModification() {
+ return false;
+ }
static ChunkData deserialize(InputStream stream) throws PageException, IOException {
boolean isAligned = ReadWriteIOUtils.readBool(stream);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
new file mode 100644
index 0000000000..2b9cfe8e76
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.load;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class DeletionData implements TsFileData { // TsFileData implementation that carries a single Deletion
+ private final Deletion deletion; // the wrapped deletion; serialized and written by the methods below
+
+ public DeletionData(Deletion deletion) {
+ this.deletion = deletion;
+ }
+
+ @Override
+ public long getDataSize() { // a deletion always reports a data size of 0
+ return 0;
+ }
+
+ @Override
+ public void writeToFileWriter(TsFileIOWriter writer) throws IOException { // records the deletion in the modification file that belongs to the writer's file
+ File tsFile = writer.getFile();
+ try (ModificationFile modificationFile =
+ new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) { // path = writer's file path + FILE_SUFFIX; closed by try-with-resources
+ writer.flush(); // flushed before the length is read; presumably so tsFile.length() includes buffered bytes (confirm)
+ deletion.setFileOffset(tsFile.length()); // the file offset is set here, to the current length of the writer's file
+ modificationFile.write(deletion);
+ }
+ }
+
+ @Override
+ public boolean isModification() { // always true for this implementation
+ return true;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream, File tsFile) throws IOException { // the tsFile argument is not used by this implementation
+ ReadWriteIOUtils.write(isModification(), stream); // leading boolean flag that TsFileData.deserialize reads first to choose the concrete type
+ deletion.serializeWithoutFileOffset(stream); // the file offset is not part of the serialized form
+ }
+
+ public static DeletionData deserialize(InputStream stream) // does not read the leading flag itself; TsFileData.deserialize consumes it before delegating here
+ throws IllegalPathException, IOException {
+ return new DeletionData(Deletion.deserializeWithoutFileOffset(new DataInputStream(stream)));
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
index 0998125863..57119901cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
@@ -112,9 +112,14 @@ public class LoadTsFileManager {
TsFileWriterManager writerManager =
uuid2WriterManager.computeIfAbsent(
uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
- for (ChunkData chunkData : pieceNode.getAllChunkData()) {
- writerManager.write(
- new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
+ for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+ if (!tsFileData.isModification()) {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ writerManager.write(
+ new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
+ } else {
+ writerManager.writeDeletion(tsFileData);
+ }
}
}
@@ -172,7 +177,9 @@ public class LoadTsFileManager {
}
private void clearDir(File dir) {
- FileUtils.deleteDirectory(dir);
+ if (dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
if (dir.mkdirs()) {
logger.info(String.format("Load TsFile dir %s is created.", dir.getPath()));
}
@@ -204,6 +211,15 @@ public class LoadTsFileManager {
chunkData.writeToFileWriter(writer);
}
+ private void writeDeletion(TsFileData deletionData) throws IOException { // writes deletionData through every writer currently held in dataPartition2Writer
+ if (isClosed) { // refuse to write once this manager has been closed
+ throw new IOException(String.format("%s TsFileWriterManager has been closed.", taskDir));
+ }
+ for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) { // only the entry values (the writers) are used
+ deletionData.writeToFileWriter(entry.getValue()); // NOTE(review): a writer added to the map after this call is not visited here; confirm the sender's ordering makes that acceptable
+ }
+ }
+
private void loadAll() throws IOException, LoadFileException {
if (isClosed) {
throw new IOException(String.format("%s TsFileWriterManager has been closed.", taskDir));
@@ -236,6 +252,9 @@ public class LoadTsFileManager {
}
private void close() {
+ if (isClosed) {
+ return;
+ }
if (dataPartition2Writer != null) {
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
try {
@@ -243,7 +262,7 @@ public class LoadTsFileManager {
if (writer.canWrite()) {
writer.close();
}
- if (!writer.getFile().delete()) {
+ if (writer.getFile().exists() && !writer.getFile().delete()) {
logger.warn(String.format("Delete File %s error.", writer.getFile()));
}
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
index ef3df5e769..c3d378a2b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
@@ -133,6 +133,7 @@ public class NonAlignedChunkData implements ChunkData {
@Override
public void serialize(DataOutputStream stream, File tsFile) throws IOException {
+ ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
if (needDecodeChunk()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
copy to server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
index 36ab1e87fa..7f99064ff2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
@@ -19,10 +19,8 @@
package org.apache.iotdb.db.engine.load;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -31,42 +29,18 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-public interface ChunkData {
- String getDevice();
-
- TTimePartitionSlot getTimePartitionSlot();
-
+public interface TsFileData {
long getDataSize();
- void addDataSize(long pageSize);
-
- void setNotDecode(IChunkMetadata chunkMetadata);
-
- boolean needDecodeChunk();
-
- void setHeadPageNeedDecode(boolean headPageNeedDecode);
-
- void setTailPageNeedDecode(boolean tailPageNeedDecode);
-
- void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot);
-
- boolean isAligned();
-
void writeToFileWriter(TsFileIOWriter writer) throws IOException;
- void serialize(DataOutputStream stream, File tsFile) throws IOException;
+ boolean isModification();
- static ChunkData deserialize(InputStream stream) throws PageException, IOException {
- boolean isAligned = ReadWriteIOUtils.readBool(stream);
- return isAligned
- ? AlignedChunkData.deserialize(stream)
- : NonAlignedChunkData.deserialize(stream);
- }
+ void serialize(DataOutputStream stream, File tsFile) throws IOException;
- static ChunkData createChunkData(
- boolean isAligned, long offset, String device, ChunkHeader chunkHeader) {
- return isAligned
- ? new AlignedChunkData(offset, device, chunkHeader)
- : new NonAlignedChunkData(offset, device, chunkHeader);
+ static TsFileData deserialize(InputStream stream)
+ throws IOException, PageException, IllegalPathException {
+ boolean isModification = ReadWriteIOUtils.readBool(stream);
+ return isModification ? DeletionData.deserialize(stream) : ChunkData.deserialize(stream);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7c786233b5..240ea4e686 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3076,9 +3076,10 @@ public class DataRegion {
return false;
}
if (insertPos == -1) {
- tsFileManager.insertToPartitionFileList(tsFileResource, true, 0);
+ tsFileManager.insertToPartitionFileList(tsFileResource, filePartitionId, true, 0);
} else {
- tsFileManager.insertToPartitionFileList(tsFileResource, true, insertPos + 1);
+ tsFileManager.insertToPartitionFileList(
+ tsFileResource, filePartitionId, true, insertPos + 1);
}
logger.info(
"Load tsfile in sequence list, move file from {} to {}",
@@ -3156,15 +3157,13 @@ public class DataRegion {
} catch (IOException e) {
logger.error(
"File renaming failed when loading .mod file. Origin: {}, Target: {}",
- resourceFileToLoad.getAbsolutePath(),
+ modFileToLoad.getAbsolutePath(),
targetModFile.getAbsolutePath(),
e);
throw new LoadFileException(
String.format(
"File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s",
- resourceFileToLoad.getAbsolutePath(),
- targetModFile.getAbsolutePath(),
- e.getMessage()));
+ modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e.getMessage()));
} finally {
// ModFile will be updated during the next call to `getModFile`
tsFileResource.setModFile(null);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 32c70b13da..58d13bb673 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -145,13 +145,12 @@ public class TsFileManager {
* first, if insert Pos = 1, then to the second.
*/
public void insertToPartitionFileList(
- TsFileResource tsFileResource, boolean sequence, int insertPos) {
+ TsFileResource tsFileResource, long timePartition, boolean sequence, int insertPos) {
writeLock("add");
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
TsFileResourceList tsFileResources =
- selectedMap.computeIfAbsent(
- tsFileResource.getTimePartition(), o -> new TsFileResourceList());
+ selectedMap.computeIfAbsent(timePartition, o -> new TsFileResourceList());
tsFileResources.set(insertPos, tsFileResource);
} finally {
writeUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 8b40d5fb33..b4ef48e847 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.SerializeUtils;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
@@ -247,7 +248,7 @@ public class DeviceTimeIndex implements ITimeIndex {
private long getTimePartitionWithCheck() {
long partitionId = SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
for (int index : deviceToIndex.values()) {
- long p = StorageEngine.getTimePartition(startTimes[index]);
+ long p = StorageEngineV2.getTimePartition(startTimes[index]);
if (partitionId == SPANS_MULTI_TIME_PARTITIONS_FLAG_ID) {
partitionId = p;
} else {
@@ -256,7 +257,7 @@ public class DeviceTimeIndex implements ITimeIndex {
}
}
- p = StorageEngine.getTimePartition(endTimes[index]);
+ p = StorageEngineV2.getTimePartition(endTimes[index]);
if (partitionId != p) {
return SPANS_MULTI_TIME_PARTITIONS_FLAG_ID;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index fa31c9d2ba..db5dd1bc2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1540,9 +1540,14 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
// construct TsFileResource
TsFileResource resource = new TsFileResource(tsFile);
- FileLoaderUtils.updateTsFileResource(device2Metadata, resource);
- resource.updatePlanIndexes(reader.getMinPlanIndex());
- resource.updatePlanIndexes(reader.getMaxPlanIndex());
+ if (!resource.resourceFileExists()) {
+ FileLoaderUtils.updateTsFileResource(
+ device2Metadata, resource); // serialize it in LoadSingleTsFileNode
+ resource.updatePlanIndexes(reader.getMinPlanIndex());
+ resource.updatePlanIndexes(reader.getMaxPlanIndex());
+ } else {
+ resource.deserialize();
+ }
// construct device time range
for (String device : resource.getDevices()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index f4126fb64a..03bd5f3bae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -27,6 +27,11 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.load.AlignedChunkData;
import org.apache.iotdb.db.engine.load.ChunkData;
+import org.apache.iotdb.db.engine.load.DeletionData;
+import org.apache.iotdb.db.engine.load.TsFileData;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -66,6 +71,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
public class LoadSingleTsFileNode extends WritePlanNode {
private static final Logger logger = LoggerFactory.getLogger(LoadSingleTsFileNode.class);
@@ -103,7 +110,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device));
}
needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
- if (!needDecodeTsFile) {
+ if (!needDecodeTsFile && !resource.resourceFileExists()) {
resource.serialize();
}
}
@@ -204,9 +211,12 @@ public class LoadSingleTsFileNode extends WritePlanNode {
public void splitTsFileByDataPartition(DataPartition dataPartition) throws IOException {
replicaSet2Pieces = new HashMap<>();
- List<ChunkData> chunkDataList = new ArrayList<>();
+ List<TsFileData> tsFileDataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ TreeMap<Long, List<Deletion>> offset2Deletions = new TreeMap<>();
+ getAllModification(offset2Deletions);
+
if (!checkMagic(reader)) {
throw new TsFileRuntimeException(
String.format("Magic String check error when parsing TsFile %s.", tsFile.getPath()));
@@ -226,6 +236,8 @@ public class LoadSingleTsFileNode extends WritePlanNode {
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
long chunkOffset = reader.position();
+ handleModification(offset2Deletions, tsFileDataList, chunkOffset);
+
ChunkHeader header = reader.readChunkHeader(marker);
if (header.getDataSize() == 0) {
throw new TsFileRuntimeException(
@@ -251,7 +263,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
}
chunkData.setNotDecode(chunkMetadata);
chunkData.addDataSize(header.getDataSize());
- chunkDataList.add(chunkData);
+ tsFileDataList.add(chunkData);
reader.position(reader.position() + header.getDataSize());
break;
}
@@ -283,7 +295,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
TTimePartitionSlot pageTimePartitionSlot =
TimePartitionUtils.getTimePartitionForRouting(startTime);
if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
- chunkDataList.add(chunkData);
+ tsFileDataList.add(chunkData);
timePartitionSlot = pageTimePartitionSlot;
chunkData = ChunkData.createChunkData(isAligned, pageOffset, curDevice, header);
chunkData.setTimePartitionSlot(timePartitionSlot);
@@ -315,7 +327,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
.add((AlignedChunkData) chunkData);
}
}
- chunkDataList.add(chunkData);
+ tsFileDataList.add(chunkData);
chunkData =
ChunkData.createChunkData(
@@ -338,7 +350,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
dataSize -= pageDataSize;
}
- chunkDataList.add(chunkData);
+ tsFileDataList.add(chunkData);
break;
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
@@ -393,11 +405,38 @@ public class LoadSingleTsFileNode extends WritePlanNode {
MetaMarker.handleUnexpectedMarker(marker);
}
}
+
+ handleModification(offset2Deletions, tsFileDataList, Long.MAX_VALUE);
}
- for (ChunkData chunkData : chunkDataList) {
- getPieceNode(chunkData.getDevice(), chunkData.getTimePartitionSlot(), dataPartition)
- .addChunkData(chunkData);
+ for (TsFileData tsFileData : tsFileDataList) {
+ if (!tsFileData.isModification()) {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ getPieceNode(chunkData.getDevice(), chunkData.getTimePartitionSlot(), dataPartition)
+ .addTsFileData(chunkData);
+ } else {
+ for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
+ replicaSet2Pieces.entrySet()) {
+ LoadTsFilePieceNode pieceNode = entry.getValue().get(entry.getValue().size() - 1);
+ pieceNode.addTsFileData(tsFileData);
+ }
+ }
+ }
+
+ logger.info(
+ String.format(
+ "Finish Parsing TsFile %s, split to %d pieces, send to %d RegionReplicaSet.",
+ tsFile.getPath(), tsFileDataList.size(), replicaSet2Pieces.keySet().size()));
+ }
+
+ private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) throws IOException {
+ try (ModificationFile modificationFile =
+ new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
+ for (Modification modification : modificationFile.getModifications()) {
+ offset2Deletions
+ .computeIfAbsent(modification.getFileOffset(), o -> new ArrayList<>())
+ .add((Deletion) modification);
+ }
}
}
@@ -434,6 +473,18 @@ public class LoadSingleTsFileNode extends WritePlanNode {
}
}
+ private void handleModification(
+ TreeMap<Long, List<Deletion>> offset2Deletions,
+ List<TsFileData> tsFileDataList,
+ long chunkOffset) {
+ while (!offset2Deletions.isEmpty() && offset2Deletions.firstEntry().getKey() <= chunkOffset) {
+ tsFileDataList.addAll(
+ offset2Deletions.pollFirstEntry().getValue().stream()
+ .map(DeletionData::new)
+ .collect(Collectors.toList()));
+ }
+ }
+
private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
return !TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime())
.equals(TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getEndTime()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
index fbe3468baf..0ecb865963 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -96,6 +96,9 @@ public class LoadTsFileNode extends WritePlanNode {
singleTsFileNode.checkIfNeedDecodeTsFile(analysis.getDataPartitionInfo());
if (singleTsFileNode.needDecodeTsFile()) {
singleTsFileNode.splitTsFileByDataPartition(analysis.getDataPartitionInfo());
+ } else {
+ logger.info(
+ String.format("TsFile %s will be loaded to local.", resource.getTsFile().getPath()));
}
res.add(singleTsFileNode);
} catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index 8ed86b9af7..b0021b6d46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.load;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.load.ChunkData;
+import org.apache.iotdb.db.engine.load.TsFileData;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -52,7 +53,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
private File tsFile;
private long dataSize;
- private List<ChunkData> chunkDataList;
+ private List<TsFileData> tsFileDataList;
public LoadTsFilePieceNode(PlanNodeId id) {
super(id);
@@ -62,7 +63,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
super(id);
this.tsFile = tsFile;
this.dataSize = 0;
- this.chunkDataList = new ArrayList<>();
+ this.tsFileDataList = new ArrayList<>();
}
public boolean exceedSize() {
@@ -70,13 +71,13 @@ public class LoadTsFilePieceNode extends WritePlanNode {
|| dataSize >= config.getAllocateMemoryForFree() / 2;
}
- public void addChunkData(ChunkData chunkData) {
- chunkDataList.add(chunkData);
- dataSize += chunkData.getDataSize();
+ public void addTsFileData(TsFileData tsFileData) {
+ tsFileDataList.add(tsFileData);
+ dataSize += tsFileData.getDataSize();
}
- public List<ChunkData> getAllChunkData() {
- return chunkDataList;
+ public List<TsFileData> getAllTsFileData() {
+ return tsFileDataList;
}
public File getTsFile() {
@@ -127,14 +128,14 @@ public class LoadTsFilePieceNode extends WritePlanNode {
protected void serializeAttributes(DataOutputStream stream) throws IOException {
PlanNodeType.LOAD_TSFILE.serialize(stream);
ReadWriteIOUtils.write(tsFile.getPath(), stream); // TODO: can save this space
- ReadWriteIOUtils.write(chunkDataList.size(), stream);
- for (ChunkData chunkData : chunkDataList) {
+ ReadWriteIOUtils.write(tsFileDataList.size(), stream);
+ for (TsFileData tsFileData : tsFileDataList) {
try {
- chunkData.serialize(stream, tsFile);
+ tsFileData.serialize(stream, tsFile);
} catch (IOException e) {
logger.error(
String.format(
- "Parse page of TsFile %s error, skip chunk %s", tsFile.getPath(), chunkData));
+ "Parse page of TsFile %s error, skip chunk %s", tsFile.getPath(), tsFileData));
}
}
}
@@ -150,14 +151,14 @@ public class LoadTsFilePieceNode extends WritePlanNode {
ReadWriteIOUtils.readShort(stream); // read PlanNodeType
File tsFile = new File(ReadWriteIOUtils.readString(stream));
LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile);
- int chunkDataSize = ReadWriteIOUtils.readInt(stream);
- for (int i = 0; i < chunkDataSize; i++) {
- ChunkData chunkData = ChunkData.deserialize(stream);
- pieceNode.addChunkData(chunkData);
+ int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
+ for (int i = 0; i < tsFileDataSize; i++) {
+ TsFileData tsFileData = TsFileData.deserialize(stream);
+ pieceNode.addTsFileData(tsFileData);
}
pieceNode.setPlanNodeId(PlanNodeId.deserialize(stream));
return pieceNode;
- } catch (IOException | PageException e) {
+ } catch (IOException | PageException | IllegalPathException e) {
logger.error(String.format("Deserialize %s error.", LoadTsFilePieceNode.class.getName()), e);
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index dd403d46ea..ccb5b46d19 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -133,6 +133,9 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean dispatchOneTsFile(LoadSingleTsFileNode node) {
+ logger.info(
+ String.format(
+ "Start dispatching TsFile %s", node.getTsFileResource().getTsFile().getPath()));
for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
node.getReplicaSet2Pieces().entrySet()) {
allReplicaSets.add(entry.getKey());
@@ -194,6 +197,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean secondPhase(boolean isFirstPhaseSuccess, String uuid) {
+ logger.info(String.format("Start dispatching Load command for uuid %s", uuid));
TLoadCommandReq loadCommandReq =
new TLoadCommandReq(
(isFirstPhaseSuccess ? LoadCommand.EXECUTE : LoadCommand.ROLLBACK).ordinal(), uuid);
@@ -224,6 +228,9 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean loadLocally(LoadSingleTsFileNode node) {
+ logger.info(
+ String.format(
+ "Start load TsFile %s locally.", node.getTsFileResource().getTsFile().getPath()));
try {
FragmentInstance instance =
new FragmentInstance(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 2737317be3..f22909f268 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -191,6 +191,10 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
}
}
+ public void writeTime(long time) {
+ timeChunkWriter.write(time);
+ }
+
public void write(TimeColumn timeColumn, Column[] valueColumns, int batchSize) {
if (remainingPointsNumber < batchSize) {
int pointsHasWritten = (int) remainingPointsNumber;
@@ -340,6 +344,14 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
}
}
+ public void sealCurrentTimePage() {
+ timeChunkWriter.sealCurrentPage();
+ }
+
+ public void sealCurrentValuePage(int valueIndex) {
+ valueChunkWriterList.get(valueIndex).sealCurrentPage();
+ }
+
@Override
public void clearPageWriter() {
timeChunkWriter.clearPageWriter();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index f2e73d56ae..a8de080e0e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -691,4 +691,8 @@ public class TsFileIOWriter implements AutoCloseable {
public String getCurrentChunkGroupDeviceId() {
return currentChunkGroupDeviceId;
}
+
+ public void flush() throws IOException {
+ out.flush();
+ }
}