You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/02/04 05:25:31 UTC

[incubator-iotdb] branch jira_335 created (now 93ed6e3)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch jira_335
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 93ed6e3  [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized

This branch includes the following new commits:

     new 93ed6e3  [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_335
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 93ed6e3d2390e1355800cff76705e4d52244b3b7
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Feb 4 13:24:51 2020 +0800

    [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized
---
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  3 +-
 .../db/query/executor/AggregationExecutor.java     | 93 +++++++++++++---------
 2 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index f5330de..c9ade59 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -221,8 +221,7 @@ public class QueryPlan extends PhysicalPlan {
     this.deduplicatedPaths = deduplicatedPaths;
   }
 
-  public void setDeduplicatedDataTypes(
-      List<TSDataType> deduplicatedDataTypes) {
+  public void setDeduplicatedDataTypes(List<TSDataType> deduplicatedDataTypes) {
     this.deduplicatedDataTypes = deduplicatedDataTypes;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index aaa4d21..07d51c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -19,6 +19,12 @@
 
 package org.apache.iotdb.db.query.executor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.path.PathException;
@@ -31,8 +37,8 @@ import org.apache.iotdb.db.query.dataset.SingleDataSet;
 import org.apache.iotdb.db.query.factory.AggreResultFactory;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.seriesRelated.ISeriesReader;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReader;
+import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -43,15 +49,11 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class AggregationExecutor {
 
   private List<Path> selectedSeries;
   private List<TSDataType> dataTypes;
-  private List<String> aggres;
+  private List<String> aggregations;
   private IExpression expression;
 
   /**
@@ -62,7 +64,7 @@ public class AggregationExecutor {
   public AggregationExecutor(AggregationPlan aggregationPlan) {
     this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
-    this.aggres = aggregationPlan.getDeduplicatedAggregations();
+    this.aggregations = aggregationPlan.getDeduplicatedAggregations();
     this.expression = aggregationPlan.getExpression();
     this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
   }
@@ -80,39 +82,43 @@ public class AggregationExecutor {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
     }
 
-    List<AggregateResult> aggregateResultList = new ArrayList<>();
     //TODO use multi-thread
-    for (int i = 0; i < selectedSeries.size(); i++) {
-      AggregateResult aggregateResult = aggregateOneSeries(i, timeFilter, context);
-      aggregateResultList.add(aggregateResult);
+    Map<Path, List<Integer>> seriesMap = mergeSameSeries(selectedSeries);
+    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
+    for (Map.Entry<Path, List<Integer>> entry : seriesMap.entrySet()) {
+      List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
+      int index = 0;
+      for (int i : entry.getValue()) {
+        aggregateResultList[i] = aggregateResults.get(index);
+        index++;
+      }
     }
-    return constructDataSet(aggregateResultList);
-  }
 
+    return constructDataSet(Arrays.asList(aggregateResultList));
+  }
 
-  /**
-   * get aggregation result for one series
-   */
-  private AggregateResult aggregateOneSeries(int i, Filter timeFilter, QueryContext context)
+  private List<AggregateResult> aggregateOneSeries(Map.Entry<Path, List<Integer>> series,
+      Filter timeFilter, QueryContext context)
       throws IOException, PlannerException, StorageEngineException {
-
-    // construct AggregateResult
-    TSDataType tsDataType = dataTypes.get(i);
-    AggregateResult aggregateResult = AggreResultFactory
-        .getAggrResultByName(aggres.get(i), tsDataType);
-
+    List<AggregateResult> aggregateResultList = new ArrayList<>();
+    Path seriesPath = series.getKey();
+    TSDataType tsDataType = dataTypes.get(series.getValue().get(0));
     // construct series reader without value filter
-    ISeriesReader seriesReader = new SeriesReader(
-        selectedSeries.get(i), tsDataType, context,
+    ISeriesReader seriesReader = new SeriesReader(seriesPath, tsDataType, context,
         QueryResourceManager.getInstance()
-            .getQueryDataSource(selectedSeries.get(i), context, timeFilter), timeFilter, null);
+            .getQueryDataSource(seriesPath, context, timeFilter), timeFilter, null);
 
+    for (int i : series.getValue()) {
+      // construct AggregateResult
+      AggregateResult aggregateResult = AggreResultFactory
+          .getAggrResultByName(aggregations.get(i), tsDataType);
+      aggregateResultList.add(aggregateResult);
+    }
     while (seriesReader.hasNextChunk()) {
       if (seriesReader.canUseCurrentChunkStatistics()) {
         Statistics chunkStatistics = seriesReader.currentChunkStatistics();
-        aggregateResult.updateResultFromStatistics(chunkStatistics);
-        if (aggregateResult.isCalculatedAggregationResult()) {
-          return aggregateResult;
+        for (AggregateResult aggregateResult : aggregateResultList) {
+          aggregateResult.updateResultFromStatistics(chunkStatistics);
         }
         seriesReader.skipCurrentChunk();
         continue;
@@ -121,23 +127,21 @@ public class AggregationExecutor {
         //cal by pageheader
         if (seriesReader.canUseCurrentPageStatistics()) {
           Statistics pageStatistic = seriesReader.currentPageStatistics();
-          aggregateResult.updateResultFromStatistics(pageStatistic);
-          if (aggregateResult.isCalculatedAggregationResult()) {
-            return aggregateResult;
+          for (AggregateResult aggregateResult : aggregateResultList) {
+            aggregateResult.updateResultFromStatistics(pageStatistic);
           }
           seriesReader.skipCurrentPage();
           continue;
         }
         //cal by pagedata
         while (seriesReader.hasNextOverlappedPage()) {
-          aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
-          if (aggregateResult.isCalculatedAggregationResult()) {
-            return aggregateResult;
+          for (AggregateResult aggregateResult : aggregateResultList) {
+            aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
           }
         }
       }
     }
-    return aggregateResult;
+    return aggregateResultList;
   }
 
 
@@ -162,7 +166,7 @@ public class AggregationExecutor {
     List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       TSDataType type = dataTypes.get(i);
-      AggregateResult result = AggreResultFactory.getAggrResultByName(aggres.get(i), type);
+      AggregateResult result = AggreResultFactory.getAggrResultByName(aggregations.get(i), type);
       aggregateResults.add(result);
     }
     aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
@@ -214,4 +218,19 @@ public class AggregationExecutor {
     dataSet.setRecord(record);
     return dataSet;
   }
+
+  private Map<Path, List<Integer>> mergeSameSeries(List<Path> selectedSeries) {
+    Map<Path, List<Integer>> seriesMap = new HashMap<>();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      Path series = selectedSeries.get(i);
+      if (seriesMap.containsKey(series)) {
+        seriesMap.get(series).add(i);
+      } else {
+        List<Integer> indexList = new ArrayList<>();
+        indexList.add(i);
+        seriesMap.put(series, indexList);
+      }
+    }
+    return seriesMap;
+  }
 }