You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/18 09:11:37 UTC

[iotdb] 01/01: [IOTDB-3871] Make last query able to update the last cache even if no other write happens after restarting

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch UpdateLastCache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 21e30ddf43ab4e50f6a78021ae103fc0c254ea24
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Jul 18 17:11:23 2022 +0800

    [IOTDB-3871] Make last query able to update the last cache even if no other write happens after restarting
---
 .../org/apache/iotdb/db/metadata/path/AlignedPath.java   |  7 +++++++
 .../apache/iotdb/db/metadata/path/MeasurementPath.java   |  8 ++++++++
 .../operator/process/UpdateLastCacheOperator.java        |  6 +++---
 .../iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java | 16 ++++++++++------
 4 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 1c6a9326e5..3d81316027 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -337,6 +337,13 @@ public class AlignedPath extends PartialPath {
     return getDevicePath().concatNode(measurementList.get(0));
   }
 
+  public MeasurementPath getMeasurementPath() {
+    if (schemaList.size() != 1) {
+      throw new UnsupportedOperationException();
+    }
+    return new MeasurementPath(transformToPartialPath(), schemaList.get(0), true);
+  }
+
   public String getFormattedString() {
     return getDevicePath().toString() + "[" + String.join(",", measurementList) + "]";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 469d223d58..2525844e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -58,8 +58,16 @@ public class MeasurementPath extends PartialPath {
   }
 
   public MeasurementPath(PartialPath measurementPath, IMeasurementSchema measurementSchema) {
+    this(measurementPath, measurementSchema, false);
+  }
+
+  public MeasurementPath(
+      PartialPath measurementPath,
+      IMeasurementSchema measurementSchema,
+      boolean isUnderAlignedEntity) {
     super(measurementPath.getNodes());
     this.measurementSchema = measurementSchema;
+    this.isUnderAlignedEntity = isUnderAlignedEntity;
   }
 
   public MeasurementPath(String device, String measurement, IMeasurementSchema measurementSchema)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index 1bfebb251c..4088a13396 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -47,7 +47,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   // fullPath for queried time series
   // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
   // accept PartialPath
-  private final PartialPath fullPath;
+  private final MeasurementPath fullPath;
 
   // dataType for queried time series;
   private final String dataType;
@@ -61,7 +61,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   public UpdateLastCacheOperator(
       OperatorContext operatorContext,
       Operator child,
-      PartialPath fullPath,
+      MeasurementPath fullPath,
       TSDataType dataType,
       DataNodeSchemaCache dataNodeSchemaCache,
       boolean needUpdateCache) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2cddb29cc5..a6a7d5cde9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1184,7 +1184,7 @@ public class LocalExecutionPlanner {
       PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
       TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context, seriesPath);
+        return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1193,7 +1193,7 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context, seriesPath);
+          return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1204,7 +1204,7 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        LastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
+        LastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) {
       SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
 
       OperatorContext operatorContext =
@@ -1256,7 +1256,8 @@ public class LocalExecutionPlanner {
       PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
       TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context, seriesPath);
+        return createUpdateLastCacheOperator(
+            node, context, node.getSeriesPath().getMeasurementPath());
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1265,7 +1266,8 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context, seriesPath);
+          return createUpdateLastCacheOperator(
+              node, context, node.getSeriesPath().getMeasurementPath());
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1276,7 +1278,9 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        AlignedLastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
+        AlignedLastQueryScanNode node,
+        LocalExecutionPlanContext context,
+        MeasurementPath fullPath) {
       AlignedSeriesAggregationScanOperator lastQueryScan =
           createLastQueryScanOperator(node, context);