You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/09 09:36:07 UTC
[iotdb] branch master updated: [IOTDB-3656] mpp load supports last cache (#7529)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe2889948 [IOTDB-3656] mpp load supports last cache (#7529)
9fe2889948 is described below
commit 9fe288994877af568dfb192275f61cb077b0380b
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun Oct 9 17:36:00 2022 +0800
[IOTDB-3656] mpp load supports last cache (#7529)
---
.../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 80 ++++++++++++++++++++++
.../iotdb/db/engine/storagegroup/DataRegion.java | 35 +++++-----
2 files changed, 98 insertions(+), 17 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index b59a77f41d..823727e63c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -373,6 +373,86 @@ public class IOTDBLoadTsFileIT {
}
}
+ @Test
+ public void testLoadWithLastCache() throws Exception {
+ registerSchema();
+
+ String device = SchemaConfig.DEVICE_0;
+ String measurement = SchemaConfig.MEASUREMENT_00.getMeasurementId();
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ String.format("insert into %s(timestamp, %s) values(100, 100)", device, measurement));
+
+ try (ResultSet resultSet =
+ statement.executeQuery(String.format("select last %s from %s", measurement, device))) {
+ if (resultSet.next()) {
+ String lastValue = resultSet.getString("value");
+ Assert.assertEquals("100", lastValue);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+
+ File file1 = new File(tmpDir, "1-0-0-0.tsfile");
+ File file2 = new File(tmpDir, "2-0-0-0.tsfile");
+ long writtenPoint1 = 0;
+ // device 0, device 1, sg 0
+ try (TsFileGenerator generator = new TsFileGenerator(file1)) {
+ generator.registerTimeseries(
+ new Path(SchemaConfig.DEVICE_0),
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_00,
+ SchemaConfig.MEASUREMENT_01,
+ SchemaConfig.MEASUREMENT_02,
+ SchemaConfig.MEASUREMENT_03));
+ generator.registerAlignedTimeseries(
+ new Path(SchemaConfig.DEVICE_1),
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_10,
+ SchemaConfig.MEASUREMENT_11,
+ SchemaConfig.MEASUREMENT_12,
+ SchemaConfig.MEASUREMENT_13));
+ generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
+ generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+ writtenPoint1 = generator.getTotalNumber();
+ }
+
+ long writtenPoint2 = 0;
+ // device 2, device 3, device4, sg 1
+ try (TsFileGenerator generator = new TsFileGenerator(file2)) {
+ generator.registerTimeseries(
+ new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+ generator.registerTimeseries(
+ new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+ generator.registerAlignedTimeseries(
+ new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
+ generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
+ generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+ writtenPoint2 = generator.getTotalNumber();
+ }
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
+
+ try (ResultSet resultSet =
+ statement.executeQuery(String.format("select last %s from %s", measurement, device))) {
+ if (resultSet.next()) {
+ String lastTime = resultSet.getString("Time");
+ Assert.assertEquals("9999", lastTime);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+ }
+
private static class SchemaConfig {
private static final String STORAGE_GROUP_0 = "root.sg.test_0";
private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 19cb56d6ec..188bea0a9e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2669,22 +2669,24 @@ public class DataRegion {
}
}
- private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource)
+ private void resetLastCacheWhenLoadingTsFile(TsFileResource resource)
throws IllegalPathException {
- for (String device : newTsFileResource.getDevices()) {
- tryToDeleteLastCacheByDevice(new PartialPath(device));
- }
- }
-
- private void tryToDeleteLastCacheByDevice(PartialPath deviceId) {
if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
return;
}
- // try { TODO: support last cache
- // IoTDB.schemaProcessor.deleteLastCacheByDevice(deviceId);
- // } catch (MetadataException e) {
- // // the path doesn't cache in cluster mode now, ignore
- // }
+
+ if (config.isMppMode()) {
+ // TODO: implement more precise process
+ DataNodeSchemaCache.getInstance().cleanUp();
+ } else {
+ for (String device : resource.getDevices()) {
+ try {
+ IoTDB.schemaProcessor.deleteLastCacheByDevice(new PartialPath(device));
+ } catch (MetadataException e) {
+ logger.warn(String.format("Create device %s error.", device));
+ }
+ }
+ }
}
/**
@@ -2736,10 +2738,9 @@ public class DataRegion {
newFilePartitionId,
insertPos,
deleteOriginFile);
- resetLastCacheWhenLoadingTsfile(newTsFileResource);
- // update latest time map
- updateLatestTimeMap(newTsFileResource);
+ resetLastCacheWhenLoadingTsFile(newTsFileResource); // update last cache
+ updateLastFlushTime(newTsFileResource); // update last flush time
long partitionNum = newTsFileResource.getTimePartition();
updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo);
@@ -3005,10 +3006,10 @@ public class DataRegion {
* Update latest time in latestTimeForEachDevice and
* partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module.
*/
- private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+ private void updateLastFlushTime(TsFileResource newTsFileResource) {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
- long timePartitionId = StorageEngine.getTimePartition(endTime);
+ long timePartitionId = StorageEngineV2.getTimePartition(endTime);
lastFlushTimeManager.updateLastTime(timePartitionId, device, endTime);
lastFlushTimeManager.updateFlushedTime(timePartitionId, device, endTime);
lastFlushTimeManager.updateGlobalFlushedTime(device, endTime);