You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/16 11:45:37 UTC
[iotdb] 02/03: cp
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch caLastOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bff77d369795ac440fcf8fb3d98d3c29776abb57
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue May 16 19:43:23 2023 +0800
cp
---
.../last/AbstractUpdateLastCacheOperator.java | 13 +++++++++++++
.../last/AlignedUpdateLastCacheOperator.java | 21 ++++++++++-----------
.../process/last/UpdateLastCacheOperator.java | 13 ++++++-------
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 4 ++++
4 files changed, 33 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index f0b1ffc49a..5ac2e13c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -19,18 +19,23 @@
package org.apache.iotdb.db.mpp.execution.operator.process.last;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nullable;
+
public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator {
protected static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
@@ -80,6 +85,14 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
return databaseName;
}
+ protected void updateLastCache(
+ long time, @Nullable TsPrimitiveType value, MeasurementPath fullPath) {
+ if (needUpdateCache) {
+ TimeValuePair timeValuePair = new TimeValuePair(time, value);
+ lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
+ }
+ }
+
@Override
public boolean hasNext() throws Exception {
return child.hasNextWithTimer();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index a75581eb03..85a5900616 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -63,25 +62,25 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera
tsBlockBuilder.reset();
for (int i = 0; i + 1 < res.getValueColumnCount(); i += 2) {
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+ devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)),
+ seriesPath.getSchemaList().get(i / 2),
+ true);
if (!res.getColumn(i).isNull(0)) {
long lastTime = res.getColumn(i).getLong(0);
TsPrimitiveType lastValue = res.getColumn(i + 1).getTsPrimitiveType(0);
- MeasurementPath measurementPath =
- new MeasurementPath(
- devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)),
- seriesPath.getSchemaList().get(i / 2),
- true);
- if (needUpdateCache) {
- TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
- lastCache.updateLastCache(
- getDatabaseName(), measurementPath, timeValuePair, false, Long.MIN_VALUE);
- }
+ updateLastCache(lastTime, lastValue, measurementPath);
LastQueryUtil.appendLastValue(
tsBlockBuilder,
lastTime,
measurementPath.getFullPath(),
lastValue.getStringValue(),
seriesPath.getSchemaList().get(i / 2).getType().name());
+ } else {
+ // we still need to update last cache if there is no data for this time series to avoid
+ // scanning all files each time
+ updateLastCache(Long.MIN_VALUE, null, measurementPath);
}
}
return !tsBlockBuilder.isEmpty() ? tsBlockBuilder.build() : LAST_QUERY_EMPTY_TSBLOCK;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 7bcc5a61e5..e45c18c019 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -34,10 +33,10 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
// fullPath for queried time series
// It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
// accept PartialPath
- private MeasurementPath fullPath;
+ private final MeasurementPath fullPath;
// dataType for queried time series;
- private String dataType;
+ private final String dataType;
public UpdateLastCacheOperator(
OperatorContext operatorContext,
@@ -65,16 +64,16 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
// last value is null
if (res.getColumn(0).isNull(0)) {
+ // we still need to update last cache if there is no data for this time series to avoid
+ // scanning all files each time
+ updateLastCache(Long.MIN_VALUE, null, fullPath);
return LAST_QUERY_EMPTY_TSBLOCK;
}
long lastTime = res.getColumn(0).getLong(0);
TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
- if (needUpdateCache) {
- TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
- lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
- }
+ updateLastCache(lastTime, lastValue, fullPath);
tsBlockBuilder.reset();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 4e945c8be6..e73bdd4236 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2031,6 +2031,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) { // last value is not cached
return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
+ } else if (timeValuePair.getValue() == null) { // there is no data for this time series
+ return null;
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
timeValuePair)) { // cached last value is not satisfied
@@ -2125,6 +2127,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
+ } else if (timeValuePair.getValue() == null) {
+ // there is no data for this time series, just ignore
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
timeValuePair)) { // cached last value is not satisfied