You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/15 11:37:28 UTC
[iotdb] branch clusterQueryOpt updated: use cache in aggregation with value filter
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
new 7fc9067 use cache in aggregation with value filter
7fc9067 is described below
commit 7fc9067ec67df2cc464af154eabdec45e66ca9bf
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Mar 15 19:36:35 2021 +0800
use cache in aggregation with value filter
---
.../db/query/executor/AggregationExecutor.java | 63 ++++++++++++++--------
1 file changed, 42 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0b0d010..2416f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -61,6 +62,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
@SuppressWarnings("java:S1135") // ignore todos
public class AggregationExecutor {
@@ -337,18 +340,8 @@ public class AggregationExecutor {
*/
public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
throws StorageEngineException, IOException, QueryProcessException {
- int index = 0;
- for (; index < aggregations.size(); index++) {
- String aggregationFunc = aggregations.get(index);
- if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
- && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
- break;
- }
- }
- if (index >= aggregations.size()) {
- queryPlan.setAscending(false);
- this.ascending = false;
- }
+ optimizeLastElementFunc(queryPlan);
+
TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
// group by path name
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
@@ -371,15 +364,30 @@ public class AggregationExecutor {
List<AggregateResult> aggregateResults = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- TSDataType type = dataTypes.get(i);
AggregateResult result =
- AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending);
+ AggregateResultFactory.getAggrResultByName(
+ aggregations.get(i), dataTypes.get(i), ascending);
aggregateResults.add(result);
}
aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
return constructDataSet(aggregateResults, queryPlan);
}
+ private void optimizeLastElementFunc(QueryPlan queryPlan) {
+ int index = 0;
+ for (; index < aggregations.size(); index++) {
+ String aggregationFunc = aggregations.get(index);
+ if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
+ && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
+ break;
+ }
+ }
+ if (index >= aggregations.size()) {
+ queryPlan.setAscending(false);
+ this.ascending = false;
+ }
+ }
+
protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
throws StorageEngineException {
return new ServerTimeGenerator(expression, context, queryPlan);
@@ -404,6 +412,9 @@ public class AggregationExecutor {
TimeGenerator timestampGenerator,
Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
throws IOException {
+ List<Boolean> cached =
+ markFilterdPaths(
+ expression, new ArrayList<>(selectedSeries), timestampGenerator.hasOrNode());
while (timestampGenerator.hasNext()) {
@@ -419,14 +430,24 @@ public class AggregationExecutor {
// cal part of aggregate result
for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
- if (entry.getValue().size() == 1) {
- aggregateResults
- .get(entry.getValue().get(0))
- .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
- } else {
- Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
- for (Integer i : entry.getValue())
+ int pathId = entry.getValue().get(0);
+ // cache in timeGenerator
+ if (cached.get(pathId)) {
+ Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+ for (Integer i : entry.getValue()) {
aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
+ } else {
+ if (entry.getValue().size() == 1) {
+ aggregateResults
+ .get(entry.getValue().get(0))
+ .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+ } else {
+ Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+ for (Integer i : entry.getValue()) {
+ aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
+ }
}
}
}