You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:29 UTC
[incubator-iotdb] 02/11: add aggre feature without timegenerator
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit fe9937a8cfe6db48dd4b4d2cff31b10ebbc91f77
Author: lta <li...@163.com>
AuthorDate: Thu May 16 18:41:26 2019 +0800
add aggre feature without timegenerator
---
.../executor/ClusterAggregateEngineExecutor.java | 114 ++++++++++++++++
.../cluster/query/executor/ClusterQueryRouter.java | 29 +++-
.../querynode/ClusterLocalSingleQueryManager.java | 85 ++++++++++--
.../query/utils/QueryPlanPartitionUtils.java | 146 +++++++++++++++------
.../db/qp/executor/IQueryProcessExecutor.java | 4 +-
.../db/query/executor/AggregateEngineExecutor.java | 75 +++++++----
.../iotdb/db/query/executor/EngineQueryRouter.java | 4 +-
7 files changed, 378 insertions(+), 79 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
new file mode 100644
index 0000000..b63b311
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+/**
+ * Aggregate executor used for cluster queries. A selected series that has a
+ * {@link ClusterSelectSeriesReader} registered in the query manager is served by that reader;
+ * every other selected series is aggregated here from the local sequence and unsequence data.
+ */
+public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
+
+ private ClusterRpcSingleQueryManager queryManager;
+
+ public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
+ IExpression expression, ClusterRpcSingleQueryManager queryManager) {
+ super(selectedSeries, aggres, expression);
+ this.queryManager = queryManager;
+ }
+
+ /**
+ * Execute the aggregation with only a time filter or no filter.
+ *
+ * @param context query context
+ * @return data set holding one reader and one data type per selected series, in the order of
+ * the selected series
+ */
+ @Override
+ public QueryDataSet executeWithoutTimeGenerator(QueryContext context)
+ throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
+ Filter timeFilter = expression != null ? ((GlobalTimeExpression) expression).getFilter() : null;
+ Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
+
+ // paths that are aggregated locally
+ List<Path> paths = new ArrayList<>();
+ List<IPointReader> readers = new ArrayList<>();
+ // local list: the dataTypes field inherited from AggregateEngineExecutor is not filled here
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ Path path = selectedSeries.get(i);
+
+ if (selectPathReaders.containsKey(path)) {
+ ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+ readers.add(reader);
+ dataTypes.add(reader.getDataType());
+ } else {
+ paths.add(path);
+ // construct AggregateFunction
+ TSDataType tsDataType = MManager.getInstance().getSeriesType(path.getFullPath());
+ AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
+ function.init();
+
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+ .getQueryDataSource(path, context);
+
+ // sequence reader for sealed tsfile, unsealed tsfile, memory;
+ // max_time and last are the only functions that pass true as the last argument
+ boolean maxTimeOrLast = function instanceof MaxTimeAggrFunc
+ || function instanceof LastAggrFunc;
+ SequenceDataReader sequenceReader = new SequenceDataReader(
+ queryDataSource.getSeqDataSource(), timeFilter, context, maxTimeOrLast);
+
+ // unseq reader for all chunk groups in unSeqFile, memory
+ PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+
+ AggreResultData aggreResultData = aggregateWithoutTimeGenerator(function,
+ sequenceReader, unSeqMergeReader, timeFilter);
+ // keep dataTypes aligned with readers: exactly one entry per selected series
+ dataTypes.add(aggreResultData.getDataType());
+ readers.add(new AggreResultDataPointReader(aggreResultData));
+ }
+ }
+ // NOTE(review): the local paths are registered only after their data sources were read,
+ // whereas AggregateEngineExecutor registers paths first - confirm this order is intended
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenQueryPaths(context.getJobId(), paths);
+
+ return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, readers);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 2fa4576..672ca9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.FillEngineExecutor;
+import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -95,7 +95,32 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
IExpression expression, QueryContext context)
throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException {
- throw new UnsupportedOperationException();
+
+ ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+ .getSingleQuery(context.getJobId());
+
+ try {
+ if (expression != null) {
+ IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+ .optimize(expression, selectedSeries);
+ AggregateEngineExecutor engineExecutor = new ClusterAggregateEngineExecutor(
+ selectedSeries, aggres, optimizedExpression, queryManager);
+ if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+ queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
+ return engineExecutor.executeWithoutTimeGenerator(context);
+ } else {
+ queryManager.initQueryResource(QueryType.FILTER, getReadDataConsistencyLevel());
+ return engineExecutor.executeWithTimeGenerator(context);
+ }
+ } else {
+ AggregateEngineExecutor engineExecutor = new ClusterAggregateEngineExecutor(
+ selectedSeries, aggres, null, queryManager);
+ queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel());
+ return engineExecutor.executeWithoutTimeGenerator(context);
+ }
+ } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
+ throw new FileNodeManagerException(e);
+ }
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index f776477..76a141e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
-import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator;
+import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.query.fill.PreviousFill;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -63,7 +64,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
@@ -134,16 +137,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
if (plan instanceof GroupByPlan) {
throw new UnsupportedOperationException();
} else if (plan instanceof AggregationPlan) {
- throw new UnsupportedOperationException();
+ handleAggreSeriesReader(plan, context, response);
} else if (plan instanceof FillQueryPlan) {
- handleFillSeriesRerader(plan, context, response);
+ handleFillSeriesReader(plan, context, response);
} else {
- if (plan.getExpression() == null
- || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
- handleSelectReaderWithoutTimeGenerator(plan, context, response);
- } else {
- handleSelectReaderWithTimeGenerator(plan, context, response);
- }
+ handleSelectSeriesReader(plan, context, response);
}
}
if (queryPlanMap.containsKey(PathType.FILTER_PATH)) {
@@ -158,7 +156,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
*
* @param queryPlan fill query plan
*/
- private void handleFillSeriesRerader(QueryPlan queryPlan, QueryContext context,
+ private void handleFillSeriesReader(QueryPlan queryPlan, QueryContext context,
InitSeriesReaderResponse response)
throws FileNodeManagerException, PathErrorException, IOException {
FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
@@ -190,6 +188,73 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
}
+
+ /**
+ * Handle aggregation series reader. A plan with no expression, or with only a global time
+ * expression, is handled by handleAggreSeriesReaderWithoutTimeGenerator; any other plan is
+ * delegated to handleSelectReaderWithTimeGenerator.
+ *
+ * @param queryPlan aggregation query plan
+ * @param context query context
+ * @param response response for coordinator node
+ */
+ private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context,
+ InitSeriesReaderResponse response)
+ throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+ if (queryPlan.getExpression() == null
+ || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+ handleAggreSeriesReaderWithoutTimeGenerator(queryPlan,context,response);
+ } else {
+ handleSelectReaderWithTimeGenerator(queryPlan, context, response);
+ }
+ }
+
+ /**
+ * Handle aggregation series reader without value filter: compute the aggregation of every
+ * selected path, register one batch reader and one data type per path, and report the data
+ * types of the select paths in the response.
+ *
+ * @param queryPlan aggregation query plan, its expression is null or a global time expression
+ * @param context query context
+ * @param response response for coordinator node
+ */
+ private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, QueryContext context,
+ InitSeriesReaderResponse response)
+ throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+ AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+
+ List<Path> selectedPaths = aggregationPlan.getPaths();
+ QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths);
+
+ // plans without any expression are routed here as well, so only optimize a real expression;
+ // the executor treats a null expression as "no time filter"
+ IExpression expression = aggregationPlan.getExpression();
+ IExpression optimizedExpression = expression == null ? null
+ : ExpressionOptimizer.getInstance().optimize(expression, selectedPaths);
+ AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
+ selectedPaths, aggregationPlan.getAggregations(), optimizedExpression);
+
+ List<IPointReader> readers = engineExecutor.constructAggreReadersWithoutTimeGenerator(context);
+
+ // filled by constructAggreReadersWithoutTimeGenerator, one entry per selected path
+ List<TSDataType> dataTypes = engineExecutor.getDataTypes();
+
+ for (int i = 0; i < selectedPaths.size(); i++) {
+ Path path = selectedPaths.get(i);
+ selectSeriesReaders.put(path.getFullPath(),
+ new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i)));
+ dataTypeMap.put(path.getFullPath(), dataTypes.get(i));
+ }
+
+ response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
+ }
+
+ /**
+ * Handle select series query. A plan with no expression, or with only a global time
+ * expression, is handled without a time generator; any other plan is handled with one.
+ *
+ * @param plan query plan
+ * @param context query context
+ * @param response response for coordinator node
+ */
+ private void handleSelectSeriesReader(QueryPlan plan, QueryContext context,
+ InitSeriesReaderResponse response)
+ throws FileNodeManagerException, IOException, PathErrorException {
+ if (plan.getExpression() == null
+ || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+ handleSelectReaderWithoutTimeGenerator(plan, context, response);
+ } else {
+ handleSelectReaderWithTimeGenerator(plan, context, response);
+ }
+ }
+
/**
* Handle select series query with no filter or only global time filter
*
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index 546282a..fc0d401 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -32,8 +32,6 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -55,13 +53,32 @@ public class QueryPlanPartitionUtils {
throws PathErrorException {
QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan();
if (queryPLan instanceof FillQueryPlan) {
- splitFillPlan((FillQueryPlan)queryPLan, singleQueryManager);
+ splitFillPlan(singleQueryManager);
+ } else if (queryPLan instanceof AggregationPlan) {
+ splitAggregationPlanBySelectPath(singleQueryManager);
+ } else if (queryPLan instanceof GroupByPlan) {
+ splitGroupByPlanBySelectPath(singleQueryManager);
} else {
splitQueryPlanBySelectPath(singleQueryManager);
}
}
/**
+ * Split query plan with filter.
+ */
+ public static void splitQueryPlanWithValueFilter(
+ ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+ QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
+ if (queryPlan instanceof GroupByPlan) {
+ splitGroupByPlanWithFilter(singleQueryManager);
+ } else if (queryPlan instanceof AggregationPlan) {
+ splitAggregationPlanWithFilter(singleQueryManager);
+ } else {
+ splitQueryPlanWithFilter(singleQueryManager);
+ }
+ }
+
+ /**
* Split query plan by select paths
*/
private static void splitQueryPlanBySelectPath(ClusterRpcSingleQueryManager singleQueryManager)
@@ -88,33 +105,100 @@ public class QueryPlanPartitionUtils {
}
}
+
/**
- * Split query plan with filter.
+ * Split query plan by filter paths
*/
- public static void splitQueryPlanWithValueFilter(
- ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+ private static void splitQueryPlanByFilterPath(ClusterRpcSingleQueryManager singleQueryManager)
+ throws PathErrorException {
QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
- if (queryPlan instanceof GroupByPlan) {
- splitGroupByPlan((GroupByPlan) queryPlan, singleQueryManager);
- } else if (queryPlan instanceof AggregationPlan) {
- splitAggregationPlan((AggregationPlan) queryPlan, singleQueryManager);
- } else {
- splitQueryPlan(queryPlan, singleQueryManager);
+ // split query plan by filter path
+ Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager
+ .getFilterGroupEntityMap();
+ IExpression expression = queryPlan.getExpression();
+ ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap);
+ for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) {
+ List<Path> filterSeriesList = filterGroupEntity.getFilterPaths();
+ // create filter sub query plan
+ QueryPlan subQueryPlan = new QueryPlan();
+ subQueryPlan.setPaths(filterSeriesList);
+ IExpression subExpression = ExpressionUtils
+ .pruneFilterTree(expression.clone(), filterSeriesList);
+ if (subExpression.getType() != ExpressionType.TRUE) {
+ subQueryPlan.setExpression(subExpression);
+ }
+ filterGroupEntity.setQueryPlan(subQueryPlan);
}
}
- private static void splitGroupByPlan(GroupByPlan groupByPlan,
+ /**
+ * Split group by plan by select path
+ */
+ private static void splitGroupByPlanBySelectPath(
ClusterRpcSingleQueryManager singleQueryManager) {
throw new UnsupportedOperationException();
}
- private static void splitAggregationPlan(AggregationPlan aggregationPlan,
- ClusterRpcSingleQueryManager singleQueryManager) {
- throw new UnsupportedOperationException();
+ /**
+ * Split group by plan with filter path: split by select path first, then by filter path.
+ * Splitting a group by plan by select path is not implemented yet and throws
+ * UnsupportedOperationException, so the split by filter path is currently never reached.
+ */
+ private static void splitGroupByPlanWithFilter(ClusterRpcSingleQueryManager singleQueryManager)
+ throws PathErrorException {
+ splitGroupByPlanBySelectPath(singleQueryManager);
+ splitQueryPlanByFilterPath(singleQueryManager);
}
- private static void splitFillPlan(FillQueryPlan fillQueryPlan,
- ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+ /**
+ * Split aggregation plan by select path: group every selected path, together with the
+ * aggregation applied to it, by the group id of its device and create one aggregation sub
+ * plan per group id. Each sub plan keeps the proposer and the expression of the origin plan.
+ */
+ private static void splitAggregationPlanBySelectPath(
+ ClusterRpcSingleQueryManager singleQueryManager)
+ throws PathErrorException {
+ AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan();
+ List<Path> selectPaths = queryPlan.getPaths();
+ // aggregations of the origin plan: the i-th aggregation applies to the i-th select path
+ List<String> aggregations = queryPlan.getAggregations();
+ Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
+ Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
+ Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+ for (int i = 0; i < selectPaths.size(); i++) {
+ Path path = selectPaths.get(i);
+ String aggregation = aggregations.get(i);
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+ if (!selectSeriesByGroupId.containsKey(groupId)) {
+ selectSeriesByGroupId.put(groupId, new ArrayList<>());
+ selectAggregationByGroupId.put(groupId, new ArrayList<>());
+ }
+ // both lists are appended in the same order, so path and aggregation stay aligned
+ selectAggregationByGroupId.get(groupId).add(aggregation);
+ selectSeriesByGroupId.get(groupId).add(path);
+ }
+ for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+ String groupId = entry.getKey();
+ List<Path> paths = entry.getValue();
+ AggregationPlan subQueryPlan = new AggregationPlan();
+ subQueryPlan.setProposer(queryPlan.getProposer());
+ subQueryPlan.setPaths(paths);
+ subQueryPlan.setExpression(queryPlan.getExpression());
+ subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId));
+ selectPathPlans.put(groupId, subQueryPlan);
+ }
}
+
+ /**
+ * Split aggregation plan with filter path: split the plan by select path first, then split
+ * its expression by filter path.
+ */
+ private static void splitAggregationPlanWithFilter(
+ ClusterRpcSingleQueryManager singleQueryManager)
+ throws PathErrorException {
+ splitAggregationPlanBySelectPath(singleQueryManager);
+ splitQueryPlanByFilterPath(singleQueryManager);
+ }
+
+ /**
+ * Split fill plan which only contain select paths.
+ */
+ private static void splitFillPlan(ClusterRpcSingleQueryManager singleQueryManager)
+ throws PathErrorException {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) singleQueryManager.getOriginQueryPlan();
List<Path> selectPaths = fillQueryPlan.getPaths();
Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
@@ -138,25 +222,13 @@ public class QueryPlanPartitionUtils {
}
}
- private static void splitQueryPlan(QueryPlan queryPlan,
- ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+ /**
+ * Split query plan with filter
+ */
+ private static void splitQueryPlanWithFilter(ClusterRpcSingleQueryManager singleQueryManager)
+ throws PathErrorException {
splitQueryPlanBySelectPath(singleQueryManager);
- // split query plan by filter path
- Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager
- .getFilterGroupEntityMap();
- IExpression expression = queryPlan.getExpression();
- ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap);
- for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) {
- List<Path> filterSeriesList = filterGroupEntity.getFilterPaths();
- // create filter sub query plan
- QueryPlan subQueryPlan = new QueryPlan();
- subQueryPlan.setPaths(filterSeriesList);
- IExpression subExpression = ExpressionUtils
- .pruneFilterTree(expression.clone(), filterSeriesList);
- if (subExpression.getType() != ExpressionType.TRUE) {
- subQueryPlan.setExpression(subExpression);
- }
- filterGroupEntity.setQueryPlan(subQueryPlan);
- }
+ splitQueryPlanByFilterPath(singleQueryManager);
}
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 42a2e8b..920aeef 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -56,9 +56,7 @@ public interface IQueryProcessExecutor {
QueryFilterOptimizationException, ProcessorException;
/**
- * process aggregate plan of qp layer, construct queryDataSet. <<<<<<< HEAD
- *
- * ======= >>>>>>> master
+ * process aggregate plan of qp layer, construct queryDataSet.
*/
QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
QueryContext context)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 508a787..6c458a5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.query.executor;
import java.io.IOException;
@@ -28,7 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.db.query.aggregation.AggreResultData;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
@@ -37,6 +35,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -54,9 +53,10 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class AggregateEngineExecutor {
- private List<Path> selectedSeries;
- private List<String> aggres;
- private IExpression expression;
+ protected List<Path> selectedSeries;
+ protected List<String> aggres;
+ protected IExpression expression;
+ protected List<TSDataType> dataTypes;
/**
* aggregation batch calculation size.
@@ -72,6 +72,7 @@ public class AggregateEngineExecutor {
this.aggres = aggres;
this.expression = expression;
this.aggregateFetchSize = 10 * IoTDBDescriptor.getInstance().getConfig().getFetchSize();
+ this.dataTypes = new ArrayList<>();
}
/**
@@ -79,8 +80,19 @@ public class AggregateEngineExecutor {
*
* @param context query context
*/
- public QueryDataSet executeWithOutTimeGenerator(QueryContext context)
+ public QueryDataSet executeWithoutTimeGenerator(QueryContext context)
throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
+ List<IPointReader> resultDataPointReaders = constructAggreReadersWithoutTimeGenerator(context);
+ return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+ }
+
+ /**
+ * Construct aggregate readers with only time filter or no filter.
+ *
+ * @param context query context
+ */
+ public List<IPointReader> constructAggreReadersWithoutTimeGenerator(QueryContext context)
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
Filter timeFilter = null;
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
@@ -121,11 +133,17 @@ public class AggregateEngineExecutor {
List<AggreResultData> aggreResultDataList = new ArrayList<>();
//TODO use multi-thread
for (int i = 0; i < selectedSeries.size(); i++) {
- AggreResultData aggreResultData = aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
+ AggreResultData aggreResultData = aggregateWithoutTimeGenerator(aggregateFunctions.get(i),
readersOfSequenceData.get(i), readersOfUnSequenceData.get(i), timeFilter);
aggreResultDataList.add(aggreResultData);
}
- return constructDataSet(aggreResultDataList);
+
+ List<IPointReader> resultDataPointReaders = new ArrayList<>();
+ for (AggreResultData resultData : aggreResultDataList) {
+ dataTypes.add(resultData.getDataType());
+ resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
+ }
+ return resultDataPointReaders;
}
/**
@@ -137,7 +155,7 @@ public class AggregateEngineExecutor {
* @param filter time filter or null
* @return one series aggregate result data
*/
- private AggreResultData aggregateWithOutTimeGenerator(AggregateFunction function,
+ protected AggreResultData aggregateWithoutTimeGenerator(AggregateFunction function,
SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter filter)
throws IOException, ProcessorException {
if (function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc) {
@@ -256,6 +274,18 @@ public class AggregateEngineExecutor {
*/
public QueryDataSet executeWithTimeGenerator(QueryContext context)
throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+ List<IPointReader> resultDataPointReaders = constructAggreReadersWithTimeGenerator(context);
+ return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+ }
+
+
+ /**
+ * Construct aggregate readers with value filter.
+ *
+ * @param context query context
+ */
+ private List<IPointReader> constructAggreReadersWithTimeGenerator(QueryContext context)
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
QueryResourceManager
.getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries);
QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression);
@@ -271,12 +301,19 @@ public class AggregateEngineExecutor {
function.init();
aggregateFunctions.add(function);
}
- List<AggreResultData> batchDataList = aggregateWithTimeGenerator(aggregateFunctions,
+ List<AggreResultData> aggreResultDataList = aggregateWithTimeGenerator(aggregateFunctions,
timestampGenerator,
readersOfSelectedSeries);
- return constructDataSet(batchDataList);
+
+ List<IPointReader> resultDataPointReaders = new ArrayList<>();
+ for (AggreResultData resultData : aggreResultDataList) {
+ dataTypes.add(resultData.getDataType());
+ resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
+ }
+ return resultDataPointReaders;
}
+
/**
* calculation aggregate result with value filter.
*/
@@ -312,19 +349,7 @@ public class AggregateEngineExecutor {
return aggreResultDataArrayList;
}
- /**
- * using aggregate result data list construct QueryDataSet.
- *
- * @param aggreResultDataList aggregate result data list
- */
- private QueryDataSet constructDataSet(List<AggreResultData> aggreResultDataList)
- throws IOException {
- List<TSDataType> dataTypes = new ArrayList<>();
- List<IPointReader> resultDataPointReaders = new ArrayList<>();
- for (AggreResultData resultData : aggreResultDataList) {
- dataTypes.add(resultData.getDataType());
- resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
- }
- return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+ public List<TSDataType> getDataTypes() {
+ return dataTypes;
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 03c600d..32fcbf7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -90,14 +90,14 @@ public class EngineQueryRouter implements IEngineQueryRouter{
AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
selectedSeries, aggres, optimizedExpression);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
- return engineExecutor.executeWithOutTimeGenerator(context);
+ return engineExecutor.executeWithoutTimeGenerator(context);
} else {
return engineExecutor.executeWithTimeGenerator(context);
}
} else {
AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
selectedSeries, aggres, null);
- return engineExecutor.executeWithOutTimeGenerator(context);
+ return engineExecutor.executeWithoutTimeGenerator(context);
}
}