You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/27 03:40:44 UTC
[iotdb] 01/01: use cache for last query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 76cd146380429e55c1e0c23b43092b090f4806e3
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 27 11:40:27 2022 +0800
use cache for last query
---
.../org/apache/iotdb/commons/path/PartialPath.java | 4 ++++
.../apache/iotdb/db/metadata/path/AlignedPath.java | 8 ++++++++
.../iotdb/db/metadata/path/MeasurementPath.java | 5 +++++
.../db/mpp/execution/operator/LastQueryUtil.java | 6 +++++-
.../operator/process/UpdateLastCacheOperator.java | 2 ++
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 22 ++++++++++++----------
6 files changed, 36 insertions(+), 11 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 31861e3e10..510a02dde3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -547,4 +547,8 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
partialPath.fullPath = path.getFullPath();
return partialPath;
}
+
+ public PartialPath transformToPartialPath() {
+ return this;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index e420cf4dde..4ef32ba2a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -303,4 +303,12 @@ public class AlignedPath extends PartialPath {
alignedPath.fullPath = partialPath.getFullPath();
return alignedPath;
}
+
+ @Override
+ public PartialPath transformToPartialPath() {
+ if (measurementList.size() != 1) {
+ throw new UnsupportedOperationException("AlignedPath must hold exactly one measurement");
+ }
+ return getDevicePath().concatNode(measurementList.get(0));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index e0f4fd1dba..0a4a83dc3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -219,4 +219,9 @@ public class MeasurementPath extends PartialPath {
measurementPath.fullPath = partialPath.getFullPath();
return measurementPath;
}
+
+ @Override
+ public PartialPath transformToPartialPath() {
+ return getDevicePath().concatNode(getTailNode());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
index 0ec8d36dd6..f8cb5ae291 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
@@ -39,6 +40,9 @@ import java.util.List;
public class LastQueryUtil {
+ private static final boolean CACHE_ENABLED =
+ IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
+
public static TsBlockBuilder createTsBlockBuilder() {
return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
}
@@ -84,6 +88,6 @@ public class LastQueryUtil {
public static boolean needUpdateCache(Filter timeFilter) {
// Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
- return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+ return CACHE_ENABLED && ((timeFilter instanceof GtEq) || (timeFilter instanceof Gt));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index ee0eb898a8..4ce473e62b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -45,6 +45,8 @@ public class UpdateLastCacheOperator implements ProcessOperator {
private final Operator child;
// fullPath for queried time series
+ // It should be an exact PartialPath, neither MeasurementPath nor AlignedPath, because
+ // lastCache only accepts PartialPath
private final PartialPath fullPath;
// dataType for queried time series;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9e37b44da9..6587e05306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1046,9 +1046,10 @@ public class LocalExecutionPlanner {
@Override
public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
- TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+ PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
+ TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) { // last value is not cached
- return createUpdateLastCacheOperator(node, context);
+ return createUpdateLastCacheOperator(node, context, seriesPath);
} else if (!satisfyFilter(
context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
@@ -1057,7 +1058,7 @@ public class LocalExecutionPlanner {
|| context.lastQueryTimeFilter instanceof GtEq);
// time filter is not > or >=, we still need to read from disk
if (!isFilterGtOrGe) {
- return createUpdateLastCacheOperator(node, context);
+ return createUpdateLastCacheOperator(node, context, seriesPath);
} else { // otherwise, we just ignore it and return null
return null;
}
@@ -1069,7 +1070,7 @@ public class LocalExecutionPlanner {
}
private UpdateLastCacheOperator createUpdateLastCacheOperator(
- LastQueryScanNode node, LocalExecutionPlanContext context) {
+ LastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
return new UpdateLastCacheOperator(
@@ -1078,7 +1079,7 @@ public class LocalExecutionPlanner {
node.getPlanNodeId(),
UpdateLastCacheOperator.class.getSimpleName()),
lastQueryScan,
- node.getSeriesPath(),
+ fullPath,
node.getSeriesPath().getSeriesType(),
DATA_NODE_SCHEMA_CACHE,
context.needUpdateLastCache);
@@ -1114,9 +1115,10 @@ public class LocalExecutionPlanner {
@Override
public Operator visitAlignedLastQueryScan(
AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
- TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+ PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
+ TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) { // last value is not cached
- return createUpdateLastCacheOperator(node, context);
+ return createUpdateLastCacheOperator(node, context, seriesPath);
} else if (!satisfyFilter(
context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
@@ -1125,7 +1127,7 @@ public class LocalExecutionPlanner {
|| context.lastQueryTimeFilter instanceof GtEq);
// time filter is not > or >=, we still need to read from disk
if (!isFilterGtOrGe) {
- return createUpdateLastCacheOperator(node, context);
+ return createUpdateLastCacheOperator(node, context, seriesPath);
} else { // otherwise, we just ignore it and return null
return null;
}
@@ -1137,7 +1139,7 @@ public class LocalExecutionPlanner {
}
private UpdateLastCacheOperator createUpdateLastCacheOperator(
- AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+ AlignedLastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
AlignedSeriesAggregationScanOperator lastQueryScan =
createLastQueryScanOperator(node, context);
@@ -1147,7 +1149,7 @@ public class LocalExecutionPlanner {
node.getPlanNodeId(),
UpdateLastCacheOperator.class.getSimpleName()),
lastQueryScan,
- node.getSeriesPath(),
+ fullPath,
node.getSeriesPath().getSchemaList().get(0).getType(),
DATA_NODE_SCHEMA_CACHE,
context.needUpdateLastCache);