You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/02/04 05:25:32 UTC

[incubator-iotdb] 01/01: [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_335
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 93ed6e3d2390e1355800cff76705e4d52244b3b7
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Feb 4 13:24:51 2020 +0800

    [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized
---
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  3 +-
 .../db/query/executor/AggregationExecutor.java     | 93 +++++++++++++---------
 2 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index f5330de..c9ade59 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -221,8 +221,7 @@ public class QueryPlan extends PhysicalPlan {
     this.deduplicatedPaths = deduplicatedPaths;
   }
 
-  public void setDeduplicatedDataTypes(
-      List<TSDataType> deduplicatedDataTypes) {
+  public void setDeduplicatedDataTypes(List<TSDataType> deduplicatedDataTypes) {
     this.deduplicatedDataTypes = deduplicatedDataTypes;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index aaa4d21..07d51c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -19,6 +19,12 @@
 
 package org.apache.iotdb.db.query.executor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.path.PathException;
@@ -31,8 +37,8 @@ import org.apache.iotdb.db.query.dataset.SingleDataSet;
 import org.apache.iotdb.db.query.factory.AggreResultFactory;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.seriesRelated.ISeriesReader;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReader;
+import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -43,15 +49,11 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class AggregationExecutor {
 
   private List<Path> selectedSeries;
   private List<TSDataType> dataTypes;
-  private List<String> aggres;
+  private List<String> aggregations;
   private IExpression expression;
 
   /**
@@ -62,7 +64,7 @@ public class AggregationExecutor {
   public AggregationExecutor(AggregationPlan aggregationPlan) {
     this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
-    this.aggres = aggregationPlan.getDeduplicatedAggregations();
+    this.aggregations = aggregationPlan.getDeduplicatedAggregations();
     this.expression = aggregationPlan.getExpression();
     this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
   }
@@ -80,39 +82,43 @@ public class AggregationExecutor {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
     }
 
-    List<AggregateResult> aggregateResultList = new ArrayList<>();
     //TODO use multi-thread
-    for (int i = 0; i < selectedSeries.size(); i++) {
-      AggregateResult aggregateResult = aggregateOneSeries(i, timeFilter, context);
-      aggregateResultList.add(aggregateResult);
+    Map<Path, List<Integer>> seriesMap = mergeSameSeries(selectedSeries);
+    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
+    for (Map.Entry<Path, List<Integer>> entry : seriesMap.entrySet()) {
+      List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
+      int index = 0;
+      for (int i : entry.getValue()) {
+        aggregateResultList[i] = aggregateResults.get(index);
+        index++;
+      }
     }
-    return constructDataSet(aggregateResultList);
-  }
 
+    return constructDataSet(Arrays.asList(aggregateResultList));
+  }
 
-  /**
-   * get aggregation result for one series
-   */
-  private AggregateResult aggregateOneSeries(int i, Filter timeFilter, QueryContext context)
+  private List<AggregateResult> aggregateOneSeries(Map.Entry<Path, List<Integer>> series,
+      Filter timeFilter, QueryContext context)
       throws IOException, PlannerException, StorageEngineException {
-
-    // construct AggregateResult
-    TSDataType tsDataType = dataTypes.get(i);
-    AggregateResult aggregateResult = AggreResultFactory
-        .getAggrResultByName(aggres.get(i), tsDataType);
-
+    List<AggregateResult> aggregateResultList = new ArrayList<>();
+    Path seriesPath = series.getKey();
+    TSDataType tsDataType = dataTypes.get(series.getValue().get(0));
     // construct series reader without value filter
-    ISeriesReader seriesReader = new SeriesReader(
-        selectedSeries.get(i), tsDataType, context,
+    ISeriesReader seriesReader = new SeriesReader(seriesPath, tsDataType, context,
         QueryResourceManager.getInstance()
-            .getQueryDataSource(selectedSeries.get(i), context, timeFilter), timeFilter, null);
+            .getQueryDataSource(seriesPath, context, timeFilter), timeFilter, null);
 
+    for (int i : series.getValue()) {
+      // construct AggregateResult
+      AggregateResult aggregateResult = AggreResultFactory
+          .getAggrResultByName(aggregations.get(i), tsDataType);
+      aggregateResultList.add(aggregateResult);
+    }
     while (seriesReader.hasNextChunk()) {
       if (seriesReader.canUseCurrentChunkStatistics()) {
         Statistics chunkStatistics = seriesReader.currentChunkStatistics();
-        aggregateResult.updateResultFromStatistics(chunkStatistics);
-        if (aggregateResult.isCalculatedAggregationResult()) {
-          return aggregateResult;
+        for (AggregateResult aggregateResult : aggregateResultList) {
+          aggregateResult.updateResultFromStatistics(chunkStatistics);
         }
         seriesReader.skipCurrentChunk();
         continue;
@@ -121,23 +127,21 @@ public class AggregationExecutor {
         //cal by pageheader
         if (seriesReader.canUseCurrentPageStatistics()) {
           Statistics pageStatistic = seriesReader.currentPageStatistics();
-          aggregateResult.updateResultFromStatistics(pageStatistic);
-          if (aggregateResult.isCalculatedAggregationResult()) {
-            return aggregateResult;
+          for (AggregateResult aggregateResult : aggregateResultList) {
+            aggregateResult.updateResultFromStatistics(pageStatistic);
           }
           seriesReader.skipCurrentPage();
           continue;
         }
         //cal by pagedata
         while (seriesReader.hasNextOverlappedPage()) {
-          aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
-          if (aggregateResult.isCalculatedAggregationResult()) {
-            return aggregateResult;
+          for (AggregateResult aggregateResult : aggregateResultList) {
+            aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
           }
         }
       }
     }
-    return aggregateResult;
+    return aggregateResultList;
   }
 
 
@@ -162,7 +166,7 @@ public class AggregationExecutor {
     List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       TSDataType type = dataTypes.get(i);
-      AggregateResult result = AggreResultFactory.getAggrResultByName(aggres.get(i), type);
+      AggregateResult result = AggreResultFactory.getAggrResultByName(aggregations.get(i), type);
       aggregateResults.add(result);
     }
     aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
@@ -214,4 +218,19 @@ public class AggregationExecutor {
     dataSet.setRecord(record);
     return dataSet;
   }
+
+  private Map<Path, List<Integer>> mergeSameSeries(List<Path> selectedSeries) {
+    Map<Path, List<Integer>> seriesMap = new HashMap<>();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      Path series = selectedSeries.get(i);
+      if (seriesMap.containsKey(series)) {
+        seriesMap.get(series).add(i);
+      } else {
+        List<Integer> indexList = new ArrayList<>();
+        indexList.add(i);
+        seriesMap.put(series, indexList);
+      }
+    }
+    return seriesMap;
+  }
 }