Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/15 11:37:28 UTC

[iotdb] branch clusterQueryOpt updated: use cache in aggregation with value filter

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
     new 7fc9067  use cache in aggregation with value filter
7fc9067 is described below

commit 7fc9067ec67df2cc464af154eabdec45e66ca9bf
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Mar 15 19:36:35 2021 +0800

    use cache in aggregation with value filter
---
 .../db/query/executor/AggregationExecutor.java     | 63 ++++++++++++++--------
 1 file changed, 42 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0b0d010..2416f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -61,6 +62,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
+
 @SuppressWarnings("java:S1135") // ignore todos
 public class AggregationExecutor {
 
@@ -337,18 +340,8 @@ public class AggregationExecutor {
    */
   public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
-    int index = 0;
-    for (; index < aggregations.size(); index++) {
-      String aggregationFunc = aggregations.get(index);
-      if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
-          && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
-        break;
-      }
-    }
-    if (index >= aggregations.size()) {
-      queryPlan.setAscending(false);
-      this.ascending = false;
-    }
+    optimizeLastElementFunc(queryPlan);
+
     TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
     // group by path name
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
@@ -371,15 +364,30 @@ public class AggregationExecutor {
 
     List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
-      TSDataType type = dataTypes.get(i);
       AggregateResult result =
-          AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending);
+          AggregateResultFactory.getAggrResultByName(
+              aggregations.get(i), dataTypes.get(i), ascending);
       aggregateResults.add(result);
     }
     aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
     return constructDataSet(aggregateResults, queryPlan);
   }
 
+  private void optimizeLastElementFunc(QueryPlan queryPlan) {
+    int index = 0;
+    for (; index < aggregations.size(); index++) {
+      String aggregationFunc = aggregations.get(index);
+      if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
+          && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
+        break;
+      }
+    }
+    if (index >= aggregations.size()) {
+      queryPlan.setAscending(false);
+      this.ascending = false;
+    }
+  }
+
   protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
       throws StorageEngineException {
     return new ServerTimeGenerator(expression, context, queryPlan);
@@ -404,6 +412,9 @@ public class AggregationExecutor {
       TimeGenerator timestampGenerator,
       Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
       throws IOException {
+    List<Boolean> cached =
+        markFilterdPaths(
+            expression, new ArrayList<>(selectedSeries), timestampGenerator.hasOrNode());
 
     while (timestampGenerator.hasNext()) {
 
@@ -419,14 +430,24 @@ public class AggregationExecutor {
 
       // cal part of aggregate result
       for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
-        if (entry.getValue().size() == 1) {
-          aggregateResults
-              .get(entry.getValue().get(0))
-              .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
-        } else {
-          Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
-          for (Integer i : entry.getValue())
+        int pathId = entry.getValue().get(0);
+        // cache in timeGenerator
+        if (cached.get(pathId)) {
+          Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+          for (Integer i : entry.getValue()) {
             aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+          }
+        } else {
+          if (entry.getValue().size() == 1) {
+            aggregateResults
+                .get(entry.getValue().get(0))
+                .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+          } else {
+            Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+            for (Integer i : entry.getValue()) {
+              aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+            }
+          }
         }
       }
     }
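
For readers skimming the patch, the core idea is small: a series that appears in the value-filter expression (and is not under an OR node) has already been read by the ServerTimeGenerator while it produced the matching timestamps, so markFilterdPaths flags it and the aggregation pulls its values from the generator's cache via getValues(...) instead of reading them a second time through an IReaderByTimestamp. The toy sketch below only models that decision; every name in it (CachedAggregationSketch, timeGeneratorCache, readValuesInTimestamps) is an illustrative stand-in, not an IoTDB class or API.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative-only sketch of the caching decision added to
 * aggregateWithValueFilter. All types here are simplified stand-ins,
 * not the real IoTDB classes.
 */
public class CachedAggregationSketch {

  public static void main(String[] args) {
    // Selected series and the flags produced by markFilterdPaths(...):
    // true  -> values were already read (and cached) by the time generator
    // false -> values must still be fetched through a reader-by-timestamp
    List<String> selectedSeries = Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2");
    List<Boolean> cached = Arrays.asList(true, false);

    // Stand-in for the time generator's value cache, filled while it
    // evaluated the value filter and produced the matching timestamps.
    Map<String, Object[]> timeGeneratorCache = new HashMap<>();
    timeGeneratorCache.put("root.sg.d1.s1", new Object[] {1.0, 2.0, 3.0});

    long[] timestamps = {10L, 20L, 30L};

    for (int i = 0; i < selectedSeries.size(); i++) {
      Object[] values;
      if (cached.get(i)) {
        // Cache hit: reuse what the generator already read, no extra I/O.
        values = timeGeneratorCache.get(selectedSeries.get(i));
      } else {
        // Cache miss: fall back to a (simulated) read at the given timestamps.
        values = readValuesInTimestamps(selectedSeries.get(i), timestamps);
      }
      System.out.println(selectedSeries.get(i) + " -> " + Arrays.toString(values));
    }
  }

  // Stand-in for a reader-by-timestamp lookup; returns dummy values here.
  private static Object[] readValuesInTimestamps(String series, long[] timestamps) {
    Object[] values = new Object[timestamps.length];
    Arrays.fill(values, 0.0);
    return values;
  }
}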