You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/18 09:11:37 UTC
[iotdb] 01/01: [IOTDB-3871] Make last query be able to update last cache even if there is no other write happens after restarting
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch UpdateLastCache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 21e30ddf43ab4e50f6a78021ae103fc0c254ea24
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Jul 18 17:11:23 2022 +0800
[IOTDB-3871] Make last query be able to update last cache even if there is no other write happens after restarting
---
.../org/apache/iotdb/db/metadata/path/AlignedPath.java | 7 +++++++
.../apache/iotdb/db/metadata/path/MeasurementPath.java | 8 ++++++++
.../operator/process/UpdateLastCacheOperator.java | 6 +++---
.../iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java | 16 ++++++++++------
4 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 1c6a9326e5..3d81316027 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -337,6 +337,13 @@ public class AlignedPath extends PartialPath {
return getDevicePath().concatNode(measurementList.get(0));
}
+ public MeasurementPath getMeasurementPath() {
+ if (schemaList.size() != 1) {
+ throw new UnsupportedOperationException();
+ }
+ return new MeasurementPath(transformToPartialPath(), schemaList.get(0), true);
+ }
+
public String getFormattedString() {
return getDevicePath().toString() + "[" + String.join(",", measurementList) + "]";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 469d223d58..2525844e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -58,8 +58,16 @@ public class MeasurementPath extends PartialPath {
}
public MeasurementPath(PartialPath measurementPath, IMeasurementSchema measurementSchema) {
+ this(measurementPath, measurementSchema, false);
+ }
+
+ public MeasurementPath(
+ PartialPath measurementPath,
+ IMeasurementSchema measurementSchema,
+ boolean isUnderAlignedEntity) {
super(measurementPath.getNodes());
this.measurementSchema = measurementSchema;
+ this.isUnderAlignedEntity = isUnderAlignedEntity;
}
public MeasurementPath(String device, String measurement, IMeasurementSchema measurementSchema)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index 1bfebb251c..4088a13396 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -47,7 +47,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
// fullPath for queried time series
// It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
// accept PartialPath
- private final PartialPath fullPath;
+ private final MeasurementPath fullPath;
// dataType for queried time series;
private final String dataType;
@@ -61,7 +61,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
public UpdateLastCacheOperator(
OperatorContext operatorContext,
Operator child,
- PartialPath fullPath,
+ MeasurementPath fullPath,
TSDataType dataType,
DataNodeSchemaCache dataNodeSchemaCache,
boolean needUpdateCache) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 2cddb29cc5..a6a7d5cde9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1184,7 +1184,7 @@ public class LocalExecutionPlanner {
PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) { // last value is not cached
- return createUpdateLastCacheOperator(node, context, seriesPath);
+ return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
} else if (!satisfyFilter(
context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
@@ -1193,7 +1193,7 @@ public class LocalExecutionPlanner {
|| context.lastQueryTimeFilter instanceof GtEq);
// time filter is not > or >=, we still need to read from disk
if (!isFilterGtOrGe) {
- return createUpdateLastCacheOperator(node, context, seriesPath);
+ return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
} else { // otherwise, we just ignore it and return null
return null;
}
@@ -1204,7 +1204,7 @@ public class LocalExecutionPlanner {
}
private UpdateLastCacheOperator createUpdateLastCacheOperator(
- LastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
+ LastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) {
SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
OperatorContext operatorContext =
@@ -1256,7 +1256,8 @@ public class LocalExecutionPlanner {
PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) { // last value is not cached
- return createUpdateLastCacheOperator(node, context, seriesPath);
+ return createUpdateLastCacheOperator(
+ node, context, node.getSeriesPath().getMeasurementPath());
} else if (!satisfyFilter(
context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
@@ -1265,7 +1266,8 @@ public class LocalExecutionPlanner {
|| context.lastQueryTimeFilter instanceof GtEq);
// time filter is not > or >=, we still need to read from disk
if (!isFilterGtOrGe) {
- return createUpdateLastCacheOperator(node, context, seriesPath);
+ return createUpdateLastCacheOperator(
+ node, context, node.getSeriesPath().getMeasurementPath());
} else { // otherwise, we just ignore it and return null
return null;
}
@@ -1276,7 +1278,9 @@ public class LocalExecutionPlanner {
}
private UpdateLastCacheOperator createUpdateLastCacheOperator(
- AlignedLastQueryScanNode node, LocalExecutionPlanContext context, PartialPath fullPath) {
+ AlignedLastQueryScanNode node,
+ LocalExecutionPlanContext context,
+ MeasurementPath fullPath) {
AlignedSeriesAggregationScanOperator lastQueryScan =
createLastQueryScanOperator(node, context);