You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/02/05 06:25:36 UTC

[incubator-iotdb] 01/01: [IOTDB-452] Do all aggregations of one series at one pass in GroupBy

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_452
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 41afe1120ee4d22797e0d6022677b38d22124114
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Feb 5 14:22:46 2020 +0800

    [IOTDB-452] Do all aggregations of one series at one pass in GroupBy
---
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 217 ++++++++++++++++-----
 .../db/query/executor/AggregationExecutor.java     |   6 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |   9 +-
 3 files changed, 178 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 4527671..291cc8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.PlannerException;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
@@ -31,7 +33,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggreResultFactory;
 import org.apache.iotdb.db.query.reader.seriesRelated.AggregateReader;
 import org.apache.iotdb.db.query.reader.seriesRelated.IAggregateReader;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Field;
@@ -44,7 +46,17 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
 
-  private List<IAggregateReader> seriesReaders;
+  /**
+   * Merges same series to one map. For example: Given paths: s1, s2, s3, s1 and aggregations:
+   * count, sum, count, sum, then seriesMap: s1 -> 0, 3; s2 -> 1; s3 -> 2
+   */
+  private Map<Path, List<Integer>> seriesReaders;
+
+  /**
+   * Maps path and its aggregate reader
+   */
+  private Map<Path, IAggregateReader> aggregateReaders;
+
   private List<BatchData> cachedBatchDataList;
   private Filter timeFilter;
   private GroupByPlan groupByPlan;
@@ -56,7 +68,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       throws StorageEngineException {
     super(context, groupByPlan);
 
-    this.seriesReaders = new ArrayList<>();
+    this.seriesReaders = new HashMap<>();
+    this.aggregateReaders = new HashMap<>();
     this.timeFilter = null;
     this.cachedBatchDataList = new ArrayList<>();
     for (int i = 0; i < paths.size(); i++) {
@@ -77,13 +90,16 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     if (expression != null) {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
     }
-
     for (int i = 0; i < paths.size(); i++) {
       Path path = paths.get(i);
-      IAggregateReader seriesReader = new AggregateReader(path, dataTypes.get(i), context,
-          QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
-          timeFilter, null);
-      seriesReaders.add(seriesReader);
+      List<Integer> indexList = seriesReaders.computeIfAbsent(path, key -> new ArrayList<>());
+      indexList.add(i);
+      if (!aggregateReaders.containsKey(path)) {
+        IAggregateReader reader = new AggregateReader(path, dataTypes.get(i), context,
+            QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
+            timeFilter, null);
+        aggregateReaders.put(path, reader);
+      }
     }
   }
 
@@ -95,16 +111,25 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     }
     hasCachedTimeInterval = false;
     RowRecord record = new RowRecord(curStartTime);
