You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/02/05 06:25:36 UTC
[incubator-iotdb] 01/01: [IOTDB-452] Do all aggregations of one
series at one pass in GroupBy
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch jira_452
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 41afe1120ee4d22797e0d6022677b38d22124114
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Feb 5 14:22:46 2020 +0800
[IOTDB-452] Do all aggregations of one series at one pass in GroupBy
---
.../groupby/GroupByWithoutValueFilterDataSet.java | 217 ++++++++++++++++-----
.../db/query/executor/AggregationExecutor.java | 6 +-
.../apache/iotdb/tsfile/read/common/BatchData.java | 9 +-
3 files changed, 178 insertions(+), 54 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 4527671..291cc8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.PlannerException;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
@@ -31,7 +33,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggreResultFactory;
import org.apache.iotdb.db.query.reader.seriesRelated.AggregateReader;
import org.apache.iotdb.db.query.reader.seriesRelated.IAggregateReader;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
@@ -44,7 +46,17 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
- private List<IAggregateReader> seriesReaders;
+ /**
+ * Merges same series to one map. For example: Given: paths: s1, s2, s3, s1 and aggregations:
+ * count, sum, count, sum seriesMap: s1 -> 0, 3; s2 -> 1; s3 -> 2
+ */
+ private Map<Path, List<Integer>> seriesReaders;
+
+ /**
+ * Maps path and its aggregate reader
+ */
+ private Map<Path, IAggregateReader> aggregateReaders;
+
private List<BatchData> cachedBatchDataList;
private Filter timeFilter;
private GroupByPlan groupByPlan;
@@ -56,7 +68,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
throws StorageEngineException {
super(context, groupByPlan);
- this.seriesReaders = new ArrayList<>();
+ this.seriesReaders = new HashMap<>();
+ this.aggregateReaders = new HashMap<>();
this.timeFilter = null;
this.cachedBatchDataList = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
@@ -77,13 +90,16 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}
-
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
- IAggregateReader seriesReader = new AggregateReader(path, dataTypes.get(i), context,
- QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
- timeFilter, null);
- seriesReaders.add(seriesReader);
+ List<Integer> indexList = seriesReaders.computeIfAbsent(path, key -> new ArrayList<>());
+ indexList.add(i);
+ if (!aggregateReaders.containsKey(path)) {
+ IAggregateReader reader = new AggregateReader(path, dataTypes.get(i), context,
+ QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
+ timeFilter, null);
+ aggregateReaders.put(path, reader);
+ }
}
}
@@ -95,16 +111,25 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
}
hasCachedTimeInterval = false;
RowRecord record = new RowRecord(curStartTime);
- for (int i = 0; i < paths.size(); i++) {
- AggregateResult res;
+ AggregateResult[] aggregateResultList = new AggregateResult[paths.size()];
+ for (Map.Entry<Path, List<Integer>> entry : seriesReaders.entrySet()) {
+ List<AggregateResult> aggregateResults;
try {
- res = nextIntervalAggregation(i);
+ aggregateResults = nextIntervalAggregation(entry);
} catch (PlannerException e) {
throw new IOException(e);
}
- if (res == null) {
- record.addField(new Field(null));
- } else {
+
+ int index = 0;
+ for (int i : entry.getValue()) {
+ aggregateResultList[i] = aggregateResults.get(index);
+ index++;
+ }
+ }
+ if (aggregateResultList.length == 0) {
+ record.addField(new Field(null));
+ } else {
+ for (AggregateResult res : aggregateResultList) {
record.addField(res.getResult(), res.getDataType());
}
}
@@ -112,65 +137,156 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
}
/**
- * calculate the group by result of the series indexed by idx.
+ * calculate the group by result of one series
*
- * @param idx series id
+ * @param seriesReader series reader map
*/
- private AggregateResult nextIntervalAggregation(int idx) throws IOException, PlannerException {
- IAggregateReader reader = seriesReaders.get(idx);
- AggregateResult result = AggreResultFactory
- .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(idx),
- groupByPlan.getDeduplicatedDataTypes().get(idx));
-
- TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
-
- BatchData lastBatch = cachedBatchDataList.get(idx);
- calcBatchData(result, lastBatch);
- if (isEndCalc(result, lastBatch)) {
- return result;
+ private List<AggregateResult> nextIntervalAggregation(Map.Entry<Path,
+ List<Integer>> seriesReader) throws IOException, PlannerException {
+ List<AggregateResult> aggregateResultList = new ArrayList<>();
+ List<BatchData> batchDataList = new ArrayList<>();
+ List<Boolean> isCalculatedList = new ArrayList<>();
+ List<Integer> indexList = seriesReader.getValue();
+
+ int remainingToCalculate = indexList.size();
+ TSDataType tsDataType = groupByPlan.getDeduplicatedDataTypes().get(indexList.get(0));
+
+ for (int index : indexList) {
+ AggregateResult result = AggreResultFactory
+ .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(index), tsDataType);
+ aggregateResultList.add(result);
+
+ BatchData lastBatch = cachedBatchDataList.get(index);
+ batchDataList.add(lastBatch);
+
+ calcBatchData(result, lastBatch);
+ if (isEndCalc(result, lastBatch)) {
+ isCalculatedList.add(true);
+ remainingToCalculate--;
+ if (remainingToCalculate == 0) {
+ return aggregateResultList;
+ }
+ } else {
+ isCalculatedList.add(false);
+ }
}
+
+ IAggregateReader reader = aggregateReaders.get(seriesReader.getKey());
+
while (reader.hasNextChunk()) {
Statistics chunkStatistics = reader.currentChunkStatistics();
if (chunkStatistics.getStartTime() >= curEndTime) {
break;
}
+ List<Integer> calculatedList = new ArrayList<>(); // already calculated
+ TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
+
if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
- result.updateResultFromStatistics(chunkStatistics);
- if (result.isCalculatedAggregationResult()) {
- break;
+ for (int i = 0; i < aggregateResultList.size(); i++) {
+ if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+ AggregateResult result = aggregateResultList.get(i);
+ result.updateResultFromStatistics(chunkStatistics);
+ if (!result.isCalculatedAggregationResult()) {
+ // already calculated
+ calculatedList.add(i);
+ }
+ }
}
reader.skipCurrentChunk();
continue;
}
+ // calculate by page header
+ nextPage(seriesReader, aggregateResultList, calculatedList, batchDataList);
+ }
+ return aggregateResultList;
+ }
- while (reader.hasNextPage()) {
- Statistics pageStatistics = reader.currentPageStatistics();
- if (pageStatistics.getStartTime() >= curEndTime) {
- break;
- }
- if (reader.canUseCurrentPageStatistics() && timeRange.contains(
- new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
- result.updateResultFromStatistics(pageStatistics);
- if (result.isCalculatedAggregationResult()) {
- break;
+ /**
+ * calculate the group by result of one series by page header
+ *
+ * @param seriesReader series reader map entry
+ * @param aggregateResultList aggregate result list
+ * @param calculatedList calculated list, recording which result is already calculated
+ * @param batchDataList batch data list
+ */
+ private void nextPage(Map.Entry<Path, List<Integer>> seriesReader,
+ List<AggregateResult> aggregateResultList, List<Integer> calculatedList,
+ List<BatchData> batchDataList) throws IOException, PlannerException {
+ IAggregateReader reader = aggregateReaders.get(seriesReader.getKey());
+ while (reader.hasNextPage()) {
+ Statistics pageStatistics = reader.currentPageStatistics();
+ if (pageStatistics.getStartTime() >= curEndTime) {
+ break;
+ }
+ TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
+
+ if (reader.canUseCurrentPageStatistics() && timeRange.contains(
+ new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
+ for (Integer index : seriesReader.getValue()) {
+ if (!calculatedList.contains(index)) {
+ AggregateResult result = aggregateResultList.get(index);
+ result.updateResultFromStatistics(pageStatistics);
+ if (!result.isCalculatedAggregationResult()) {
+ // already calculated
+ calculatedList.add(index);
+ }
}
- reader.skipCurrentPage();
- continue;
}
- while (reader.hasNextOverlappedPage()) {
- BatchData batchData = reader.nextOverlappedPage();
+ reader.skipCurrentPage();
+ continue;
+ }
+ // calculate by page data
+ nextOverlappedPage(seriesReader, aggregateResultList, calculatedList, batchDataList);
+ }
+ }
+
+ /**
+ * calculate the group by result of one series by page data
+ *
+ * @param seriesReader series reader map entry
+ * @param aggregateResultList aggregate result list
+ * @param calculatedList calculated list, recording which result is already calculated
+ * @param batchDataList batch data list
+ */
+ private void nextOverlappedPage(Map.Entry<Path, List<Integer>> seriesReader,
+ List<AggregateResult> aggregateResultList, List<Integer> calculatedList,
+ List<BatchData> batchDataList) throws IOException, PlannerException {
+ List<Boolean> isCalculatedList = new ArrayList<>();
+ IAggregateReader reader = aggregateReaders.get(seriesReader.getKey());
+
+ int remainingToCalculate = 0;
+ for (int i = 0; i < seriesReader.getValue().size(); i++) {
+ if (!calculatedList.contains(i)) {
+ isCalculatedList.add(false);
+ remainingToCalculate++;
+ } else {
+ isCalculatedList.add(true);
+ }
+ }
+
+ while (reader.hasNextOverlappedPage()) {
+ BatchData batchData = reader.nextOverlappedPage();
+ for (int i = 0; i < seriesReader.getValue().size(); i++) {
+ if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+ AggregateResult result = aggregateResultList.get(i);
calcBatchData(result, batchData);
+
+ int idx = seriesReader.getValue().get(i);
if (batchData.hasCurrent()) {
cachedBatchDataList.set(idx, batchData);
}
- if (isEndCalc(result, lastBatch)) {
- break;
+
+ if (isEndCalc(result, batchDataList.get(i))) {
+ isCalculatedList.set(i, true);
+ remainingToCalculate--;
+ if (remainingToCalculate == 0) {
+ return;
+ }
}
}
}
}
- return result;
}
private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
@@ -181,8 +297,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
/**
* this batchData >= curEndTime
*/
- private void calcBatchData(AggregateResult result, BatchData batchData)
- throws IOException {
+ private void calcBatchData(AggregateResult result, BatchData batchData) throws IOException {
if (batchData == null || !batchData.hasCurrent()) {
return;
}
@@ -191,6 +306,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
}
if (batchData.hasCurrent()) {
result.updateResultFromPageData(batchData, curEndTime);
+ // reset batch data for next calculation
+ batchData.resetBatchData();
}
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6991c97..8c020ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -175,9 +175,9 @@ public class AggregationExecutor {
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
- }
- if (remainingToCalculate == 0) {
- return aggregateResultList;
+ if (remainingToCalculate == 0) {
+ return aggregateResultList;
+ }
}
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 9d8c998..db47959 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -521,5 +521,12 @@ public class BatchData implements Serializable {
return new BatchDataIterator(this);
}
-
+ /**
+ * This method is used to reset batch data when more than one group by aggregation functions
+ * visit the same batch data
+ */
+ public void resetBatchData() {
+ this.readCurArrayIndex = 0;
+ this.readCurListIndex = 0;
+ }
}