You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/18 09:11:36 UTC

[iotdb] branch UpdateLastCache created (now 21e30ddf43)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch UpdateLastCache
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 21e30ddf43 [IOTDB-3871] Make last query be able to update last cache even if there is no other write happens after restarting

This branch includes the following new commits:

     new 21e30ddf43 [IOTDB-3871] Make last query be able to update last cache even if there is no other write happens after restarting

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-3871] Make last query be able to update last cache even if there is no other write happens after restarting

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch UpdateLastCache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 21e30ddf43ab4e50f6a78021ae103fc0c254ea24
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Jul 18 17:11:23 2022 +0800

    [IOTDB-3871] Make last query be able to update last cache even if there is no other write happens after restarting
---
 .../org/apache/iotdb/db/metadata/path/AlignedPath.java   |  7 +++++++
 .../apache/iotdb/db/metadata/path/MeasurementPath.java   |  8 ++++++++
 .../operator/process/UpdateLastCacheOperator.java        |  6 +++---
 .../iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java | 16 ++++++++++------
 4 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 1c6a9326e5..3d81316027 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -337,6 +337,13 @@ public class AlignedPath extends PartialPath {
     return getDevicePath().concatNode(measurementList.get(0));
   }
 
+  public MeasurementPath getMeasurementPath() {
+    if (schemaList.size() != 1) {
+      throw new UnsupportedOperationException();
+    }
+    return new MeasurementPath(transformToPartialPath(), schemaList.get(0), true);
+  }
+
   public String getFormattedString() {
     return getDevicePath().toString() + "[" + String.join(",", measurementList) + "]";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 469d223d58..2525844e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -58,8 +58,16 @@ public class MeasurementPath extends PartialPath {
   }
 
   public MeasurementPath(PartialPath measurementPath, IMeasurementSchema measurementSchema) {
+    this(measurementPath, measurementSchema, false);
+  }
+
+  public MeasurementPath(
+      PartialPath measurementPath,
+      IMeasurementSchema measurementSchema,
+      boolean isUnderAlignedEntity) {
     super(measurementPath.getNodes());
     this.measurementSchema = measurementSchema;
+    this.isUnderAlignedEntity = isUnderAlignedEntity;
   }
 
   public MeasurementPath(String device, String measurement, IMeasurementSchema measurementSchema)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index 1bfebb251c..4088a13396 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -47,7 +47,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   // fullPath for queried time series
   // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
   // accept PartialPath
-  private final PartialPath fullPath;
+  private final MeasurementPath fullPath;
 
   // dataType for queried time series;
   private final String dataType;
@@ -61,7 +61,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   public UpdateLastCacheOperator(
       OperatorContext operatorContext,
       Operator child,
-      PartialPath fullPath,
+      MeasurementPath fullPath,
       TSDataType dataType,
       DataNodeSchemaCache dataNodeSchemaCache,
       boolean needUpdateCache) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2cddb29cc5..a6a7d5cde9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1184,7 +1184,7 @@ public class LocalExecutionPlanner {
       PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
       TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context, seriesPath);
+        return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1193,7 +1193,7 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context, seriesPath);
+          return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1204,7 +1204,7 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        LastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
+        LastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) {
       SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
 
       OperatorContext operatorContext =
@@ -1256,7 +1256,8 @@ public class LocalExecutionPlanner {
       PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
       TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context, seriesPath);
+        return createUpdateLastCacheOperator(
+            node, context, node.getSeriesPath().getMeasurementPath());
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1265,7 +1266,8 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context, seriesPath);
+          return createUpdateLastCacheOperator(
+              node, context, node.getSeriesPath().getMeasurementPath());
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1276,7 +1278,9 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        AlignedLastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
+        AlignedLastQueryScanNode node,
+        LocalExecutionPlanContext context,
+        MeasurementPath fullPath) {
       AlignedSeriesAggregationScanOperator lastQueryScan =
           createLastQueryScanOperator(node, context);