You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/09 09:36:07 UTC

[iotdb] branch master updated: [IOTDB-3656] mpp load supports last cache (#7529)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe2889948 [IOTDB-3656] mpp load supports last cache (#7529)
9fe2889948 is described below

commit 9fe288994877af568dfb192275f61cb077b0380b
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun Oct 9 17:36:00 2022 +0800

    [IOTDB-3656] mpp load supports last cache (#7529)
---
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  | 80 ++++++++++++++++++++++
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 35 +++++-----
 2 files changed, 98 insertions(+), 17 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index b59a77f41d..823727e63c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -373,6 +373,86 @@ public class IOTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testLoadWithLastCache() throws Exception {
+    registerSchema();
+
+    String device = SchemaConfig.DEVICE_0;
+    String measurement = SchemaConfig.MEASUREMENT_00.getMeasurementId();
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute(
+          String.format("insert into %s(timestamp, %s) values(100, 100)", device, measurement));
+
+      try (ResultSet resultSet =
+          statement.executeQuery(String.format("select last %s from %s", measurement, device))) {
+        if (resultSet.next()) {
+          String lastValue = resultSet.getString("value");
+          Assert.assertEquals("100", lastValue);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+
+    File file1 = new File(tmpDir, "1-0-0-0.tsfile");
+    File file2 = new File(tmpDir, "2-0-0-0.tsfile");
+    long writtenPoint1 = 0;
+    // device 0, device 1, sg 0
+    try (TsFileGenerator generator = new TsFileGenerator(file1)) {
+      generator.registerTimeseries(
+          new Path(SchemaConfig.DEVICE_0),
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_00,
+              SchemaConfig.MEASUREMENT_01,
+              SchemaConfig.MEASUREMENT_02,
+              SchemaConfig.MEASUREMENT_03));
+      generator.registerAlignedTimeseries(
+          new Path(SchemaConfig.DEVICE_1),
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_10,
+              SchemaConfig.MEASUREMENT_11,
+              SchemaConfig.MEASUREMENT_12,
+              SchemaConfig.MEASUREMENT_13));
+      generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
+      generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
+      writtenPoint1 = generator.getTotalNumber();
+    }
+
+    long writtenPoint2 = 0;
+    // device 2, device 3, device4, sg 1
+    try (TsFileGenerator generator = new TsFileGenerator(file2)) {
+      generator.registerTimeseries(
+          new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
+      generator.registerTimeseries(
+          new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
+      generator.registerAlignedTimeseries(
+          new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
+      generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
+      generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
+      generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
+      writtenPoint2 = generator.getTotalNumber();
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
+
+      try (ResultSet resultSet =
+          statement.executeQuery(String.format("select last %s from %s", measurement, device))) {
+        if (resultSet.next()) {
+          String lastTime = resultSet.getString("Time");
+          Assert.assertEquals("9999", lastTime);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+  }
+
   private static class SchemaConfig {
     private static final String STORAGE_GROUP_0 = "root.sg.test_0";
     private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 19cb56d6ec..188bea0a9e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2669,22 +2669,24 @@ public class DataRegion {
     }
   }
 
-  private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource)
+  private void resetLastCacheWhenLoadingTsFile(TsFileResource resource)
       throws IllegalPathException {
-    for (String device : newTsFileResource.getDevices()) {
-      tryToDeleteLastCacheByDevice(new PartialPath(device));
-    }
-  }
-
-  private void tryToDeleteLastCacheByDevice(PartialPath deviceId) {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
     }
-    //    try { TODO: support last cache
-    //      IoTDB.schemaProcessor.deleteLastCacheByDevice(deviceId);
-    //    } catch (MetadataException e) {
-    //      // the path doesn't cache in cluster mode now, ignore
-    //    }
+
+    if (config.isMppMode()) {
+      // TODO: implement more precise process
+      DataNodeSchemaCache.getInstance().cleanUp();
+    } else {
+      for (String device : resource.getDevices()) {
+        try {
+          IoTDB.schemaProcessor.deleteLastCacheByDevice(new PartialPath(device));
+        } catch (MetadataException e) {
+          logger.warn(String.format("Create device %s error.", device));
+        }
+      }
+    }
   }
 
   /**
@@ -2736,10 +2738,9 @@ public class DataRegion {
           newFilePartitionId,
           insertPos,
           deleteOriginFile);
-      resetLastCacheWhenLoadingTsfile(newTsFileResource);
 
-      // update latest time map
-      updateLatestTimeMap(newTsFileResource);
+      resetLastCacheWhenLoadingTsFile(newTsFileResource); // update last cache
+      updateLastFlushTime(newTsFileResource); // update last flush time
       long partitionNum = newTsFileResource.getTimePartition();
       updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
       logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo);
@@ -3005,10 +3006,10 @@ public class DataRegion {
    * Update latest time in latestTimeForEachDevice and
    * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module.
    */
-  private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+  private void updateLastFlushTime(TsFileResource newTsFileResource) {
     for (String device : newTsFileResource.getDevices()) {
       long endTime = newTsFileResource.getEndTime(device);
-      long timePartitionId = StorageEngine.getTimePartition(endTime);
+      long timePartitionId = StorageEngineV2.getTimePartition(endTime);
       lastFlushTimeManager.updateLastTime(timePartitionId, device, endTime);
       lastFlushTimeManager.updateFlushedTime(timePartitionId, device, endTime);
       lastFlushTimeManager.updateGlobalFlushedTime(device, endTime);