-    for (int i = 0; i < paths.size(); i++) {
-      AggregateResult res;
+    AggregateResult[] aggregateResultList = new AggregateResult[paths.size()];
+    for (Map.Entry<Path, List<Integer>> entry : seriesReaders.entrySet()) {
+      List<AggregateResult> aggregateResults;
       try {
-        res = nextIntervalAggregation(i);
+        aggregateResults = nextIntervalAggregation(entry);
       } catch (PlannerException e) {
         throw new IOException(e);
       }
-      if (res == null) {
-        record.addField(new Field(null));
-      } else {
+
+      int index = 0;
+      for (int i : entry.getValue()) {
+        aggregateResultList[i] = aggregateResults.get(index);
+        index++;
+      }
+    }
+    if (aggregateResultList.length == 0) {
+      record.addField(new Field(null));
+    } else {
+      for (AggregateResult res : aggregateResultList) {
         record.addField(res.getResult(), res.getDataType());
       }
     }
@@ -112,65 +137,156 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
   }
 
   /**
-   * calculate the group by result of the series indexed by idx.
+   * calculate the group by result of one series
    *
-   * @param idx series id
+   * @param seriesReader series reader map entry
    */
-  private AggregateResult nextIntervalAggregation(int idx) throws IOException, PlannerException {
-    IAggregateReader reader = seriesReaders.get(idx);
-    AggregateResult result = AggreResultFactory
-        .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(idx),
-            groupByPlan.getDeduplicatedDataTypes().get(idx));
-
-    TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
-
-    BatchData lastBatch = cachedBatchDataList.get(idx);
-    calcBatchData(result, lastBatch);
-    if (isEndCalc(result, lastBatch)) {
-      return result;
+  private List<AggregateResult> nextIntervalAggregation(Map.Entry<Path,
+      List<Integer>> seriesReader) throws IOException, PlannerException {
+    List<AggregateResult> aggregateResultList = new ArrayList<>();
+    List<BatchData> batchDataList = new ArrayList<>();
+    List<Boolean> isCalculatedList = new ArrayList<>();
+    List<Integer> indexList = seriesReader.getValue();
+
+    int remainingToCalculate = indexList.size();
+    TSDataType tsDataType = groupByPlan.getDeduplicatedDataTypes().get(indexList.get(0));
+
+    for (int index : indexList) {
+      AggregateResult result = AggreResultFactory
+          .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(index), tsDataType);
+      aggregateResultList.add(result);
+
+      BatchData lastBatch = cachedBatchDataList.get(index);
+      batchDataList.add(lastBatch);
+
+      calcBatchData(result, lastBatch);
+      if (isEndCalc(result, lastBatch)) {
+        isCalculatedList.add(true);
+        remainingToCalculate--;
+        if (remainingToCalculate == 0) {
+          return aggregateResultList;
+        }
+      } else {
+        isCalculatedList.add(false);
+      }
     }
+
+    IAggregateReader reader = aggregateReaders.get(seriesReader.getKey());
+
     while (reader.hasNextChunk()) {
       Statistics chunkStatistics = reader.currentChunkStatistics();
       if (chunkStatistics.getStartTime() >= curEndTime) {
         break;
       }
+      List<Integer> calculatedList = new ArrayList<>(); // already calculated
+      TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
+
       if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
           new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
-        result.updateResultFromStatistics(chunkStatistics);
-        if (result.isCalculatedAggregationResult()) {
-          break;
+        for (int i = 0; i < aggregateResultList.size(); i++) {
+          if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+            AggregateResult result = aggregateResultList.get(i);
+            result.updateResultFromStatistics(chunkStatistics);
+            if (!result.isCalculatedAggregationResult()) {
+              // NOTE(review): guard looks inverted — this branch runs when the result is NOT yet
+              // fully calculated, yet the index is added to calculatedList; verify intent
+              calculatedList.add(i);
+            }
+          }
         }
         reader.skipCurrentChunk();
         continue;
       }
+      // calculate by page header
+      nextPage(seriesReader, aggregateResultList, calculatedList, batchDataList);
+    }
+    return aggregateResultList;
+  }
 
