You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/28 09:30:31 UTC
[incubator-iotdb] 01/01: parallel aggregation query
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch parallel_aggregation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3fb340e1408cd6371426f3d58b1d2d4d74144b79
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Feb 28 17:30:07 2020 +0800
parallel aggregation query
---
.../db/query/executor/AggregationExecutor.java | 237 ++++++++++++---------
.../db/query/reader/series/IAggregateReader.java | 3 +
.../query/reader/series/SeriesAggregateReader.java | 5 +
.../iotdb/db/query/reader/series/SeriesReader.java | 5 +
.../iotdb/db/integration/IoTDBAggregationIT.java | 10 +-
.../integration/IoTDBAggregationSmallDataIT.java | 2 +-
6 files changed, 161 insertions(+), 101 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c39a392..a5f56c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -25,6 +25,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -36,6 +39,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
@@ -50,6 +54,9 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AggregationExecutor {
@@ -58,6 +65,8 @@ public class AggregationExecutor {
private List<String> aggregations;
private IExpression expression;
+ private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
/**
* aggregation batch calculation size.
**/
@@ -72,131 +81,169 @@ public class AggregationExecutor {
}
/**
- * execute aggregate function with only time filter or no filter.
- *
- * @param context query context
+ * Aggregate one series
*/
- public QueryDataSet executeWithoutValueFilter(QueryContext context)
- throws StorageEngineException, IOException, QueryProcessException {
+ class AggregationTask implements Callable<Pair<Path, List<AggregateResult>>> {
- Filter timeFilter = null;
- if (expression != null) {
- timeFilter = ((GlobalTimeExpression) expression).getFilter();
+ // path to aggregation result indexes
+ private Map.Entry<Path, List<Integer>> pathToAggrIndexes;
+ private IAggregateReader reader;
+
+ public AggregationTask(IAggregateReader reader, Map.Entry<Path, List<Integer>> pathToAggrIndexes) {
+ this.reader = reader;
+ this.pathToAggrIndexes = pathToAggrIndexes;
}
- // TODO use multi-thread
- Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
- AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
- for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
- List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
- int index = 0;
- for (int i : entry.getValue()) {
- aggregateResultList[i] = aggregateResults.get(index);
- index++;
- }
+ @Override
+ public Pair<Path, List<AggregateResult>> call() throws QueryProcessException, IOException {
+ return aggregateOneSeries();
}
- return constructDataSet(Arrays.asList(aggregateResultList));
- }
+ /**
+ * get aggregation result for one series
+ *
+ * @return AggregateResult list
+ */
+ private Pair<Path, List<AggregateResult>> aggregateOneSeries()
+ throws IOException, QueryProcessException {
+ List<AggregateResult> aggregateResultList = new ArrayList<>();
+ List<Boolean> isCalculatedList = new ArrayList<>();
+ Path seriesPath = pathToAggrIndexes.getKey();
- /**
- * get aggregation result for one series
- *
- * @param pathToAggrIndexes entry of path to aggregation indexes map
- * @param timeFilter time filter
- * @param context query context
- * @return AggregateResult list
- */
- private List<AggregateResult> aggregateOneSeries(
- Map.Entry<Path, List<Integer>> pathToAggrIndexes,
- Filter timeFilter, QueryContext context)
- throws IOException, QueryProcessException, StorageEngineException {
- List<AggregateResult> aggregateResultList = new ArrayList<>();
- List<Boolean> isCalculatedList = new ArrayList<>();
- Path seriesPath = pathToAggrIndexes.getKey();
- TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
-
- // construct series reader without value filter
- QueryDataSource queryDataSource = QueryResourceManager.getInstance()
- .getQueryDataSource(seriesPath, context, timeFilter);
- // update filter by TTL
- timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
- IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndexes.getKey(),
- tsDataType, context, queryDataSource, timeFilter, null, null);
-
- for (int i : pathToAggrIndexes.getValue()) {
- // construct AggregateResult
- AggregateResult aggregateResult = AggregateResultFactory
- .getAggrResultByName(aggregations.get(i), tsDataType);
- aggregateResultList.add(aggregateResult);
- isCalculatedList.add(false);
- }
- int remainingToCalculate = pathToAggrIndexes.getValue().size();
-
- while (seriesReader.hasNextChunk()) {
- // cal by chunk statistics
- if (seriesReader.canUseCurrentChunkStatistics()) {
- Statistics chunkStatistics = seriesReader.currentChunkStatistics();
- for (int i = 0; i < aggregateResultList.size(); i++) {
- if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
- AggregateResult aggregateResult = aggregateResultList.get(i);
- aggregateResult.updateResultFromStatistics(chunkStatistics);
- if (aggregateResult.isCalculatedAggregationResult()) {
- isCalculatedList.set(i, true);
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- return aggregateResultList;
- }
- }
- }
- }
- seriesReader.skipCurrentChunk();
- continue;
+ for (int i : pathToAggrIndexes.getValue()) {
+ // construct AggregateResult
+ AggregateResult aggregateResult = AggregateResultFactory
+ .getAggrResultByName(aggregations.get(i), reader.getSeriesDataType());
+ aggregateResultList.add(aggregateResult);
+ isCalculatedList.add(false);
}
- while (seriesReader.hasNextPage()) {
- //cal by page statistics
- if (seriesReader.canUseCurrentPageStatistics()) {
- Statistics pageStatistic = seriesReader.currentPageStatistics();
+ int remainingToCalculate = pathToAggrIndexes.getValue().size();
+
+ while (reader.hasNextChunk()) {
+ // cal by chunk statistics
+ if (reader.canUseCurrentChunkStatistics()) {
+ Statistics chunkStatistics = reader.currentChunkStatistics();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
- aggregateResult.updateResultFromStatistics(pageStatistic);
+ aggregateResult.updateResultFromStatistics(chunkStatistics);
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
- return aggregateResultList;
+ return new Pair<>(seriesPath, aggregateResultList);
}
}
}
}
- seriesReader.skipCurrentPage();
+ reader.skipCurrentChunk();
continue;
}
- // cal by page data
- while (seriesReader.hasNextOverlappedPage()) {
- BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
- for (int i = 0; i < aggregateResultList.size(); i++) {
- if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
- AggregateResult aggregateResult = aggregateResultList.get(i);
- aggregateResult.updateResultFromPageData(nextOverlappedPageData);
- nextOverlappedPageData.resetBatchData();
- if (aggregateResult.isCalculatedAggregationResult()) {
- isCalculatedList.set(i, true);
- remainingToCalculate--;
- if (remainingToCalculate == 0) {
- return aggregateResultList;
+ while (reader.hasNextPage()) {
+ //cal by page statistics
+ if (reader.canUseCurrentPageStatistics()) {
+ Statistics pageStatistic = reader.currentPageStatistics();
+ for (int i = 0; i < aggregateResultList.size(); i++) {
+ if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+ AggregateResult aggregateResult = aggregateResultList.get(i);
+ aggregateResult.updateResultFromStatistics(pageStatistic);
+ if (aggregateResult.isCalculatedAggregationResult()) {
+ isCalculatedList.set(i, true);
+ remainingToCalculate--;
+ if (remainingToCalculate == 0) {
+ return new Pair<>(seriesPath, aggregateResultList);
+ }
+ }
+ }
+ }
+ reader.skipCurrentPage();
+ continue;
+ }
+ // cal by page data
+ while (reader.hasNextOverlappedPage()) {
+ BatchData nextOverlappedPageData = reader.nextOverlappedPage();
+ for (int i = 0; i < aggregateResultList.size(); i++) {
+ if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+ AggregateResult aggregateResult = aggregateResultList.get(i);
+ aggregateResult.updateResultFromPageData(nextOverlappedPageData);
+ nextOverlappedPageData.resetBatchData();
+ if (aggregateResult.isCalculatedAggregationResult()) {
+ isCalculatedList.set(i, true);
+ remainingToCalculate--;
+ if (remainingToCalculate == 0) {
+ return new Pair<>(seriesPath, aggregateResultList);
+ }
}
}
}
}
}
}
+ return new Pair<>(seriesPath, aggregateResultList);
+ }
+ }
+
+ /**
+ * execute aggregate function with only time filter or no filter.
+ *
+ * @param context query context
+ */
+ public QueryDataSet executeWithoutValueFilter(QueryContext context)
+ throws StorageEngineException, QueryProcessException {
+
+ Filter timeFilter = null;
+ if (expression != null) {
+ timeFilter = ((GlobalTimeExpression) expression).getFilter();
+ }
+
+ AggregateResult[] finalAggregateResults = new AggregateResult[selectedSeries.size()];
+ Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
+
+ /*
+ * submit AggregationTask for each series
+ */
+ List<Future<Pair<Path, List<AggregateResult>>>> futureList = new ArrayList<>();
+ for (Map.Entry<Path, List<Integer>> pathToAggrIndex : pathToAggrIndexesMap.entrySet()) {
+
+ Path seriesPath = pathToAggrIndex.getKey();
+ // construct series reader without value filter
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+ .getQueryDataSource(seriesPath, context, timeFilter);
+ // update filter by TTL
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+ TSDataType tsDataType = dataTypes.get(pathToAggrIndex.getValue().get(0));
+
+ IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndex.getKey(),
+ tsDataType, context, queryDataSource, timeFilter, null, null);
+
+ Future<Pair<Path, List<AggregateResult>>> future = pool
+ .submit(new AggregationTask(seriesReader, pathToAggrIndex));
+ futureList.add(future);
+ }
+
+ /*
+ * get AggregateResults for each series and put them into finalAggregateResults
+ */
+ for (Future<Pair<Path, List<AggregateResult>>> future: futureList) {
+ try {
+ Pair<Path, List<AggregateResult>> currentSeriesResults = future.get();
+ // final result index of current series
+ List<Integer> resultIndexList = pathToAggrIndexesMap.get(currentSeriesResults.left);
+ List<AggregateResult> resultList = currentSeriesResults.right;
+
+ // put current series results into finalAggregateResults
+ for (int i = 0; i < resultIndexList.size(); i++) {
+ finalAggregateResults[resultIndexList.get(i)] = resultList.get(i);
+ }
+ } catch (Exception e) {
+ throw new QueryProcessException(e.getMessage());
+ }
}
- return aggregateResultList;
+ return constructDataSet(Arrays.asList(finalAggregateResults));
}
+
/**
* execute aggregate function with value filter.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
index bd22f3e..58e6894 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.series;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -50,4 +51,6 @@ public interface IAggregateReader {
boolean hasNextOverlappedPage() throws IOException;
BatchData nextOverlappedPage() throws IOException;
+
+ TSDataType getSeriesDataType();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index d711f02..7081c6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -93,6 +93,11 @@ public class SeriesAggregateReader implements IAggregateReader {
return seriesReader.nextOverlappedPage();
}
+ @Override
+ public TSDataType getSeriesDataType() {
+ return seriesReader.getSeriesDataType();
+ }
+
private boolean containedByTimeFilter(Statistics statistics) {
Filter timeFilter = seriesReader.getTimeFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 8b1f795..c08fcdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -53,6 +53,7 @@ import java.util.*;
public class SeriesReader {
private final Path seriesPath;
+
private final TSDataType dataType;
private final QueryContext context;
private final Filter timeFilter;
@@ -456,6 +457,10 @@ public class SeriesReader {
return timeFilter;
}
+ public TSDataType getSeriesDataType() {
+ return dataType;
+ }
+
private class VersionPair<T> {
protected long version;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index da6d3bd..4d1d237 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -463,7 +463,7 @@ public class IoTDBAggregationIT {
}
@Test
- public void avgSumErrorTest() throws SQLException {
+ public void avgSumErrorTest() {
try (Connection connection = DriverManager.
getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
@@ -474,7 +474,7 @@ public class IoTDBAggregationIT {
resultSet.next();
fail();
} catch (Exception e) {
- Assert.assertEquals("500: Unsupported data type in aggregation AVG : TEXT", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : TEXT"));
}
try {
statement.execute("SELECT sum(s3)" +
@@ -483,7 +483,7 @@ public class IoTDBAggregationIT {
resultSet.next();
fail();
} catch (Exception e) {
- Assert.assertEquals("500: Unsupported data type in aggregation SUM : TEXT", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : TEXT"));
}
try {
statement.execute("SELECT avg(s4)" +
@@ -492,7 +492,7 @@ public class IoTDBAggregationIT {
resultSet.next();
fail();
} catch (Exception e) {
- Assert.assertEquals("500: Unsupported data type in aggregation AVG : BOOLEAN", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : BOOLEAN"));
}
try {
statement.execute("SELECT sum(s4)" +
@@ -501,7 +501,7 @@ public class IoTDBAggregationIT {
resultSet.next();
fail();
} catch (Exception e) {
- Assert.assertEquals("500: Unsupported data type in aggregation SUM : BOOLEAN", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : BOOLEAN"));
}
} catch (Exception e) {
e.printStackTrace();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
index 05fbcb1..59fef21 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
@@ -205,7 +205,7 @@ public class IoTDBAggregationSmallDataIT {
"SELECT max_value(d0.s0),max_value(d1.s1),max_value(d0.s3) FROM root.vehicle");
fail();
} catch (IoTDBSQLException e) {
- Assert.assertEquals("500: Binary statistics does not support: max", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("Binary statistics does not support: max"));
}
hasResultSet = statement.execute(