You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/27 03:40:43 UTC

[iotdb] branch LastOperator created (now 76cd146380)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 76cd146380 use cache for last query

This branch includes the following new commits:

     new 76cd146380 use cache for last query

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: use cache for last query

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 76cd146380429e55c1e0c23b43092b090f4806e3
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 27 11:40:27 2022 +0800

    use cache for last query
---
 .../org/apache/iotdb/commons/path/PartialPath.java |  4 ++++
 .../apache/iotdb/db/metadata/path/AlignedPath.java |  8 ++++++++
 .../iotdb/db/metadata/path/MeasurementPath.java    |  5 +++++
 .../db/mpp/execution/operator/LastQueryUtil.java   |  6 +++++-
 .../operator/process/UpdateLastCacheOperator.java  |  2 ++
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 22 ++++++++++++----------
 6 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 31861e3e10..510a02dde3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -547,4 +547,8 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
     partialPath.fullPath = path.getFullPath();
     return partialPath;
   }
+
+  public PartialPath transformToPartialPath() {
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index e420cf4dde..4ef32ba2a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -303,4 +303,12 @@ public class AlignedPath extends PartialPath {
     alignedPath.fullPath = partialPath.getFullPath();
     return alignedPath;
   }
+
+  @Override
+  public PartialPath transformToPartialPath() {
+    if (measurementList.size() != 1) {
+      throw new UnsupportedOperationException();
+    }
+    return getDevicePath().concatNode(measurementList.get(0));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index e0f4fd1dba..0a4a83dc3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -219,4 +219,9 @@ public class MeasurementPath extends PartialPath {
     measurementPath.fullPath = partialPath.getFullPath();
     return measurementPath;
   }
+
+  @Override
+  public PartialPath transformToPartialPath() {
+    return getDevicePath().concatNode(getTailNode());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
index 0ec8d36dd6..f8cb5ae291 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
 import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
@@ -39,6 +40,9 @@ import java.util.List;
 
 public class LastQueryUtil {
 
+  private static final boolean CACHE_ENABLED =
+      IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
+
   public static TsBlockBuilder createTsBlockBuilder() {
     return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
   }
@@ -84,6 +88,6 @@ public class LastQueryUtil {
 
   public static boolean needUpdateCache(Filter timeFilter) {
     // Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
-    return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+    return CACHE_ENABLED && ((timeFilter instanceof GtEq) || (timeFilter instanceof Gt));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index ee0eb898a8..4ce473e62b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -45,6 +45,8 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   private final Operator child;
 
   // fullPath for queried time series
+  // It must be an exact PartialPath, neither a MeasurementPath nor an AlignedPath, because
+  // lastCache only accepts PartialPath
   private final PartialPath fullPath;
 
   // dataType for queried time series;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9e37b44da9..6587e05306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1046,9 +1046,10 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
-      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context);
+        return createUpdateLastCacheOperator(node, context, seriesPath);
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1057,7 +1058,7 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context);
+          return createUpdateLastCacheOperator(node, context, seriesPath);
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1069,7 +1070,7 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        LastQueryScanNode node, LocalExecutionPlanContext context) {
+        LastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
       SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
 
       return new UpdateLastCacheOperator(
@@ -1078,7 +1079,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               UpdateLastCacheOperator.class.getSimpleName()),
           lastQueryScan,
-          node.getSeriesPath(),
+          fullPath,
           node.getSeriesPath().getSeriesType(),
           DATA_NODE_SCHEMA_CACHE,
           context.needUpdateLastCache);
@@ -1114,9 +1115,10 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitAlignedLastQueryScan(
         AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
-      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
       if (timeValuePair == null) { // last value is not cached
-        return createUpdateLastCacheOperator(node, context);
+        return createUpdateLastCacheOperator(node, context, seriesPath);
       } else if (!satisfyFilter(
           context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
 
@@ -1125,7 +1127,7 @@ public class LocalExecutionPlanner {
                 || context.lastQueryTimeFilter instanceof GtEq);
         // time filter is not > or >=, we still need to read from disk
         if (!isFilterGtOrGe) {
-          return createUpdateLastCacheOperator(node, context);
+          return createUpdateLastCacheOperator(node, context, seriesPath);
         } else { // otherwise, we just ignore it and return null
           return null;
         }
@@ -1137,7 +1139,7 @@ public class LocalExecutionPlanner {
     }
 
     private UpdateLastCacheOperator createUpdateLastCacheOperator(
-        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
       AlignedSeriesAggregationScanOperator lastQueryScan =
           createLastQueryScanOperator(node, context);
 
@@ -1147,7 +1149,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               UpdateLastCacheOperator.class.getSimpleName()),
           lastQueryScan,
-          node.getSeriesPath(),
+          fullPath,
           node.getSeriesPath().getSchemaList().get(0).getType(),
           DATA_NODE_SCHEMA_CACHE,
           context.needUpdateLastCache);