You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/11/19 02:39:06 UTC
[iotdb] branch rel/0.11 updated: Refactor LastQueryExecutor to
separate the execution into multiple stages
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 574aa05 Refactor LastQueryExecutor to separate the execution into multiple stages
574aa05 is described below
commit 574aa05ccb4e9f12cd4d118b9850f1f44982c25f
Author: wshao08 <ko...@163.com>
AuthorDate: Tue Nov 17 19:24:37 2020 +0800
Refactor LastQueryExecutor to separate the execution into multiple stages
(cherry picked from commit 39691323a5b3abb6e5e144b176acdd132f7c6b6b)
---
.../java/org/apache/iotdb/db/metadata/MTree.java | 12 +-
.../query/dataset/groupby/GroupByFillDataSet.java | 15 +-
.../iotdb/db/query/executor/LastQueryExecutor.java | 183 ++++++++++++---------
.../iotdb/db/query/executor/QueryRouter.java | 6 +-
4 files changed, 126 insertions(+), 90 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 4355319..ba844bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.metadata;
import static java.util.stream.Collectors.toList;
import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD;
-import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLastPairForOneSeriesLocally;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -55,6 +54,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -68,6 +68,8 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -107,8 +109,12 @@ public class MTree implements Serializable {
return node.getCachedLast().getTimestamp();
} else {
try {
- last = calculateLastPairForOneSeriesLocally(node.getPartialPath(),
- node.getSchema().getType(), queryContext, null, Collections.emptySet());
+ QueryDataSource dataSource = QueryResourceManager.getInstance().
+ getQueryDataSource(node.getPartialPath(), queryContext, null);
+ LastPointReader lastReader = new LastPointReader(node.getPartialPath(),
+ node.getSchema().getType(), Collections.emptySet(), queryContext,
+ dataSource, Long.MAX_VALUE, null);
+ last = lastReader.readLastPoint();
return (last != null ? last.getTimestamp() : Long.MIN_VALUE);
} catch (Exception e) {
logger.error("Something wrong happened while trying to get last time value pair of {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 97b285f..c79bc41 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -105,13 +105,16 @@ public class GroupByFillDataSet extends QueryDataSet {
throws IOException, StorageEngineException, QueryProcessException {
lastTimeArray = new long[paths.size()];
Arrays.fill(lastTimeArray, Long.MAX_VALUE);
+ List<PartialPath> seriesPaths = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
- TimeValuePair lastTimeValuePair;
- lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeriesLocally(
- (PartialPath) paths.get(i), dataTypes.get(i), context, null,
- groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
- if (lastTimeValuePair != null && lastTimeValuePair.getValue() != null) {
- lastTimeArray[i] = lastTimeValuePair.getTimestamp();
+ seriesPaths.add((PartialPath) paths.get(i));
+ }
+ List<Pair<Boolean, TimeValuePair>> lastPairList =
+ LastQueryExecutor.calculateLastPairForSeriesLocally(
+ seriesPaths, dataTypes, context, null, groupByFillPlan);
+ for (int i = 0; i < lastPairList.size(); i++) {
+ if (lastPairList.get(i).left) {
+ lastTimeArray[i] = lastPairList.get(i).right.getTimestamp();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index cc598d5..3d30bb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -24,9 +24,9 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -51,13 +52,14 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
public class LastQueryExecutor {
private List<PartialPath> selectedSeries;
private List<TSDataType> dataTypes;
private IExpression expression;
- private static boolean lastCacheEnabled =
+ private static final boolean lastCacheEnabled =
IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
public LastQueryExecutor(LastQueryPlan lastQueryPlan) {
@@ -83,36 +85,31 @@ public class LastQueryExecutor {
Arrays.asList(new PartialPath(COLUMN_TIMESERIES, false), new PartialPath(COLUMN_VALUE, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
- List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
- try {
- for (int i = 0; i < selectedSeries.size(); i++) {
- TimeValuePair lastTimeValuePair;
- lastTimeValuePair = calculateLastPairForOneSeries(
- selectedSeries.get(i), dataTypes.get(i), context,
- lastQueryPlan.getAllMeasurementsInDevice(selectedSeries.get(i).getDevice()));
- if (lastTimeValuePair != null && lastTimeValuePair.getValue() != null) {
- RowRecord resultRecord = new RowRecord(lastTimeValuePair.getTimestamp());
- Field pathField = new Field(TSDataType.TEXT);
- if (selectedSeries.get(i).getTsAlias() != null) {
- pathField.setBinaryV(new Binary(selectedSeries.get(i).getTsAlias()));
+ List<Pair<Boolean, TimeValuePair>> lastPairList = calculateLastPairForSeriesLocally(
+ selectedSeries, dataTypes, context, expression, lastQueryPlan);
+
+ for (int i = 0; i < lastPairList.size(); i++) {
+ if (lastPairList.get(i).left) {
+ TimeValuePair lastTimeValuePair = lastPairList.get(i).right;
+ RowRecord resultRecord = new RowRecord(lastTimeValuePair.getTimestamp());
+ Field pathField = new Field(TSDataType.TEXT);
+ if (selectedSeries.get(i).getTsAlias() != null) {
+ pathField.setBinaryV(new Binary(selectedSeries.get(i).getTsAlias()));
+ } else {
+ if (selectedSeries.get(i).getMeasurementAlias() != null) {
+ pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPathWithAlias()));
} else {
- if (selectedSeries.get(i).getMeasurementAlias() != null) {
- pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPathWithAlias()));
- } else {
- pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPath()));
- }
+ pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPath()));
}
- resultRecord.addField(pathField);
+ }
+ resultRecord.addField(pathField);
- Field valueField = new Field(TSDataType.TEXT);
- valueField.setBinaryV(new Binary(lastTimeValuePair.getValue().getStringValue()));
- resultRecord.addField(valueField);
+ Field valueField = new Field(TSDataType.TEXT);
+ valueField.setBinaryV(new Binary(lastTimeValuePair.getValue().getStringValue()));
+ resultRecord.addField(valueField);
- dataSet.putRecord(resultRecord);
- }
+ dataSet.putRecord(resultRecord);
}
- } finally {
- StorageEngine.getInstance().mergeUnLock(list);
}
if (!lastQueryPlan.isAscending()) {
@@ -121,35 +118,90 @@ public class LastQueryExecutor {
return dataSet;
}
- protected TimeValuePair calculateLastPairForOneSeries(
- PartialPath seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
- throws IOException, QueryProcessException, StorageEngineException {
- return calculateLastPairForOneSeriesLocally(seriesPath, tsDataType, context,
- expression, deviceMeasurements);
+ public static List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeriesLocally(
+ List<PartialPath> seriesPaths, List<TSDataType> dataTypes, QueryContext context,
+ IExpression expression, RawDataQueryPlan lastQueryPlan)
+ throws QueryProcessException, StorageEngineException, IOException {
+ List<LastCacheAccessor> cacheAccessors = new ArrayList<>();
+ Filter filter = (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
+
+ List<PartialPath> restPaths = new ArrayList<>();
+ List<Pair<Boolean, TimeValuePair>> resultContainer =
+ readLastPairsFromCache(seriesPaths, filter, cacheAccessors, restPaths);
+ // If any '>' or '>=' filters are specified, only access cache to get Last result.
+ if (filter != null || restPaths.isEmpty()) {
+ return resultContainer;
+ }
+
+ List<LastPointReader> readerList = new ArrayList<>();
+ List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(restPaths);
+ try {
+ for (int i = 0; i < restPaths.size(); i++) {
+ QueryDataSource dataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(seriesPaths.get(i), context, null);
+ LastPointReader lastReader = new LastPointReader(seriesPaths.get(i), dataTypes.get(i),
+ lastQueryPlan.getAllMeasurementsInDevice(seriesPaths.get(i).getDevice()),
+ context, dataSource, Long.MAX_VALUE, null);
+ readerList.add(lastReader);
+ }
+ } finally {
+ StorageEngine.getInstance().mergeUnLock(list);
+ }
+
+ int index = 0;
+ for (int i = 0; i < resultContainer.size(); i++) {
+ if (!resultContainer.get(i).left) {
+ resultContainer.get(i).left = true;
+ resultContainer.get(i).right = readerList.get(index++).readLastPoint();
+ if (lastCacheEnabled) {
+ cacheAccessors.get(i).write(resultContainer.get(i).right);
+ }
+ }
+ }
+ return resultContainer;
}
- /**
- * get last result for one series
- *
- * @param context query context
- * @return TimeValuePair, result can be null
- */
- public static TimeValuePair calculateLastPairForOneSeriesLocally(
- PartialPath seriesPath, TSDataType tsDataType, QueryContext context,
- IExpression expression, Set<String> deviceMeasurements)
- throws IOException, QueryProcessException, StorageEngineException {
-
- // Retrieve last value from MNode
- MeasurementMNode node = null;
- Filter filter = null;
+ private static List<Pair<Boolean, TimeValuePair>> readLastPairsFromCache(List<PartialPath> seriesPaths,
+ Filter filter, List<LastCacheAccessor> cacheAccessors, List<PartialPath> restPaths) {
+ List<Pair<Boolean, TimeValuePair>> resultContainer = new ArrayList<>();
if (lastCacheEnabled) {
- if (expression != null) {
- filter = ((GlobalTimeExpression) expression).getFilter();
+ for (PartialPath path : seriesPaths) {
+ cacheAccessors.add(new LastCacheAccessor(path, filter));
+ }
+ } else {
+ restPaths.addAll(seriesPaths);
+ }
+ for (int i = 0; i < cacheAccessors.size(); i++) {
+ TimeValuePair tvPair = cacheAccessors.get(i).read();
+ if (tvPair != null) {
+ resultContainer.add(new Pair<>(true, tvPair));
+ } else {
+ resultContainer.add(new Pair<>(false, null));
+ restPaths.add(seriesPaths.get(i));
}
+ }
+ return resultContainer;
+ }
+
+ private static class LastCacheAccessor {
+ private PartialPath path;
+ private Filter filter;
+ private MeasurementMNode node;
+
+ LastCacheAccessor(PartialPath seriesPath) {
+ this.path = seriesPath;
+ }
+
+ LastCacheAccessor(PartialPath seriesPath, Filter filter) {
+ this.path = seriesPath;
+ this.filter = filter;
+ }
+
+ public TimeValuePair read() {
try {
- node = (MeasurementMNode) IoTDB.metaManager.getNodeByPath(seriesPath);
+ node = (MeasurementMNode) IoTDB.metaManager.getNodeByPath(path);
} catch (MetadataException e) {
- TimeValuePair timeValuePair = IoTDB.metaManager.getLastCache(seriesPath);
+ TimeValuePair timeValuePair = IoTDB.metaManager.getLastCache(path);
if (timeValuePair != null && satisfyFilter(filter, timeValuePair)) {
return timeValuePair;
} else if (timeValuePair != null) {
@@ -165,37 +217,16 @@ public class LastQueryExecutor {
return null;
}
}
+ return null;
}
- return calculateLastPairByScanningTsFiles(
- seriesPath, tsDataType, context, filter, deviceMeasurements, node);
- }
-
- private static TimeValuePair calculateLastPairByScanningTsFiles(
- PartialPath seriesPath, TSDataType tsDataType, QueryContext context,
- Filter filter, Set<String> deviceMeasurements, MeasurementMNode node)
- throws QueryProcessException, StorageEngineException, IOException {
-
- QueryDataSource dataSource =
- QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, filter);
-
- LastPointReader lastReader = new LastPointReader(
- seriesPath, tsDataType, deviceMeasurements, context, dataSource, Long.MAX_VALUE, filter);
- TimeValuePair resultPair = lastReader.readLastPoint();
-
- // Update cached last value with low priority unless "FROM" expression exists
- if (lastCacheEnabled) {
- IoTDB.metaManager.updateLastCache(
- seriesPath, resultPair, false, Long.MIN_VALUE, node);
+ public void write(TimeValuePair pair) {
+ IoTDB.metaManager.updateLastCache(path, pair, false, Long.MIN_VALUE, node);
}
- return resultPair;
}
private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
- if (filter == null ||
- filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue())) {
- return true;
- }
- return false;
+ return filter == null ||
+ filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 8c7750e..90289e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -225,12 +225,8 @@ public class QueryRouter implements IQueryRouter {
@Override
public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
- LastQueryExecutor lastQueryExecutor = getLastQueryExecutor(lastQueryPlan);
+ LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
return lastQueryExecutor.execute(context, lastQueryPlan);
}
- protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
- return new LastQueryExecutor(lastQueryPlan);
- }
-
}