You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/09/16 09:07:42 UTC
[iotdb] 02/05: move querycontext to class parameter
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch aggrVector2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0cc1700716e4693090ddf652c7aef3e8a5c6c71d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Sep 16 13:15:09 2021 +0800
Move QueryContext from a per-method parameter to a constructor argument stored as a field of AggregationExecutor
---
.../iotdb/cluster/query/ClusterQueryRouter.java | 5 +-
.../query/aggregate/ClusterAggregateExecutor.java | 30 ++++---
.../query/ClusterAggregateExecutorTest.java | 8 +-
.../db/query/executor/AggregationExecutor.java | 96 +++++++++++-----------
.../iotdb/db/query/executor/QueryRouter.java | 11 +--
5 files changed, 79 insertions(+), 71 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
index e3be92c..eb45353 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
@@ -78,8 +78,9 @@ public class ClusterQueryRouter extends QueryRouter {
}
@Override
- protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
- return new ClusterAggregateExecutor(aggregationPlan, metaGroupMember);
+ protected AggregationExecutor getAggregationExecutor(
+ QueryContext context, AggregationPlan aggregationPlan) {
+ return new ClusterAggregateExecutor(context, aggregationPlan, metaGroupMember);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index a007ee9..af12e5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public class ClusterAggregateExecutor extends AggregationExecutor {
@@ -51,8 +50,9 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
*
* @param aggregationPlan
*/
- public ClusterAggregateExecutor(AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
- super(aggregationPlan);
+ public ClusterAggregateExecutor(
+ QueryContext context, AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
+ super(context, aggregationPlan);
this.metaMember = metaMember;
this.readerFactory = new ClusterReaderFactory(metaMember);
this.aggregator = new ClusterAggregator(metaMember);
@@ -60,24 +60,28 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
@Override
protected void aggregateOneSeries(
- Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
- AggregateResult[] aggregateResultList,
- Set<String> measurements,
- Filter timeFilter,
- QueryContext context)
+ PartialPath seriesPath,
+ List<Integer> indexes,
+ Set<String> allMeasurementsInDevice,
+ Filter timeFilter)
throws StorageEngineException {
- PartialPath seriesPath = pathToAggrIndexes.getKey();
- TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
+ TSDataType tsDataType = dataTypes.get(indexes.get(0));
List<String> aggregationNames = new ArrayList<>();
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
aggregationNames.add(aggregations.get(i));
}
List<AggregateResult> aggregateResult =
aggregator.getAggregateResult(
- seriesPath, measurements, aggregationNames, tsDataType, timeFilter, context, ascending);
+ seriesPath,
+ allMeasurementsInDevice,
+ aggregationNames,
+ tsDataType,
+ timeFilter,
+ context,
+ ascending);
int rstIndex = 0;
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
aggregateResultList[i] = aggregateResult.get(rstIndex++);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
index 7d18a00..ffa461e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
@@ -87,8 +87,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
try {
- executor = new ClusterAggregateExecutor(plan, testMetaMember);
- QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan);
+ executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+ QueryDataSet queryDataSet = executor.executeWithoutValueFilter(plan);
assertTrue(queryDataSet.hasNext());
RowRecord record = queryDataSet.next();
List<Field> fields = record.getFields();
@@ -144,8 +144,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
try {
- executor = new ClusterAggregateExecutor(plan, testMetaMember);
- QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan);
+ executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+ QueryDataSet queryDataSet = executor.executeWithValueFilter(plan);
assertTrue(queryDataSet.hasNext());
RowRecord record = queryDataSet.next();
List<Field> fields = record.getFields();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index bc970ff..871915a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -73,26 +73,25 @@ public class AggregationExecutor {
protected List<String> aggregations;
protected IExpression expression;
protected boolean ascending;
+ protected QueryContext context;
+ protected AggregateResult[] aggregateResultList;
/** aggregation batch calculation size. */
private int aggregateFetchSize;
- protected AggregationExecutor(AggregationPlan aggregationPlan) {
+ protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
this.aggregations = aggregationPlan.getDeduplicatedAggregations();
this.expression = aggregationPlan.getExpression();
this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
this.ascending = aggregationPlan.isAscending();
+ this.context = context;
+ this.aggregateResultList = new AggregateResult[selectedSeries.size()];
}
- /**
- * execute aggregate function with only time filter or no filter.
- *
- * @param context query context
- */
- public QueryDataSet executeWithoutValueFilter(
- QueryContext context, AggregationPlan aggregationPlan)
+ /** execute aggregate function with only time filter or no filter. */
+ public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
throws StorageEngineException, IOException, QueryProcessException {
Filter timeFilter = null;
@@ -103,22 +102,30 @@ public class AggregationExecutor {
// TODO use multi-thread
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
groupAggregationsBySeries(selectedSeries);
- // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
- Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap =
- groupVectorSeries(pathToAggrIndexesMap);
- AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
-
// TODO-Cluster: group the paths by storage group to reduce communications
List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+
+ // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
+ Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap =
+ groupVectorSeries(pathToAggrIndexesMap);
try {
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+ PartialPath seriesPath = entry.getKey();
aggregateOneSeries(
- entry,
- aggregateResultList,
- aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
- timeFilter,
- context);
+ seriesPath,
+ entry.getValue(),
+ aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()),
+ timeFilter);
+ }
+ for (Map.Entry<PartialPath, Map<String, List<Integer>>> entry :
+ vectorPathIndexesMap.entrySet()) {
+ VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey();
+ aggregateOneVectorSeries(
+ vectorSeries,
+ entry.getValue(),
+ aggregationPlan.getAllMeasurementsInDevice(vectorSeries.getDevice()),
+ timeFilter);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -130,25 +137,21 @@ public class AggregationExecutor {
/**
* get aggregation result for one series
*
- * @param pathToAggrIndexes entry of path to aggregation indexes map
* @param timeFilter time filter
- * @param context query context
*/
protected void aggregateOneSeries(
- Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
- AggregateResult[] aggregateResultList,
- Set<String> measurements,
- Filter timeFilter,
- QueryContext context)
+ PartialPath seriesPath,
+ List<Integer> indexes,
+ Set<String> allMeasurementsInDevice,
+ Filter timeFilter)
throws IOException, QueryProcessException, StorageEngineException {
List<AggregateResult> ascAggregateResultList = new ArrayList<>();
List<AggregateResult> descAggregateResultList = new ArrayList<>();
boolean[] isAsc = new boolean[aggregateResultList.length];
- PartialPath seriesPath = pathToAggrIndexes.getKey();
- TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
+ TSDataType tsDataType = dataTypes.get(indexes.get(0));
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
// construct AggregateResult
AggregateResult aggregateResult =
AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
@@ -161,7 +164,7 @@ public class AggregationExecutor {
}
aggregateOneSeries(
seriesPath,
- measurements,
+ allMeasurementsInDevice,
context,
timeFilter,
tsDataType,
@@ -171,7 +174,7 @@ public class AggregationExecutor {
int ascIndex = 0;
int descIndex = 0;
- for (int i : pathToAggrIndexes.getValue()) {
+ for (int i : indexes) {
aggregateResultList[i] =
isAsc[i]
? ascAggregateResultList.get(ascIndex++)
@@ -179,6 +182,13 @@ public class AggregationExecutor {
}
}
+ protected void aggregateOneVectorSeries(
+ PartialPath seriesPath,
+ Map<String, List<Integer>> subIndexes,
+ Set<String> allMeasurementsInDevice,
+ Filter timeFilter)
+ throws IOException, QueryProcessException, StorageEngineException {}
+
@SuppressWarnings("squid:S107")
public static void aggregateOneSeries(
PartialPath seriesPath,
@@ -338,12 +348,8 @@ public class AggregationExecutor {
return remainingToCalculate;
}
- /**
- * execute aggregate function with value filter.
- *
- * @param context query context.
- */
- public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
+ /** execute aggregate function with value filter. */
+ public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
throws StorageEngineException, IOException, QueryProcessException {
optimizeLastElementFunc(queryPlan);
@@ -367,15 +373,13 @@ public class AggregationExecutor {
StorageEngine.getInstance().mergeUnLock(list);
}
- List<AggregateResult> aggregateResults = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- AggregateResult result =
+ aggregateResultList[i] =
AggregateResultFactory.getAggrResultByName(
aggregations.get(i), dataTypes.get(i), ascending);
- aggregateResults.add(result);
}
- aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
- return constructDataSet(aggregateResults, queryPlan);
+ aggregateWithValueFilter(timestampGenerator, readerToAggrIndexesMap);
+ return constructDataSet(Arrays.asList(aggregateResultList), queryPlan);
}
private void optimizeLastElementFunc(QueryPlan queryPlan) {
@@ -413,7 +417,6 @@ public class AggregationExecutor {
/** calculate aggregation result with value filter. */
private void aggregateWithValueFilter(
- List<AggregateResult> aggregateResults,
TimeGenerator timestampGenerator,
Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
throws IOException {
@@ -440,17 +443,16 @@ public class AggregationExecutor {
if (cached.get(pathId)) {
Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
for (Integer i : entry.getValue()) {
- aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
}
} else {
if (entry.getValue().size() == 1) {
- aggregateResults
- .get(entry.getValue().get(0))
- .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+ aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
+ timeArray, timeArrayLength, entry.getKey());
} else {
Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
for (Integer i : entry.getValue()) {
- aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d2ae91f..a134d8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -139,22 +139,23 @@ public class QueryRouter implements IQueryRouter {
aggregationPlan.setExpression(optimizedExpression);
- AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
+ AggregationExecutor engineExecutor = getAggregationExecutor(context, aggregationPlan);
QueryDataSet dataSet = null;
if (optimizedExpression != null
&& optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
- dataSet = engineExecutor.executeWithValueFilter(context, aggregationPlan);
+ dataSet = engineExecutor.executeWithValueFilter(aggregationPlan);
} else {
- dataSet = engineExecutor.executeWithoutValueFilter(context, aggregationPlan);
+ dataSet = engineExecutor.executeWithoutValueFilter(aggregationPlan);
}
return dataSet;
}
- protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
- return new AggregationExecutor(aggregationPlan);
+ protected AggregationExecutor getAggregationExecutor(
+ QueryContext context, AggregationPlan aggregationPlan) {
+ return new AggregationExecutor(context, aggregationPlan);
}
@Override