You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/09/16 09:07:42 UTC

[iotdb] 02/05: move querycontext to class parameter

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggrVector2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0cc1700716e4693090ddf652c7aef3e8a5c6c71d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Sep 16 13:15:09 2021 +0800

    Move QueryContext from a method parameter to a class field (passed via the constructor)
---
 .../iotdb/cluster/query/ClusterQueryRouter.java    |  5 +-
 .../query/aggregate/ClusterAggregateExecutor.java  | 30 ++++---
 .../query/ClusterAggregateExecutorTest.java        |  8 +-
 .../db/query/executor/AggregationExecutor.java     | 96 +++++++++++-----------
 .../iotdb/db/query/executor/QueryRouter.java       | 11 +--
 5 files changed, 79 insertions(+), 71 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
index e3be92c..eb45353 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
@@ -78,8 +78,9 @@ public class ClusterQueryRouter extends QueryRouter {
   }
 
   @Override
-  protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
-    return new ClusterAggregateExecutor(aggregationPlan, metaGroupMember);
+  protected AggregationExecutor getAggregationExecutor(
+      QueryContext context, AggregationPlan aggregationPlan) {
+    return new ClusterAggregateExecutor(context, aggregationPlan, metaGroupMember);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index a007ee9..af12e5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class ClusterAggregateExecutor extends AggregationExecutor {
@@ -51,8 +50,9 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
    *
    * @param aggregationPlan
    */
-  public ClusterAggregateExecutor(AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
-    super(aggregationPlan);
+  public ClusterAggregateExecutor(
+      QueryContext context, AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
+    super(context, aggregationPlan);
     this.metaMember = metaMember;
     this.readerFactory = new ClusterReaderFactory(metaMember);
     this.aggregator = new ClusterAggregator(metaMember);
@@ -60,24 +60,28 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
 
   @Override
   protected void aggregateOneSeries(
-      Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
-      AggregateResult[] aggregateResultList,
-      Set<String> measurements,
-      Filter timeFilter,
-      QueryContext context)
+      PartialPath seriesPath,
+      List<Integer> indexes,
+      Set<String> allMeasurementsInDevice,
+      Filter timeFilter)
       throws StorageEngineException {
-    PartialPath seriesPath = pathToAggrIndexes.getKey();
-    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
+    TSDataType tsDataType = dataTypes.get(indexes.get(0));
     List<String> aggregationNames = new ArrayList<>();
 
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       aggregationNames.add(aggregations.get(i));
     }
     List<AggregateResult> aggregateResult =
         aggregator.getAggregateResult(
-            seriesPath, measurements, aggregationNames, tsDataType, timeFilter, context, ascending);
+            seriesPath,
+            allMeasurementsInDevice,
+            aggregationNames,
+            tsDataType,
+            timeFilter,
+            context,
+            ascending);
     int rstIndex = 0;
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       aggregateResultList[i] = aggregateResult.get(rstIndex++);
     }
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
index 7d18a00..ffa461e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
@@ -87,8 +87,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
     try {
-      executor = new ClusterAggregateExecutor(plan, testMetaMember);
-      QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan);
+      executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+      QueryDataSet queryDataSet = executor.executeWithoutValueFilter(plan);
       assertTrue(queryDataSet.hasNext());
       RowRecord record = queryDataSet.next();
       List<Field> fields = record.getFields();
@@ -144,8 +144,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
     try {
-      executor = new ClusterAggregateExecutor(plan, testMetaMember);
-      QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan);
+      executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+      QueryDataSet queryDataSet = executor.executeWithValueFilter(plan);
       assertTrue(queryDataSet.hasNext());
       RowRecord record = queryDataSet.next();
       List<Field> fields = record.getFields();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index bc970ff..871915a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -73,26 +73,25 @@ public class AggregationExecutor {
   protected List<String> aggregations;
   protected IExpression expression;
   protected boolean ascending;
+  protected QueryContext context;
+  protected AggregateResult[] aggregateResultList;
 
   /** aggregation batch calculation size. */
   private int aggregateFetchSize;
 
-  protected AggregationExecutor(AggregationPlan aggregationPlan) {
+  protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
     this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
     this.aggregations = aggregationPlan.getDeduplicatedAggregations();
     this.expression = aggregationPlan.getExpression();
     this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
     this.ascending = aggregationPlan.isAscending();
+    this.context = context;
+    this.aggregateResultList = new AggregateResult[selectedSeries.size()];
   }
 
-  /**
-   * execute aggregate function with only time filter or no filter.
-   *
-   * @param context query context
-   */
-  public QueryDataSet executeWithoutValueFilter(
-      QueryContext context, AggregationPlan aggregationPlan)
+  /** execute aggregate function with only time filter or no filter. */
+  public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
       throws StorageEngineException, IOException, QueryProcessException {
 
     Filter timeFilter = null;
@@ -103,22 +102,30 @@ public class AggregationExecutor {
     // TODO use multi-thread
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
         groupAggregationsBySeries(selectedSeries);
-    // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
-    Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap =
-        groupVectorSeries(pathToAggrIndexesMap);
-    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
-
     // TODO-Cluster: group the paths by storage group to reduce communications
     List<StorageGroupProcessor> list =
         StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+
+    // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
+    Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap =
+        groupVectorSeries(pathToAggrIndexesMap);
     try {
       for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+        PartialPath seriesPath = entry.getKey();
         aggregateOneSeries(
-            entry,
-            aggregateResultList,
-            aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
-            timeFilter,
-            context);
+            seriesPath,
+            entry.getValue(),
+            aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()),
+            timeFilter);
+      }
+      for (Map.Entry<PartialPath, Map<String, List<Integer>>> entry :
+          vectorPathIndexesMap.entrySet()) {
+        VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey();
+        aggregateOneVectorSeries(
+            vectorSeries,
+            entry.getValue(),
+            aggregationPlan.getAllMeasurementsInDevice(vectorSeries.getDevice()),
+            timeFilter);
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
@@ -130,25 +137,21 @@ public class AggregationExecutor {
   /**
    * get aggregation result for one series
    *
-   * @param pathToAggrIndexes entry of path to aggregation indexes map
    * @param timeFilter time filter
-   * @param context query context
    */
   protected void aggregateOneSeries(
-      Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
-      AggregateResult[] aggregateResultList,
-      Set<String> measurements,
-      Filter timeFilter,
-      QueryContext context)
+      PartialPath seriesPath,
+      List<Integer> indexes,
+      Set<String> allMeasurementsInDevice,
+      Filter timeFilter)
       throws IOException, QueryProcessException, StorageEngineException {
     List<AggregateResult> ascAggregateResultList = new ArrayList<>();
     List<AggregateResult> descAggregateResultList = new ArrayList<>();
     boolean[] isAsc = new boolean[aggregateResultList.length];
 
-    PartialPath seriesPath = pathToAggrIndexes.getKey();
-    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
+    TSDataType tsDataType = dataTypes.get(indexes.get(0));
 
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       // construct AggregateResult
       AggregateResult aggregateResult =
           AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
@@ -161,7 +164,7 @@ public class AggregationExecutor {
     }
     aggregateOneSeries(
         seriesPath,
-        measurements,
+        allMeasurementsInDevice,
         context,
         timeFilter,
         tsDataType,
@@ -171,7 +174,7 @@ public class AggregationExecutor {
 
     int ascIndex = 0;
     int descIndex = 0;
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       aggregateResultList[i] =
           isAsc[i]
               ? ascAggregateResultList.get(ascIndex++)
@@ -179,6 +182,13 @@ public class AggregationExecutor {
     }
   }
 
+  protected void aggregateOneVectorSeries(
+      PartialPath seriesPath,
+      Map<String, List<Integer>> subIndexes,
+      Set<String> allMeasurementsInDevice,
+      Filter timeFilter)
+      throws IOException, QueryProcessException, StorageEngineException {}
+
   @SuppressWarnings("squid:S107")
   public static void aggregateOneSeries(
       PartialPath seriesPath,
@@ -338,12 +348,8 @@ public class AggregationExecutor {
     return remainingToCalculate;
   }
 
-  /**
-   * execute aggregate function with value filter.
-   *
-   * @param context query context.
-   */
-  public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
+  /** execute aggregate function with value filter. */
+  public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
     optimizeLastElementFunc(queryPlan);
 
@@ -367,15 +373,13 @@ public class AggregationExecutor {
       StorageEngine.getInstance().mergeUnLock(list);
     }
 
-    List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
-      AggregateResult result =
+      aggregateResultList[i] =
           AggregateResultFactory.getAggrResultByName(
               aggregations.get(i), dataTypes.get(i), ascending);
-      aggregateResults.add(result);
     }
-    aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
-    return constructDataSet(aggregateResults, queryPlan);
+    aggregateWithValueFilter(timestampGenerator, readerToAggrIndexesMap);
+    return constructDataSet(Arrays.asList(aggregateResultList), queryPlan);
   }
 
   private void optimizeLastElementFunc(QueryPlan queryPlan) {
@@ -413,7 +417,6 @@ public class AggregationExecutor {
 
   /** calculate aggregation result with value filter. */
   private void aggregateWithValueFilter(
-      List<AggregateResult> aggregateResults,
       TimeGenerator timestampGenerator,
       Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
       throws IOException {
@@ -440,17 +443,16 @@ public class AggregationExecutor {
         if (cached.get(pathId)) {
           Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
           for (Integer i : entry.getValue()) {
-            aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+            aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
           }
         } else {
           if (entry.getValue().size() == 1) {
-            aggregateResults
-                .get(entry.getValue().get(0))
-                .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+            aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
+                timeArray, timeArrayLength, entry.getKey());
           } else {
             Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
             for (Integer i : entry.getValue()) {
-              aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+              aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
             }
           }
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d2ae91f..a134d8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -139,22 +139,23 @@ public class QueryRouter implements IQueryRouter {
 
     aggregationPlan.setExpression(optimizedExpression);
 
-    AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
+    AggregationExecutor engineExecutor = getAggregationExecutor(context, aggregationPlan);
 
     QueryDataSet dataSet = null;
 
     if (optimizedExpression != null
         && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
-      dataSet = engineExecutor.executeWithValueFilter(context, aggregationPlan);
+      dataSet = engineExecutor.executeWithValueFilter(aggregationPlan);
     } else {
-      dataSet = engineExecutor.executeWithoutValueFilter(context, aggregationPlan);
+      dataSet = engineExecutor.executeWithoutValueFilter(aggregationPlan);
     }
 
     return dataSet;
   }
 
-  protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
-    return new AggregationExecutor(aggregationPlan);
+  protected AggregationExecutor getAggregationExecutor(
+      QueryContext context, AggregationPlan aggregationPlan) {
+    return new AggregationExecutor(context, aggregationPlan);
   }
 
   @Override