You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/27 03:40:44 UTC

[iotdb] 01/01: use cache for last query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 76cd146380429e55c1e0c23b43092b090f4806e3
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 27 11:40:27 2022 +0800

    use cache for last query
---
 .../org/apache/iotdb/commons/path/PartialPath.java |  4 ++++
 .../apache/iotdb/db/metadata/path/AlignedPath.java |  8 ++++++++
 .../iotdb/db/metadata/path/MeasurementPath.java    |  5 +++++
 .../db/mpp/execution/operator/LastQueryUtil.java   |  6 +++++-
 .../operator/process/UpdateLastCacheOperator.java  |  2 ++
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 22 ++++++++++++----------
 6 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 31861e3e10..510a02dde3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -547,4 +547,8 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
     partialPath.fullPath = path.getFullPath();
     return partialPath;
   }
+
+  public PartialPath transformToPartialPath() {
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index e420cf4dde..4ef32ba2a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -303,4 +303,12 @@ public class AlignedPath extends PartialPath {
     alignedPath.fullPath = partialPath.getFullPath();
     return alignedPath;
   }
+
+  @Override
+  public PartialPath transformToPartialPath() {
+    if (measurementList.size() != 1) {
+      throw new UnsupportedOperationException();
+    }
+    return getDevicePath().concatNode(measurementList.get(0));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index e0f4fd1dba..0a4a83dc3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -219,4 +219,9 @@ public class MeasurementPath extends PartialPath {
     measurementPath.fullPath = partialPath.getFullPath();
     return measurementPath;
   }
+
+  @Override
+  public PartialPath transformToPartialPath() {
+    return getDevicePath().concatNode(getTailNode());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
index 0ec8d36dd6..f8cb5ae291 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
 import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
@@ -39,6 +40,9 @@ import java.util.List;
 
 public class LastQueryUtil {
 
+  private static final boolean CACHE_ENABLED =
+      IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
+
   public static TsBlockBuilder createTsBlockBuilder() {
     return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
   }
@@ -84,6 +88,6 @@ public class LastQueryUtil {
 
   public static boolean needUpdateCache(Filter timeFilter) {
     // Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
-    return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+    return CACHE_ENABLED && (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index ee0eb898a8..4ce473e62b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -45,6 +45,8 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   private final Operator child;
 
   // fullPath for queried time series
+  // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
+  // accept PartialPath
   private final PartialPath fullPath;
 
   // dataType for queried time series;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9e37b44da9..6587e05306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1046,9 +1046,10 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
-      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context);
+        return createUpdateLastCacheOperator(node, context, seriesPath);
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1057,7 +1058,7 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context);
+          return createUpdateLastCacheOperator(node, context, seriesPath);
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1069,7 +1070,7 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        LastQueryScanNode node, LocalExecutionPlanContext context) {
+        LastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
       SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
 
       return new UpdateLastCacheOperator(
@@ -1078,7 +1079,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               UpdateLastCacheOperator.class.getSimpleName()),
           lastQueryScan,
-          node.getSeriesPath(),
+          fullPath,
           node.getSeriesPath().getSeriesType(),
           DATA_NODE_SCHEMA_CACHE,
           context.needUpdateLastCache);
@@ -1114,9 +1115,10 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitAlignedLastQueryScan(
         AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
-      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context);
+        return createUpdateLastCacheOperator(node, context, seriesPath);
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1125,7 +1127,7 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context);
+          return createUpdateLastCacheOperator(node, context, seriesPath);
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1137,7 +1139,7 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
       AlignedSeriesAggregationScanOperator lastQueryScan =
           createLastQueryScanOperator(node, context);
 
@@ -1147,7 +1149,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               UpdateLastCacheOperator.class.getSimpleName()),
           lastQueryScan,
-          node.getSeriesPath(),
+          fullPath,
           node.getSeriesPath().getSchemaList().get(0).getType(),
           DATA_NODE_SCHEMA_CACHE,
           context.needUpdateLastCache);