-      while (reader.hasNextPage()) {
-        Statistics pageStatistics = reader.currentPageStatistics();
-        if (pageStatistics.getStartTime() >= curEndTime) {
-          break;
-        }
-        if (reader.canUseCurrentPageStatistics() && timeRange.contains(
-            new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
-          result.updateResultFromStatistics(pageStatistics);
-          if (result.isCalculatedAggregationResult()) {
-            break;
+  /**
+   * calculate the group by result of one series by page header
+   *
+   * @param seriesReader series reader map entry
+   * @param aggregateResultList aggregate result list
+   * @param calculatedList calculated list, recording which result is already calculated
+   * @param batchDataList batch data list
+   */
+  private void nextPage(Map.Entry<Path, List<Integer>> seriesReader,
+      List<AggregateResult> aggregateResultList, List<Integer> calculatedList,
+      List<BatchData> batchDataList) throws IOException, PlannerException {
+    IAggregateReader reader = aggregateReaders.get(seriesReader.getKey());
+    while (reader.hasNextPage()) {
+      Statistics pageStatistics = reader.currentPageStatistics();
+      if (pageStatistics.getStartTime() >= curEndTime) {
+        break;
+      }
+      TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
+
+      if (reader.canUseCurrentPageStatistics() && timeRange.contains(
+          new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
+        for (Integer index : seriesReader.getValue()) {
+          if (!calculatedList.contains(index)) {
+            AggregateResult result = aggregateResultList.get(index);
+            result.updateResultFromStatistics(pageStatistics);
+            if (!result.isCalculatedAggregationResult()) {
+              // NOTE(review): guard looks inverted — this branch runs when the result is NOT yet
+              // fully calculated, yet the index is added to calculatedList; verify intent
+              calculatedList.add(index);
+            }
           }
-          reader.skipCurrentPage();
-          continue;
         }
-        while (reader.hasNextOverlappedPage()) {
-          BatchData batchData = reader.nextOverlappedPage();
+        reader.skipCurrentPage();
+        continue;
+      }
+      // calculate by page data
+      nextOverlappedPage(seriesReader, aggregateResultList, calculatedList, batchDataList);
+    }
+  }
+
+  /**
+   * calculate the group by result of one series by page data
+   *
+   * @param seriesReader series reader map entry
+   * @param aggregateResultList aggregate result list
+   * @param calculatedList calculated list, recording which result is already calculated
+   * @param batchDataList batch data list
+   */
+  private void nextOverlappedPage(Map.Entry<Path, List<Integer>> seriesReader,
+      List<AggregateResult> aggregateResultList, List<Integer> calculatedList,
+      List<BatchData> batchDataList) throws IOException, PlannerException {
+    List<Boolean> isCalculatedList = new ArrayList<>();
+    IAggregateReader reader = aggregateReaders.get(seriesReader.getKey());
+
+    int remainingToCalculate = 0;
+    for (int i = 0; i < seriesReader.getValue().size(); i++) {
+      if (!calculatedList.contains(i)) {
+        isCalculatedList.add(false);
+        remainingToCalculate++;
+      } else {
+        isCalculatedList.add(true);
+      }
+    }
+
+    while (reader.hasNextOverlappedPage()) {
+      BatchData batchData = reader.nextOverlappedPage();
+      for (int i = 0; i < seriesReader.getValue().size(); i++) {
+        if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+          AggregateResult result = aggregateResultList.get(i);
           calcBatchData(result, batchData);
+
+          int idx = seriesReader.getValue().get(i);
           if (batchData.hasCurrent()) {
             cachedBatchDataList.set(idx, batchData);
           }
-          if (isEndCalc(result, lastBatch)) {
-            break;
+
+          if (isEndCalc(result, batchDataList.get(i))) {
+            isCalculatedList.set(i, true);
+            remainingToCalculate--;
+            if (remainingToCalculate == 0) {
+              return;
+            }
           }
         }
       }
     }
-    return result;
   }
 
   private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
@@ -181,8 +297,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
   /**
    * this batchData >= curEndTime
    */
-  private void calcBatchData(AggregateResult result, BatchData batchData)
-      throws IOException {
+  private void calcBatchData(AggregateResult result, BatchData batchData) throws IOException {
     if (batchData == null || !batchData.hasCurrent()) {
       return;
     }
@@ -191,6 +306,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     }
     if (batchData.hasCurrent()) {
       result.updateResultFromPageData(batchData, curEndTime);
+      // reset batch data for next calculation
+      batchData.resetBatchData();
     }
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6991c97..8c020ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -175,9 +175,9 @@ public class AggregationExecutor {
               if (aggregateResult.isCalculatedAggregationResult()) {
                 isCalculatedList.set(i, true);
                 remainingToCalculate--;
-              }
-              if (remainingToCalculate == 0) {
-                return aggregateResultList;
+                if (remainingToCalculate == 0) {
+                  return aggregateResultList;
+                }
               }
             }
           }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 9d8c998..db47959 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -521,5 +521,12 @@ public class BatchData implements Serializable {
     return new BatchDataIterator(this);
   }
 
-
+  /**
+   * This method is used to reset batch data when more than one group by aggregation functions
+   * visit the same batch data
+   */
+  public void resetBatchData() {
+    this.readCurArrayIndex = 0;
+    this.readCurListIndex = 0;
+  }
 }