You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/01 13:50:03 UTC
[incubator-iotdb] branch optimize_series_reader updated: new groupby (#862)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/optimize_series_reader by this push:
new 34476e5 new groupby (#862)
34476e5 is described below
commit 34476e5dab28880cfe659aef01b346f35b0d11b4
Author: Dawei Liu <at...@163.com>
AuthorDate: Sun Mar 1 21:49:54 2020 +0800
new groupby (#862)
* new multithread groupby and GroupByExecutor
---
.../db/query/aggregation/AggregateResult.java | 9 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 6 +
.../groupby/GroupByWithoutValueFilterDataSet.java | 376 +++++++++++----------
.../reader/series/SeriesRawDataBatchReader.java | 2 +-
4 files changed, 221 insertions(+), 172 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 4b88bca..5483bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -74,11 +74,12 @@ public abstract class AggregateResult {
* @param dataInThisPage the data in Page
*/
public abstract void updateResultFromPageData(BatchData dataInThisPage) throws IOException;
+
/**
* Aggregate results cannot be calculated using Statistics directly, using the data in each page
*
* @param dataInThisPage the data in Page
- * @param bound calculate points whose time < bound
+ * @param bound calculate points whose time < bound
*/
public abstract void updateResultFromPageData(BatchData dataInThisPage, long bound)
throws IOException;
@@ -170,6 +171,12 @@ public abstract class AggregateResult {
public void reset() {
hasResult = false;
+ booleanValue = false;
+ doubleValue = 0;
+ floatValue = 0;
+ intValue = 0;
+ longValue = 0;
+ binaryValue = null;
}
protected Object getValue() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 2c4fd7a..1e44444 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -143,4 +143,10 @@ public class AvgAggrResult extends AggregateResult {
return cnt;
}
+ @Override
+ public void reset() {
+ super.reset();
+ cnt = 0;
+ avg = 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index f756816..944b566 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.query.dataset.groupby;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -27,35 +35,30 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.*;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
- /**
- * Merges same series to one map. For example: Given: paths: s1, s2, s3, s1 and aggregations:
- * count, sum, count, sum seriesMap: s1 -> 0, 3; s2 -> 1; s3 -> 2
- */
- private Map<Path, List<Integer>> pathToAggrIndexesMap;
+ private static final Logger logger = LoggerFactory
+ .getLogger(GroupByWithoutValueFilterDataSet.class);
- /**
- * Maps path and its aggregate reader
- */
- private Map<Path, IAggregateReader> aggregateReaders;
- private List<BatchData> cachedBatchDataList;
- private GroupByPlan groupByPlan;
+ private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>();
+ private TimeRange timeRange;
/**
* constructor.
@@ -64,45 +67,33 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
throws StorageEngineException {
super(context, groupByPlan);
- this.pathToAggrIndexesMap = new HashMap<>();
- this.aggregateReaders = new HashMap<>();
- this.cachedBatchDataList = new ArrayList<>();
- for (int i = 0; i < paths.size(); i++) {
- cachedBatchDataList.add(null);
- }
initGroupBy(context, groupByPlan);
}
- /**
- * init reader and aggregate function.
- */
private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
throws StorageEngineException {
IExpression expression = groupByPlan.getExpression();
- this.groupByPlan = groupByPlan;
Filter timeFilter = null;
- // init reader
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
- List<Integer> indexList = pathToAggrIndexesMap
- .computeIfAbsent(path, key -> new ArrayList<>());
- indexList.add(i);
- if (!aggregateReaders.containsKey(path)) {
-
- QueryDataSource queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSource(path, context, timeFilter);
- // update filter by TTL
- timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
- IAggregateReader seriesReader = new SeriesAggregateReader(path, dataTypes.get(i), context,
- queryDataSource, timeFilter, null, null);
- aggregateReaders.put(path, seriesReader);
- }
+
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+ .getQueryDataSource(path, context, timeFilter);
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+ //init reader
+ pathExecutors.putIfAbsent(path,
+ new GroupByExecutor(path, dataTypes.get(i), context, queryDataSource, timeFilter));
+
+ AggregateResult aggrResult = AggregateResultFactory
+ .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
+ dataTypes.get(i));
+ pathExecutors.get(path).addAggregateResult(aggrResult, i);
}
}
@@ -114,157 +105,202 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
}
hasCachedTimeInterval = false;
RowRecord record = new RowRecord(curStartTime);
- AggregateResult[] aggregateResultList = new AggregateResult[paths.size()];
- for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
- List<AggregateResult> aggregateResults;
+ timeRange = new TimeRange(curStartTime, curEndTime - 1);
+
+ final AggregateResult[] fields = new AggregateResult[paths.size()];
+ final List<Future> asyncResult = new ArrayList(pathExecutors.size());
+
+ for (Entry<Path, GroupByExecutor> executorEntry : pathExecutors.entrySet()) {
+ asyncResult.add(QueryTaskPoolManager.getInstance().submit((Callable<?>) () -> {
+ GroupByExecutor executor = executorEntry.getValue();
+
+ executor.resetAggregateResults();
+ List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult();
+ for (int i = 0; i < aggregations.size(); i++) {
+ fields[aggregations.get(i).right] = aggregations.get(i).left;
+ }
+ return null;
+ }));
+ }
+ //waiting for data
+ for (Future future : asyncResult) {
try {
- aggregateResults = nextIntervalAggregation(entry);
- } catch (QueryProcessException e) {
+ future.get();
+ } catch (Exception e) {
+ logger.error("GroupByWithoutValueFilterDataSet execute has error,{}", e);
throw new IOException(e);
}
- int index = 0;
- for (int i : entry.getValue()) {
- aggregateResultList[i] = aggregateResults.get(index);
- index++;
- }
}
- if (aggregateResultList.length == 0) {
- record.addField(new Field(null));
- } else {
- for (AggregateResult res : aggregateResultList) {
- record.addField(res.getResult(), res.getResultDataType());
+
+ for (AggregateResult res : fields) {
+ if (res == null) {
+ record.addField(new Field(null));
+ continue;
}
+ record.addField(res.getResult(), res.getResultDataType());
}
return record;
}
- /**
- * calculate the group by result of one series
- *
- * @param pathToAggrIndexes entry of path to aggregation indexes map
- */
- private List<AggregateResult> nextIntervalAggregation(Map.Entry<Path,
- List<Integer>> pathToAggrIndexes) throws IOException, QueryProcessException {
- List<AggregateResult> aggregateResultList = new ArrayList<>();
- List<Boolean> isCalculatedList = new ArrayList<>();
- List<Integer> indexList = pathToAggrIndexes.getValue();
-
- int remainingToCalculate = indexList.size();
- TSDataType tsDataType = groupByPlan.getDeduplicatedDataTypes().get(indexList.get(0));
-
- for (int index : indexList) {
- AggregateResult result = AggregateResultFactory
- .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(index), tsDataType);
- aggregateResultList.add(result);
-
- BatchData lastBatch = cachedBatchDataList.get(index);
-
- calcBatchData(result, lastBatch);
- if (isEndCalc(result, lastBatch)) {
- isCalculatedList.add(true);
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- return aggregateResultList;
+
+ private class GroupByExecutor {
+
+ private IAggregateReader reader;
+ private BatchData preCachedData;
+ //<aggFunction - indexForRecord> of path
+ private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
+
+ public GroupByExecutor(Path path, TSDataType dataType, QueryContext context,
+ QueryDataSource dataSource, Filter timeFilter) {
+ this.reader = new SeriesAggregateReader(path, dataType, context,
+ dataSource, timeFilter, null, null);
+ this.preCachedData = null;
+ }
+
+ public IAggregateReader getReader() {
+ return reader;
+ }
+
+ public void addAggregateResult(AggregateResult aggrResult, int index) {
+ results.add(new Pair<>(aggrResult, index));
+ }
+
+ public boolean isEndCalc() {
+ for (Pair<AggregateResult, Integer> result : results) {
+ if (result.left.isCalculatedAggregationResult() == false) {
+ return false;
}
- } else {
- isCalculatedList.add(false);
}
+ return true;
}
- TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
- IAggregateReader reader = aggregateReaders.get(pathToAggrIndexes.getKey());
-
- while (reader.hasNextChunk()) {
- // cal by chunk statistics
- Statistics chunkStatistics = reader.currentChunkStatistics();
- if (chunkStatistics.getStartTime() >= curEndTime) {
- return aggregateResultList;
+
+ public boolean calcFromCacheData() throws IOException {
+ calcFromBatch(preCachedData);
+ // The result is calculated from the cache
+ if ((preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime) || isEndCalc()) {
+ return true;
}
- if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
- new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
- for (int i = 0; i < aggregateResultList.size(); i++) {
- if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
- AggregateResult result = aggregateResultList.get(i);
- result.updateResultFromStatistics(chunkStatistics);
- if (result.isCalculatedAggregationResult()) {
- isCalculatedList.set(i, true);
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- return aggregateResultList;
- }
- }
- }
- }
- reader.skipCurrentChunk();
- continue;
+ return false;
+ }
+
+ public void calcFromBatch(BatchData batchData) throws IOException {
+ // nothing to calculate: batch is null, exhausted, or outside the current interval
+ if (batchData == null
+ || !batchData.hasCurrent()
+ || batchData.getMaxTimestamp() < curStartTime
+ || batchData.currentTime() >= curEndTime) {
+ return;
}
- while (reader.hasNextPage()) {
- //cal by page statistics
- Statistics pageStatistics = reader.currentPageStatistics();
- if (pageStatistics.getStartTime() >= curEndTime) {
- return aggregateResultList;
+ for (Pair<AggregateResult, Integer> result : results) {
+ //current agg method has been calculated
+ if (result.left.isCalculatedAggregationResult()) {
+ continue;
}
- if (reader.canUseCurrentPageStatistics() && timeRange.contains(
- new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
- for (int i = 0; i < aggregateResultList.size(); i++) {
- if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
- AggregateResult result = aggregateResultList.get(i);
- result.updateResultFromStatistics(pageStatistics);
- if (result.isCalculatedAggregationResult()) {
- isCalculatedList.set(i, true);
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- return aggregateResultList;
- }
- }
- }
- }
- reader.skipCurrentPage();
+ //lazy reset batch data for calculation
+ batchData.resetBatchData();
+ //skip points that cannot be calculated
+ while (batchData.currentTime() < curStartTime && batchData.hasCurrent()) {
+ batchData.next();
+ }
+ if (batchData.hasCurrent()) {
+ result.left.updateResultFromPageData(batchData, curEndTime);
+ }
+ }
+ //can calc for next interval
+ if (batchData.getMaxTimestamp() >= curEndTime) {
+ preCachedData = batchData;
+ }
+ }
+
+ public void calcFromStatistics(Statistics pageStatistics)
+ throws QueryProcessException {
+ for (Pair<AggregateResult, Integer> result : results) {
+ //calc is complete
+ if (result.left.isCalculatedAggregationResult()) {
continue;
- } else {
- BatchData batchData = reader.nextPage();
- for (int i = 0; i < aggregateResultList.size(); i++) {
- if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
- AggregateResult result = aggregateResultList.get(i);
- calcBatchData(result, batchData);
- int idx = pathToAggrIndexes.getValue().get(i);
- if (batchData.hasCurrent()) {
- cachedBatchDataList.set(idx, batchData);
- }
- if (isEndCalc(result, null)) {
- isCalculatedList.set(i, true);
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- break;
- }
- }
- }
- }
}
+ result.left.updateResultFromStatistics(pageStatistics);
}
}
- return aggregateResultList;
- }
- private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
- return (lastBatch != null && lastBatch.hasCurrent() && lastBatch.currentTime() >= curEndTime)
- || function.isCalculatedAggregationResult();
- }
+ private List<Pair<AggregateResult, Integer>> calcResult()
+ throws IOException, QueryProcessException {
+ if (calcFromCacheData()) {
+ return results;
+ }
- /**
- * this batchData >= curEndTime
- */
- private void calcBatchData(AggregateResult result, BatchData batchData) throws IOException {
- if (batchData == null || !batchData.hasCurrent()) {
- return;
+ //read page data firstly
+ if (readAndCalcFromPage()) {
+ return results;
+ }
+
+ //read chunk finally
+ while (reader.hasNextChunk()) {
+ Statistics chunkStatistics = reader.currentChunkStatistics();
+ if (chunkStatistics.getStartTime() >= curEndTime) {
+ return results;
+ }
+ //calc from chunkMetaData
+ if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
+ new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
+ calcFromStatistics(chunkStatistics);
+ reader.skipCurrentChunk();
+ continue;
+ }
+ if (readAndCalcFromPage()) {
+ return results;
+ }
+ }
+ return results;
}
- while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
- batchData.next();
+
+ // clear all results
+ public void resetAggregateResults() {
+ for (Pair<AggregateResult, Integer> result : results) {
+ result.left.reset();
+ }
}
- if (batchData.hasCurrent()) {
- result.updateResultFromPageData(batchData, curEndTime);
- // reset batch data for next calculation
- batchData.resetBatchData();
+
+
+ private boolean readAndCalcFromPage() throws IOException, QueryProcessException {
+ while (reader.hasNextPage()) {
+ Statistics pageStatistics = reader.currentPageStatistics();
+ //must be non overlapped page
+ if (pageStatistics != null) {
+ //current page starts at or after the end of the time range
+ if (pageStatistics.getStartTime() >= curEndTime) {
+ return true;
+ }
+ //can use pageHeader
+ if (reader.canUseCurrentPageStatistics() && timeRange.contains(
+ new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
+ calcFromStatistics(pageStatistics);
+ reader.skipCurrentPage();
+ if (isEndCalc()) {
+ return true;
+ }
+ continue;
+ }
+ }
+ // calc from page data
+ BatchData batchData = reader.nextPage();
+ if (batchData == null || !batchData.hasCurrent()) {
+ continue;
+ }
+ // stop calc and cached current batchData
+ if (batchData.currentTime() >= curEndTime) {
+ preCachedData = batchData;
+ return true;
+ }
+
+ calcFromBatch(batchData);
+ if (isEndCalc() || batchData.currentTime() >= curEndTime) {
+ return true;
+ }
+ }
+ return false;
}
}
+
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 19f3810..9e5e654 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -71,7 +71,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
}
/*
- * consume page data secondly
+ * consume page data firstly
*/
if (readPageData()) {
return hasCachedBatchData = true;