You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/02/04 05:25:32 UTC
[incubator-iotdb] 01/01: [IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch jira_335
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 93ed6e3d2390e1355800cff76705e4d52244b3b7
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Feb 4 13:24:51 2020 +0800
[IOTDB-335] Separate query executions of the same timeseries with different aggregate functions may be optimized
---
.../iotdb/db/qp/physical/crud/QueryPlan.java | 3 +-
.../db/query/executor/AggregationExecutor.java | 93 +++++++++++++---------
2 files changed, 57 insertions(+), 39 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index f5330de..c9ade59 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -221,8 +221,7 @@ public class QueryPlan extends PhysicalPlan {
this.deduplicatedPaths = deduplicatedPaths;
}
- public void setDeduplicatedDataTypes(
- List<TSDataType> deduplicatedDataTypes) {
+ public void setDeduplicatedDataTypes(List<TSDataType> deduplicatedDataTypes) {
this.deduplicatedDataTypes = deduplicatedDataTypes;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index aaa4d21..07d51c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -19,6 +19,12 @@
package org.apache.iotdb.db.query.executor;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.path.PathException;
@@ -31,8 +37,8 @@ import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.factory.AggreResultFactory;
import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.seriesRelated.ISeriesReader;
-import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReader;
+import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -43,15 +49,11 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class AggregationExecutor {
private List<Path> selectedSeries;
private List<TSDataType> dataTypes;
- private List<String> aggres;
+ private List<String> aggregations;
private IExpression expression;
/**
@@ -62,7 +64,7 @@ public class AggregationExecutor {
public AggregationExecutor(AggregationPlan aggregationPlan) {
this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
- this.aggres = aggregationPlan.getDeduplicatedAggregations();
+ this.aggregations = aggregationPlan.getDeduplicatedAggregations();
this.expression = aggregationPlan.getExpression();
this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
}
@@ -80,39 +82,43 @@ public class AggregationExecutor {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}
- List<AggregateResult> aggregateResultList = new ArrayList<>();
//TODO use multi-thread
- for (int i = 0; i < selectedSeries.size(); i++) {
- AggregateResult aggregateResult = aggregateOneSeries(i, timeFilter, context);
- aggregateResultList.add(aggregateResult);
+ Map<Path, List<Integer>> seriesMap = mergeSameSeries(selectedSeries);
+ AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
+ for (Map.Entry<Path, List<Integer>> entry : seriesMap.entrySet()) {
+ List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
+ int index = 0;
+ for (int i : entry.getValue()) {
+ aggregateResultList[i] = aggregateResults.get(index);
+ index++;
+ }
}
- return constructDataSet(aggregateResultList);
- }
+ return constructDataSet(Arrays.asList(aggregateResultList));
+ }
- /**
- * get aggregation result for one series
- */
- private AggregateResult aggregateOneSeries(int i, Filter timeFilter, QueryContext context)
+ private List<AggregateResult> aggregateOneSeries(Map.Entry<Path, List<Integer>> series,
+ Filter timeFilter, QueryContext context)
throws IOException, PlannerException, StorageEngineException {
-
- // construct AggregateResult
- TSDataType tsDataType = dataTypes.get(i);
- AggregateResult aggregateResult = AggreResultFactory
- .getAggrResultByName(aggres.get(i), tsDataType);
-
+ List<AggregateResult> aggregateResultList = new ArrayList<>();
+ Path seriesPath = series.getKey();
+ TSDataType tsDataType = dataTypes.get(series.getValue().get(0));
// construct series reader without value filter
- ISeriesReader seriesReader = new SeriesReader(
- selectedSeries.get(i), tsDataType, context,
+ ISeriesReader seriesReader = new SeriesReader(seriesPath, tsDataType, context,
QueryResourceManager.getInstance()
- .getQueryDataSource(selectedSeries.get(i), context, timeFilter), timeFilter, null);
+ .getQueryDataSource(seriesPath, context, timeFilter), timeFilter, null);
+ for (int i : series.getValue()) {
+ // construct AggregateResult
+ AggregateResult aggregateResult = AggreResultFactory
+ .getAggrResultByName(aggregations.get(i), tsDataType);
+ aggregateResultList.add(aggregateResult);
+ }
while (seriesReader.hasNextChunk()) {
if (seriesReader.canUseCurrentChunkStatistics()) {
Statistics chunkStatistics = seriesReader.currentChunkStatistics();
- aggregateResult.updateResultFromStatistics(chunkStatistics);
- if (aggregateResult.isCalculatedAggregationResult()) {
- return aggregateResult;
+ for (AggregateResult aggregateResult : aggregateResultList) {
+ aggregateResult.updateResultFromStatistics(chunkStatistics);
}
seriesReader.skipCurrentChunk();
continue;
@@ -121,23 +127,21 @@ public class AggregationExecutor {
//cal by pageheader
if (seriesReader.canUseCurrentPageStatistics()) {
Statistics pageStatistic = seriesReader.currentPageStatistics();
- aggregateResult.updateResultFromStatistics(pageStatistic);
- if (aggregateResult.isCalculatedAggregationResult()) {
- return aggregateResult;
+ for (AggregateResult aggregateResult : aggregateResultList) {
+ aggregateResult.updateResultFromStatistics(pageStatistic);
}
seriesReader.skipCurrentPage();
continue;
}
//cal by pagedata
while (seriesReader.hasNextOverlappedPage()) {
- aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
- if (aggregateResult.isCalculatedAggregationResult()) {
- return aggregateResult;
+ for (AggregateResult aggregateResult : aggregateResultList) {
+ aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
}
}
}
}
- return aggregateResult;
+ return aggregateResultList;
}
@@ -162,7 +166,7 @@ public class AggregationExecutor {
List<AggregateResult> aggregateResults = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
TSDataType type = dataTypes.get(i);
- AggregateResult result = AggreResultFactory.getAggrResultByName(aggres.get(i), type);
+ AggregateResult result = AggreResultFactory.getAggrResultByName(aggregations.get(i), type);
aggregateResults.add(result);
}
aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
@@ -214,4 +218,19 @@ public class AggregationExecutor {
dataSet.setRecord(record);
return dataSet;
}
+
+ private Map<Path, List<Integer>> mergeSameSeries(List<Path> selectedSeries) {
+ Map<Path, List<Integer>> seriesMap = new HashMap<>();
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ Path series = selectedSeries.get(i);
+ if (seriesMap.containsKey(series)) {
+ seriesMap.get(series).add(i);
+ } else {
+ List<Integer> indexList = new ArrayList<>();
+ indexList.add(i);
+ seriesMap.put(series, indexList);
+ }
+ }
+ return seriesMap;
+ }
}