You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/16 11:45:37 UTC

[iotdb] 02/03: cp

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch caLastOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bff77d369795ac440fcf8fb3d98d3c29776abb57
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue May 16 19:43:23 2023 +0800

    cp
---
 .../last/AbstractUpdateLastCacheOperator.java       | 13 +++++++++++++
 .../last/AlignedUpdateLastCacheOperator.java        | 21 ++++++++++-----------
 .../process/last/UpdateLastCacheOperator.java       | 13 ++++++-------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java  |  4 ++++
 4 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index f0b1ffc49a..5ac2e13c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -19,18 +19,23 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
+import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import javax.annotation.Nullable;
+
 public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator {
   protected static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
       new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
@@ -80,6 +85,14 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
     return databaseName;
   }
 
+  protected void updateLastCache(
+      long time, @Nullable TsPrimitiveType value, MeasurementPath fullPath) {
+    if (needUpdateCache) {
+      TimeValuePair timeValuePair = new TimeValuePair(time, value);
+      lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
+    }
+  }
+
   @Override
   public boolean hasNext() throws Exception {
     return child.hasNextWithTimer();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index a75581eb03..85a5900616 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -63,25 +62,25 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera
 
     tsBlockBuilder.reset();
     for (int i = 0; i + 1 < res.getValueColumnCount(); i += 2) {
+      MeasurementPath measurementPath =
+          new MeasurementPath(
+              devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)),
+              seriesPath.getSchemaList().get(i / 2),
+              true);
       if (!res.getColumn(i).isNull(0)) {
         long lastTime = res.getColumn(i).getLong(0);
         TsPrimitiveType lastValue = res.getColumn(i + 1).getTsPrimitiveType(0);
-        MeasurementPath measurementPath =
-            new MeasurementPath(
-                devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)),
-                seriesPath.getSchemaList().get(i / 2),
-                true);
-        if (needUpdateCache) {
-          TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
-          lastCache.updateLastCache(
-              getDatabaseName(), measurementPath, timeValuePair, false, Long.MIN_VALUE);
-        }
+        updateLastCache(lastTime, lastValue, measurementPath);
         LastQueryUtil.appendLastValue(
             tsBlockBuilder,
             lastTime,
             measurementPath.getFullPath(),
             lastValue.getStringValue(),
             seriesPath.getSchemaList().get(i / 2).getType().name());
+      } else {
+        // we still need to update last cache if there is no data for this time series to avoid
+        // scanning all files each time
+        updateLastCache(Long.MIN_VALUE, null, measurementPath);
       }
     }
     return !tsBlockBuilder.isEmpty() ? tsBlockBuilder.build() : LAST_QUERY_EMPTY_TSBLOCK;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 7bcc5a61e5..e45c18c019 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -34,10 +33,10 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
   // fullPath for queried time series
   // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
   // accept PartialPath
-  private MeasurementPath fullPath;
+  private final MeasurementPath fullPath;
 
   // dataType for queried time series;
-  private String dataType;
+  private final String dataType;
 
   public UpdateLastCacheOperator(
       OperatorContext operatorContext,
@@ -65,16 +64,16 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
 
     // last value is null
     if (res.getColumn(0).isNull(0)) {
+      // we still need to update last cache if there is no data for this time series to avoid
+      // scanning all files each time
+      updateLastCache(Long.MIN_VALUE, null, fullPath);
       return LAST_QUERY_EMPTY_TSBLOCK;
     }
 
     long lastTime = res.getColumn(0).getLong(0);
     TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
 
-    if (needUpdateCache) {
-      TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
-      lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
-    }
+    updateLastCache(lastTime, lastValue, fullPath);
 
     tsBlockBuilder.reset();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 4e945c8be6..e73bdd4236 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2031,6 +2031,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
     if (timeValuePair == null) { // last value is not cached
       return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
+    } else if (timeValuePair.getValue() == null) { // there is no data for this time series
+      return null;
     } else if (!LastQueryUtil.satisfyFilter(
         updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
         timeValuePair)) { // cached last value is not satisfied
@@ -2125,6 +2127,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
       if (timeValuePair == null) { // last value is not cached
         unCachedMeasurementIndexes.add(i);
+      } else if (timeValuePair.getValue() == null) {
+        // there is no data for this time series, just ignore
       } else if (!LastQueryUtil.satisfyFilter(
           updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
           timeValuePair)) { // cached last value is not satisfied