You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 09:34:45 UTC
[incubator-iotdb] 01/04: add group by features
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2bb0a0419b8da24ca5d923bbb486dd8e37ab67ad
Author: lta <li...@163.com>
AuthorDate: Tue May 21 15:47:54 2019 +0800
add group by features
---
.../ClusterGroupByDataSetWithOnlyTimeFilter.java | 154 +++++++++++++++++++++
.../ClusterGroupByDataSetWithTimeGenerator.java | 91 ++++++++++++
.../executor/ClusterAggregateEngineExecutor.java | 6 +-
.../cluster/query/executor/ClusterQueryRouter.java | 62 ++++++++-
.../querynode/ClusterLocalSingleQueryManager.java | 59 +++++++-
.../ClusterFilterSeriesBatchReaderEntity.java | 2 +-
...lusterGroupBySelectSeriesBatchReaderEntity.java | 79 +++++++++++
.../querynode/ClusterSelectSeriesBatchReader.java | 4 +-
.../ClusterSelectSeriesBatchReaderEntity.java | 7 +-
...y.java => IClusterSeriesBatchReaderEntity.java} | 2 +-
.../query/utils/QueryPlanPartitionUtils.java | 38 ++++-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 5 +-
.../groupby/GroupByWithOnlyTimeFilterDataSet.java | 5 +-
.../groupby/GroupByWithValueFilterDataSet.java | 4 +-
...neQueryRouter.java => AbstractQueryRouter.java} | 52 ++++++-
.../iotdb/db/query/executor/EngineQueryRouter.java | 42 +-----
16 files changed, 536 insertions(+), 76 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
new file mode 100644
index 0000000..98460e4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Group-by dataset for time-filter-only cluster queries; mixes query-manager readers with local ones.
+ */
+public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTimeFilterDataSet {
+
+ private ClusterRpcSingleQueryManager queryManager;
+ private List<IPointReader> readersOfSelectedSeries; // one slot per selected series; null marks a series read locally
+
+ /**
+ * Forwards the group-by parameters to the parent dataset and keeps the query manager for reader lookup.
+ */
+ public ClusterGroupByDataSetWithOnlyTimeFilter(long jobId,
+ List<Path> paths, long unit, long origin,
+ List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
+ super(jobId, paths, unit, origin, mergedIntervals);
+ this.queryManager =queryManager;
+ this.readersOfSelectedSeries = new ArrayList<>();
+ }
+
+
+ /**
+ * Init aggregate functions and per-series readers; a non-null expression must be a GlobalTimeExpression.
+ */
+ public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
+ throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+ initAggreFuction(aggres);
+
+ /** Add query tokens only for the selected series that are not in any select-series group. **/
+ List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
+ Set<Path> remoteQuerySeries = new HashSet<>();
+ queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+ selectSeriesGroupEntity -> remoteQuerySeries
+ .addAll(selectSeriesGroupEntity.getSelectPaths()));
+ localQuerySeries.removeAll(remoteQuerySeries);
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
+ if (expression != null) {
+ timeFilter = ((GlobalTimeExpression) expression).getFilter();
+ }
+
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager
+ .getSelectSeriesGroupEntityMap();
+ // Next unused select-series reader index for each group id
+ Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ Path path = selectedSeries.get(i);
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+ if (selectSeriesGroupEntityMap.containsKey(groupId)) {
+ int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+ ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId)
+ .getSelectSeriesReaders().get(index);
+ readersOfSelectedSeries.add(reader);
+ selectSeriesReaderIndex.put(groupId, index + 1);
+ } else {
+ readersOfSelectedSeries.add(null); // placeholder keeps the list aligned with selectedSeries
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+ .getQueryDataSource(selectedSeries.get(i), context);
+
+ // sequence reader for sealed tsfile, unsealed tsfile, memory
+ SequenceDataReader sequenceReader = new SequenceDataReader(
+ queryDataSource.getSeqDataSource(),
+ timeFilter, context, false);
+
+ // unseq reader for all chunk groups in unSeqFile, memory
+ PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+ // NOTE(review): appended only for local series, yet next() calls nextSeries(i) with the overall index - confirm
+ sequenceReaderList.add(sequenceReader);
+ unSequenceReaderList.add(unSeqMergeReader);
+ }
+ }
+ }
+
+ @Override
+ public RowRecord next() throws IOException { // builds one row stamped with startTime; hasNext() must be called first
+ if (!hasCachedTimeInterval) {
+ throw new IOException("need to call hasNext() before calling next() "
+ + "in GroupByWithOnlyTimeFilterDataSet.");
+ }
+ hasCachedTimeInterval = false;
+ RowRecord record = new RowRecord(startTime);
+ for (int i = 0; i < functions.size(); i++) {
+ IPointReader reader = readersOfSelectedSeries.get(i);
+ if(reader != null){ // non-null slot: the value comes from the reader chosen in initGroupBy
+ TimeValuePair timeValuePair = reader.next();
+ record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
+ }else { // null slot: aggregate through nextSeries(i)
+ AggreResultData res;
+ try {
+ res = nextSeries(i);
+ } catch (ProcessorException e) {
+ throw new IOException(e);
+ }
+ if (res == null) {
+ record.addField(new Field(null));
+ } else {
+ record.addField(getField(res));
+ }
+ }
+ }
+ return record;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
new file mode 100644
index 0000000..00f2d88
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilterDataSet {
+ // Class note: cluster group-by dataset whose timestamps come from a ClusterTimeGenerator (see initGroupBy).
+ private ClusterRpcSingleQueryManager queryManager;
+
+ private List<TSDataType> selectSeriesDataTypes; // starts empty; handed to ClusterSeriesReaderFactory in initGroupBy
+
+ /**
+ * Forwards the group-by parameters to the parent dataset and keeps the query manager.
+ */
+ public ClusterGroupByDataSetWithTimeGenerator(long jobId,
+ List<Path> paths, long unit, long origin,
+ List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
+ super(jobId, paths, unit, origin, mergedIntervals);
+ this.queryManager = queryManager;
+ selectSeriesDataTypes = new ArrayList<>();
+ }
+
+ /**
+ * Init aggregate functions, add query tokens for filter and select series, then build the time generator and readers.
+ */
+ @Override
+ public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
+ throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+ initAggreFuction(aggres);
+
+ /** Add query tokens for locally handled filter series; devices of the query manager's filter series are passed along (presumably to be excluded - TODO confirm). **/
+ Set<String> deviceIdSet = new HashSet<>();
+ for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager
+ .getFilterSeriesGroupEntityMap().values()) {
+ List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths();
+ remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
+ }
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenExpression(context.getJobId(), expression, deviceIdSet);
+
+ /** Add query tokens for the selected series that are not in any select-series group. **/
+ List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
+ Set<Path> remoteQuerySeries = new HashSet<>();
+ queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+ selectSeriesGroupEntity -> remoteQuerySeries
+ .addAll(selectSeriesGroupEntity.getSelectPaths()));
+ localQuerySeries.removeAll(remoteQuerySeries);
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
+
+ this.timestampGenerator = new ClusterTimeGenerator(expression, context, queryManager);
+ this.allDataReaderList = ClusterSeriesReaderFactory
+ .createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager,
+ selectSeriesDataTypes);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index 2cf4e87..808eab8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -25,8 +25,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
@@ -151,8 +149,8 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
Set<Path> remoteQuerySeries = new HashSet<>();
queryManager.getSelectSeriesGroupEntityMap().values().forEach(
- selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths()
- .forEach(path -> remoteQuerySeries.add(path)));
+ selectSeriesGroupEntity -> remoteQuerySeries
+ .addAll(selectSeriesGroupEntity.getSelectPaths()));
localQuerySeries.removeAll(remoteQuerySeries);
QueryResourceManager.getInstance()
.beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 5d6a342..f9d32f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -23,14 +23,16 @@ import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.QueryType;
+import org.apache.iotdb.cluster.query.dataset.ClusterGroupByDataSetWithOnlyTimeFilter;
+import org.apache.iotdb.cluster.query.dataset.ClusterGroupByDataSetWithTimeGenerator;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.AbstractQueryRouter;
import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
-import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -38,7 +40,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -46,7 +51,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
* Query entrance class of cluster query process. All query clause will be transformed to physical
* plan, physical plan will be executed by ClusterQueryRouter.
*/
-public class ClusterQueryRouter implements IEngineQueryRouter {
+public class ClusterQueryRouter extends AbstractQueryRouter {
/**
* Consistency level of reading data
@@ -128,7 +133,55 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
QueryContext context)
throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException {
- throw new UnsupportedOperationException();
+
+ long jobId = context.getJobId();
+ ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+ .getSingleQuery(jobId);
+
+ //check the legitimacy of intervals
+ checkIntervals(intervals);
+
+ // merge intervals
+ List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals);
+
+ // construct groupBy intervals filter
+ BinaryExpression intervalFilter = null;
+ for (Pair<Long, Long> pair : mergedIntervalList) {
+ BinaryExpression pairFilter = BinaryExpression
+ .and(new GlobalTimeExpression(TimeFilter.gtEq(pair.left)),
+ new GlobalTimeExpression(TimeFilter.ltEq(pair.right)));
+ if (intervalFilter != null) {
+ intervalFilter = BinaryExpression.or(intervalFilter, pairFilter);
+ } else {
+ intervalFilter = pairFilter;
+ }
+ }
+
+ // merge interval filter and filtering conditions after where statements
+ if (expression == null) {
+ expression = intervalFilter;
+ } else {
+ expression = BinaryExpression.and(expression, intervalFilter);
+ }
+
+ IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+ .optimize(expression, selectedSeries);
+ try {
+ if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+ ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
+ jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
+ groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+ return groupByEngine;
+ } else {
+ queryManager.initQueryResource(QueryType.FILTER, getReadDataConsistencyLevel());
+ ClusterGroupByDataSetWithTimeGenerator groupByEngine = new ClusterGroupByDataSetWithTimeGenerator(
+ jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
+ groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+ return groupByEngine;
+ }
+ } catch (RaftConnectionException e) {
+ throw new FileNodeManagerException(e);
+ }
}
@Override
@@ -139,7 +192,8 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
try {
queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel());
- ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths, queryTime,
+ ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths,
+ queryTime,
fillType, queryManager);
return fillEngineExecutor.execute(context);
} catch (IOException | RaftConnectionException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 8799be2..25adbf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterGroupBySelectSeriesBatchReaderEntity;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity;
-import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReaderEntity;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator;
import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
import org.apache.iotdb.db.query.fill.IFill;
@@ -105,9 +106,14 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
private ClusterSelectSeriesBatchReaderEntity selectReaderEntity;
/**
+ * Select reader entity for group by queries; it handles group by queries with only a time filter
+ */
+ private ClusterGroupBySelectSeriesBatchReaderEntity groupBySelectReaderEntity;
+
+ /**
* Filter reader entity
*/
- private IClusterFilterSeriesBatchReaderEntity filterReaderEntity;
+ private ClusterFilterSeriesBatchReaderEntity filterReaderEntity;
/**
* Key is series full path, value is data type of series
@@ -146,7 +152,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
selectReaderEntity = new ClusterSelectSeriesBatchReaderEntity();
QueryPlan plan = queryPlanMap.get(PathType.SELECT_PATH);
if (plan instanceof GroupByPlan) {
- throw new UnsupportedOperationException();
+ handleGroupBySeriesReader(plan, context, response);
} else if (plan instanceof AggregationPlan) {
handleAggreSeriesReader(plan, context, response);
} else if (plan instanceof FillQueryPlan) {
@@ -206,6 +212,43 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
*
* @param queryPlan fill query plan
*/
+ private void handleGroupBySeriesReader(QueryPlan queryPlan, QueryContext context,
+ InitSeriesReaderResponse response)
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException, QueryFilterOptimizationException {
+ if (queryPlan.getExpression() == null
+ || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { // no filter, or a time-only filter
+ handleGroupBySeriesReaderWithoutTimeGenerator(queryPlan, context, response);
+ } else { // any other expression type goes through the time-generator select handler
+ handleSelectReaderWithTimeGenerator(queryPlan, context, response);
+ }
+ }
+
+
+ /**
+ * Handle group by series reader without value filter: run the plan and wrap its dataset in a batch reader entity
+ *
+ * @param queryPlan group by query plan
+ */
+ private void handleGroupBySeriesReaderWithoutTimeGenerator(QueryPlan queryPlan,
+ QueryContext context,
+ InitSeriesReaderResponse response)
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException, QueryFilterOptimizationException {
+ QueryDataSet queryDataSet = queryProcessExecutor.processQuery(queryPlan, context);
+ List<Path> paths = queryDataSet.getPaths();
+ List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+ for (int i = 0; i < paths.size(); i++) { // record each path's data type under its full path
+ dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i));
+ }
+ groupBySelectReaderEntity = new ClusterGroupBySelectSeriesBatchReaderEntity(paths, dataTypes,
+ (GroupByWithOnlyTimeFilterDataSet) queryDataSet); // cast assumes processQuery returned this dataset type here
+ response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
+ }
+
+ /**
+ * Handle aggregation series reader
+ *
+ * @param queryPlan aggregation query plan
+ */
private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context,
InitSeriesReaderResponse response)
throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
@@ -304,8 +347,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
private void handleFilterSeriesReader(QueryPlan plan, QueryContext context,
InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType)
throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException, ClassNotFoundException {
- QueryDataSet queryDataSet = queryProcessExecutor
- .processQuery(plan, context);
+ QueryDataSet queryDataSet = queryProcessExecutor.processQuery(plan, context);
List<Path> paths = plan.getPaths();
List<TSDataType> dataTypes = queryDataSet.getDataTypes();
for (int i = 0; i < paths.size(); i++) {
@@ -353,7 +395,10 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
PathType pathType = request.getPathType();
List<BatchData> batchDataList;
if (pathType == PathType.SELECT_PATH) {
- batchDataList = readSelectSeriesBatchData(request.getSeriesPathIndexs());
+ // check whether it's a group by query with only time filter
+ batchDataList =
+ groupBySelectReaderEntity != null ? groupBySelectReaderEntity.nextBatchList()
+ : readSelectSeriesBatchData(request.getSeriesPathIndexs());
} else {
batchDataList = readFilterSeriesBatchData();
}
@@ -435,7 +480,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
return selectReaderEntity;
}
- public IClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() {
+ public ClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() {
return filterReaderEntity;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
index 65f8c1c..ddcb35d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
/**
* Batch reader entity for all filter paths.
*/
-public class ClusterFilterSeriesBatchReaderEntity implements IClusterFilterSeriesBatchReaderEntity {
+public class ClusterFilterSeriesBatchReaderEntity implements IClusterSeriesBatchReaderEntity {
private List<Path> allFilterPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
new file mode 100644
index 0000000..3b7fabe
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.querynode;
+
+import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader.CLUSTER_CONF;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class ClusterGroupBySelectSeriesBatchReaderEntity implements
+ IClusterSeriesBatchReaderEntity {
+ // Class note: adapts a GroupByWithOnlyTimeFilterDataSet into per-path BatchData batches.
+ private List<Path> paths;
+ private List<TSDataType> dataTypes;
+
+ private GroupByWithOnlyTimeFilterDataSet queryDataSet;
+
+ public ClusterGroupBySelectSeriesBatchReaderEntity(
+ List<Path> paths,
+ List<TSDataType> dataTypes,
+ GroupByWithOnlyTimeFilterDataSet queryDataSet) {
+ this.paths = paths;
+ this.dataTypes = dataTypes;
+ this.queryDataSet = queryDataSet;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return queryDataSet.hasNext();
+ }
+
+ @Override
+ public List<BatchData> nextBatchList() throws IOException { // returns one BatchData per path, in path order
+ List<BatchData> batchDataList = new ArrayList<>(paths.size());
+ for (int i = 0; i < paths.size(); i++) {
+ batchDataList.add(new BatchData(dataTypes.get(i), true));
+ }
+ int dataPointCount = 0;
+ while (true) {
+ if (!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()) { // dataset drained or batch full
+ break;
+ }
+ dataPointCount++;
+ RowRecord rowRecord = queryDataSet.next();
+ long time = rowRecord.getTimestamp();
+ List<Field> fieldList = rowRecord.getFields();
+ for (int j = 0; j < paths.size(); j++) { // the row timestamp is written once into every path's batch
+ BatchData batchData = batchDataList.get(j);
+ Object value = fieldList.get(j).getObjectValue(dataTypes.get(j)); // NOTE(review): confirm this copes with fields created as new Field(null)
+ batchData.putTime(time);
+ batchData.putAnObject(value);
+ }
+ }
+ return batchDataList;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
index cfc43b8..28b6346 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
@@ -51,6 +51,9 @@ public class ClusterSelectSeriesBatchReader extends
this.reader = reader;
}
+ public ClusterSelectSeriesBatchReader() { // does not assign reader, which hasNext() dereferences - presumably for subclasses only, TODO confirm
+ }
+
@Override
public boolean hasNext() throws IOException {
return reader.hasNext();
@@ -62,7 +65,6 @@ public class ClusterSelectSeriesBatchReader extends
for (int i = 0; i < CLUSTER_CONF.getBatchReadSize(); i++) {
if (hasNext()) {
TimeValuePair pair = reader.next();
- System.out.println("reader value:" + pair);
batchData.putTime(pair.getTimestamp());
batchData.putAnObject(pair.getValue().getValue());
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
index f0dea38..484c423 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
@@ -25,15 +25,16 @@ import java.util.List;
* Batch reader entity for all select paths.
*/
public class ClusterSelectSeriesBatchReaderEntity {
+
/**
* All select paths
*/
- List<String> paths;
+ private List<String> paths;
/**
* All select readers
*/
- List<AbstractClusterSelectSeriesBatchReader> readers;
+ private List<AbstractClusterSelectSeriesBatchReader> readers;
public ClusterSelectSeriesBatchReaderEntity() {
paths = new ArrayList<>();
@@ -52,7 +53,7 @@ public class ClusterSelectSeriesBatchReaderEntity {
return readers;
}
- public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index){
+ public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index) {
return readers.get(index);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
similarity index 95%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
index a045e2a..e6e9e86 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
/**
* Batch reader for filter series which is used in query node.
*/
-public interface IClusterFilterSeriesBatchReaderEntity {
+public interface IClusterSeriesBatchReaderEntity {
boolean hasNext() throws IOException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index 3a2746f..03e4e7f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -55,10 +55,10 @@ public class QueryPlanPartitionUtils {
QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan();
if (queryPLan instanceof FillQueryPlan) {
splitFillPlan(singleQueryManager);
- } else if (queryPLan instanceof AggregationPlan) {
- splitAggregationPlanBySelectPath(singleQueryManager);
} else if (queryPLan instanceof GroupByPlan) {
splitGroupByPlanBySelectPath(singleQueryManager);
+ } else if (queryPLan instanceof AggregationPlan) {
+ splitAggregationPlanBySelectPath(singleQueryManager);
} else {
splitQueryPlanBySelectPath(singleQueryManager);
}
@@ -136,8 +136,38 @@ public class QueryPlanPartitionUtils {
* Split group by plan by select path
*/
private static void splitGroupByPlanBySelectPath(
- ClusterRpcSingleQueryManager singleQueryManager) {
- throw new UnsupportedOperationException();
+ ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+ GroupByPlan queryPlan = (GroupByPlan) singleQueryManager.getOriginQueryPlan();
+ List<Path> selectPaths = queryPlan.getPaths();
+ List<String> aggregations = queryPlan.getAggregations();
+ Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+ .getSelectSeriesGroupEntityMap();
+ Map<String, List<String>> selectAggregationByGroupId = new HashMap<>(); // group id -> aggregations
+ for (int i = 0; i < selectPaths.size(); i++) { // aggregations.get(i) is paired with selectPaths.get(i)
+ String aggregation = aggregations.get(i);
+ Path path = selectPaths.get(i);
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+ if (!selectGroupEntityMap.containsKey(groupId)) { // NOTE(review): assumes no pre-existing entry - confirm
+ selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
+ selectAggregationByGroupId.put(groupId, new ArrayList<>()); // list exists only for groups added here
+ }
+ selectGroupEntityMap.get(groupId).addSelectPaths(path);
+ selectAggregationByGroupId.get(groupId).add(aggregation);
+ }
+ for (Entry<String, SelectSeriesGroupEntity> entry : selectGroupEntityMap.entrySet()) { // one sub plan per group
+ String groupId = entry.getKey();
+ SelectSeriesGroupEntity entity = entry.getValue();
+ List<Path> paths = entity.getSelectPaths();
+ GroupByPlan subQueryPlan = new GroupByPlan();
+ subQueryPlan.setIntervals(queryPlan.getIntervals()); // intervals, origin, unit, proposer, expression: copied
+ subQueryPlan.setOrigin(queryPlan.getOrigin());
+ subQueryPlan.setUnit(queryPlan.getUnit());
+ subQueryPlan.setProposer(queryPlan.getProposer());
+ subQueryPlan.setPaths(paths); // paths and aggregations are the per-group subsets
+ subQueryPlan.setExpression(queryPlan.getExpression());
+ subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId));
+ entity.setQueryPlan(subQueryPlan);
+ }
}
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 99476f2..9bf87c2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -27,14 +27,13 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
+import org.apache.iotdb.db.query.executor.AbstractQueryRouter;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
@@ -43,7 +42,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
- protected IEngineQueryRouter queryRouter = new EngineQueryRouter();
+ protected AbstractQueryRouter queryRouter = new EngineQueryRouter();
@Override
public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
index af74bf6..323bb82 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
@@ -51,7 +51,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
protected List<IAggregateReader> sequenceReaderList;
private List<BatchData> batchDataList;
private List<Boolean> hasCachedSequenceDataList;
- private Filter timeFilter;
+ protected Filter timeFilter;
/**
* constructor.
@@ -96,7 +96,6 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
sequenceReaderList.add(sequenceReader);
unSequenceReaderList.add(unSeqMergeReader);
}
-
}
@Override
@@ -128,7 +127,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
*
* @param idx series id
*/
- private AggreResultData nextSeries(int idx) throws IOException, ProcessorException {
+ protected AggreResultData nextSeries(int idx) throws IOException, ProcessorException {
IPointReader unsequenceReader = unSequenceReaderList.get(idx);
IAggregateReader sequenceReader = sequenceReaderList.get(idx);
AggregateFunction function = functions.get(idx);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 2991ff7..528b378 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -40,8 +40,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
- private List<EngineReaderByTimeStamp> allDataReaderList;
- private TimeGenerator timestampGenerator;
+ protected List<EngineReaderByTimeStamp> allDataReaderList;
+ protected TimeGenerator timestampGenerator;
/**
* cached timestamp for next group by partition.
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java
similarity index 59%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java
index 01c1aed..20f0f9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.executor;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -34,18 +35,18 @@ import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
-public interface IEngineQueryRouter {
+public abstract class AbstractQueryRouter {
/**
* Execute physical plan.
*/
- QueryDataSet query(QueryExpression queryExpression, QueryContext context)
+ public abstract QueryDataSet query(QueryExpression queryExpression, QueryContext context)
throws FileNodeManagerException, PathErrorException;
/**
* Execute aggregation query.
*/
- QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
+ public abstract QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
IExpression expression, QueryContext context)
throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException;
@@ -60,7 +61,7 @@ public interface IEngineQueryRouter {
* each TimeUnit time from this point forward and backward.
* @param intervals time intervals, closed interval.
*/
- QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
+ public abstract QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
QueryContext context)
throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException,
@@ -72,7 +73,48 @@ public interface IEngineQueryRouter {
* @param queryTime timestamp
* @param fillType type IFill map
*/
- QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
+ public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
QueryContext context) throws FileNodeManagerException, PathErrorException, IOException;
+
+ /**
+ * Sorts {@code intervals} in place by start time and merges overlapping intervals.
+ *
+ * @param intervals closed time intervals; the list is reordered and merged pairs are widened
+ * @return the merged intervals in ascending order of start time
+ */
+ protected List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) {
+ // Long.compare: casting the long difference to int truncates it and can flip the order.
+ intervals.sort((o1, o2) -> Long.compare(o1.left, o2.left));
+ LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
+ for (Pair<Long, Long> interval : intervals) {
+ // if the list of merged intervals is empty or
+ // if the current interval does not overlap with the previous, simply append it.
+ if (merged.isEmpty() || merged.getLast().right < interval.left) {
+ merged.add(interval);
+ } else {
+ // otherwise, there is overlap, so we merge the current and previous intervals.
+ merged.getLast().right = Math.max(merged.getLast().right, interval.right);
+ }
+ }
+ return merged;
+ }
+
+ /**
+ * Checks that each interval has positive bounds and does not end before it starts.
+ */
+ protected void checkIntervals(List<Pair<Long, Long>> intervals) throws ProcessorException {
+ for (Pair<Long, Long> pair : intervals) {
+ if (!(pair.left > 0 && pair.right > 0)) {
+ throw new ProcessorException(
+ String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right));
+ }
+ if (pair.right < pair.left) {
+ throw new ProcessorException(String.format(
+ "Interval ending time must not be less than the interval starting time, "
+ + "found error interval<%d, %d>", pair.left, pair.right));
+ }
+ }
+ }
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 32fcbf7..c46c6f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.executor;
import java.io.IOException;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -47,7 +46,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
* Query entrance class of IoTDB query process. All query clause will be transformed to physical
* plan, physical plan will be executed by EngineQueryRouter.
*/
-public class EngineQueryRouter implements IEngineQueryRouter{
+public class EngineQueryRouter extends AbstractQueryRouter {
@Override
public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
@@ -110,18 +109,9 @@ public class EngineQueryRouter implements IEngineQueryRouter{
long nextJobId = context.getJobId();
- // check the legitimacy of intervals
- for (Pair<Long, Long> pair : intervals) {
- if (!(pair.left > 0 && pair.right > 0)) {
- throw new ProcessorException(
- String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right));
- }
- if (pair.right < pair.left) {
- throw new ProcessorException(String.format(
- "Interval starting time must be greater than the interval ending time, "
- + "found error interval<%d, %d>", pair.left, pair.right));
- }
- }
+ //check the legitimacy of intervals
+ checkIntervals(intervals);
+
// merge intervals
List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals);
@@ -173,28 +163,4 @@ public class EngineQueryRouter implements IEngineQueryRouter{
return fillEngineExecutor.execute(context);
}
- /**
- * sort intervals by start time and merge overlapping intervals.
- *
- * @param intervals time interval
- */
- private List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) {
- // sort by interval start time.
- intervals.sort(((o1, o2) -> (int) (o1.left - o2.left)));
-
- LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
- for (Pair<Long, Long> interval : intervals) {
- // if the list of merged intervals is empty or
- // if the current interval does not overlap with the previous, simply append it.
- if (merged.isEmpty() || merged.getLast().right < interval.left) {
- merged.add(interval);
- } else {
- // otherwise, there is overlap, so we merge the current and previous intervals.
- merged.getLast().right = Math.max(merged.getLast().right, interval.right);
- }
- }
- return merged;
- }
-
-
}