You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 09:34:45 UTC

[incubator-iotdb] 01/04: add group by features

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2bb0a0419b8da24ca5d923bbb486dd8e37ab67ad
Author: lta <li...@163.com>
AuthorDate: Tue May 21 15:47:54 2019 +0800

    add group by features
---
 .../ClusterGroupByDataSetWithOnlyTimeFilter.java   | 154 +++++++++++++++++++++
 .../ClusterGroupByDataSetWithTimeGenerator.java    |  91 ++++++++++++
 .../executor/ClusterAggregateEngineExecutor.java   |   6 +-
 .../cluster/query/executor/ClusterQueryRouter.java |  62 ++++++++-
 .../querynode/ClusterLocalSingleQueryManager.java  |  59 +++++++-
 .../ClusterFilterSeriesBatchReaderEntity.java      |   2 +-
 ...lusterGroupBySelectSeriesBatchReaderEntity.java |  79 +++++++++++
 .../querynode/ClusterSelectSeriesBatchReader.java  |   4 +-
 .../ClusterSelectSeriesBatchReaderEntity.java      |   7 +-
 ...y.java => IClusterSeriesBatchReaderEntity.java} |   2 +-
 .../query/utils/QueryPlanPartitionUtils.java       |  38 ++++-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |   5 +-
 .../groupby/GroupByWithOnlyTimeFilterDataSet.java  |   5 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |   4 +-
 ...neQueryRouter.java => AbstractQueryRouter.java} |  52 ++++++-
 .../iotdb/db/query/executor/EngineQueryRouter.java |  42 +-----
 16 files changed, 536 insertions(+), 76 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
new file mode 100644
index 0000000..98460e4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * Handle group by query with only time filter
+ */
+public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTimeFilterDataSet {
+
+  private ClusterRpcSingleQueryManager queryManager;
+  private List<IPointReader> readersOfSelectedSeries;
+
+  /**
+   * constructor.
+   */
+  public ClusterGroupByDataSetWithOnlyTimeFilter(long jobId,
+      List<Path> paths, long unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
+    super(jobId, paths, unit, origin, mergedIntervals);
+    this.queryManager  =queryManager;
+    this.readersOfSelectedSeries = new ArrayList<>();
+  }
+
+
+  /**
+   * init reader and aggregate function.
+   */
+  public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+    initAggreFuction(aggres);
+
+    /** add query token for query series which can handle locally **/
+    List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
+    Set<Path> remoteQuerySeries = new HashSet<>();
+    queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+        selectSeriesGroupEntity -> remoteQuerySeries
+            .addAll(selectSeriesGroupEntity.getSelectPaths()));
+    localQuerySeries.removeAll(remoteQuerySeries);
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
+    if (expression != null) {
+      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    }
+
+    Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager
+        .getSelectSeriesGroupEntityMap();
+    //Mark filter series reader index group by group id
+    Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      Path path = selectedSeries.get(i);
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+      if (selectSeriesGroupEntityMap.containsKey(groupId)) {
+        int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+        ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId)
+            .getSelectSeriesReaders().get(index);
+        readersOfSelectedSeries.add(reader);
+        selectSeriesReaderIndex.put(groupId, index + 1);
+      } else {
+        readersOfSelectedSeries.add(null);
+        QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+            .getQueryDataSource(selectedSeries.get(i), context);
+
+        // sequence reader for sealed tsfile, unsealed tsfile, memory
+        SequenceDataReader sequenceReader = new SequenceDataReader(
+            queryDataSource.getSeqDataSource(),
+            timeFilter, context, false);
+
+        // unseq reader for all chunk groups in unSeqFile, memory
+        PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+
+        sequenceReaderList.add(sequenceReader);
+        unSequenceReaderList.add(unSeqMergeReader);
+      }
+    }
+  }
+
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException("need to call hasNext() before calling next() "
+          + "in GroupByWithOnlyTimeFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+    RowRecord record = new RowRecord(startTime);
+    for (int i = 0; i < functions.size(); i++) {
+      IPointReader reader = readersOfSelectedSeries.get(i);
+      if(reader != null){
+        TimeValuePair timeValuePair = reader.next();
+        record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
+      }else {
+        AggreResultData res;
+        try {
+          res = nextSeries(i);
+        } catch (ProcessorException e) {
+          throw new IOException(e);
+        }
+        if (res == null) {
+          record.addField(new Field(null));
+        } else {
+          record.addField(getField(res));
+        }
+      }
+    }
+    return record;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
new file mode 100644
index 0000000..00f2d88
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilterDataSet {
+
+  private ClusterRpcSingleQueryManager queryManager;
+
+  private List<TSDataType> selectSeriesDataTypes;
+
+  /**
+   * constructor.
+   */
+  public ClusterGroupByDataSetWithTimeGenerator(long jobId,
+      List<Path> paths, long unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
+    super(jobId, paths, unit, origin, mergedIntervals);
+    this.queryManager = queryManager;
+    selectSeriesDataTypes = new ArrayList<>();
+  }
+
+  /**
+   * init reader and aggregate function.
+   */
+  @Override
+  public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
+    initAggreFuction(aggres);
+
+    /** add query token for filter series which can handle locally **/
+    Set<String> deviceIdSet = new HashSet<>();
+    for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager
+        .getFilterSeriesGroupEntityMap().values()) {
+      List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths();
+      remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
+    }
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenExpression(context.getJobId(), expression, deviceIdSet);
+
+    /** add query token for query series which can handle locally **/
+    List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
+    Set<Path> remoteQuerySeries = new HashSet<>();
+    queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+        selectSeriesGroupEntity -> remoteQuerySeries
+            .addAll(selectSeriesGroupEntity.getSelectPaths()));
+    localQuerySeries.removeAll(remoteQuerySeries);
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
+
+    this.timestampGenerator = new ClusterTimeGenerator(expression, context, queryManager);
+    this.allDataReaderList = ClusterSeriesReaderFactory
+        .createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager,
+            selectSeriesDataTypes);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index 2cf4e87..808eab8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -25,8 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
@@ -151,8 +149,8 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
     List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
     Set<Path> remoteQuerySeries = new HashSet<>();
     queryManager.getSelectSeriesGroupEntityMap().values().forEach(
-        selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths()
-            .forEach(path -> remoteQuerySeries.add(path)));
+        selectSeriesGroupEntity -> remoteQuerySeries
+            .addAll(selectSeriesGroupEntity.getSelectPaths()));
     localQuerySeries.removeAll(remoteQuerySeries);
     QueryResourceManager.getInstance()
         .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 5d6a342..f9d32f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -23,14 +23,16 @@ import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.QueryType;
+import org.apache.iotdb.cluster.query.dataset.ClusterGroupByDataSetWithOnlyTimeFilter;
+import org.apache.iotdb.cluster.query.dataset.ClusterGroupByDataSetWithTimeGenerator;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.AbstractQueryRouter;
 import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
-import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -38,7 +40,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -46,7 +51,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
  * Query entrance class of cluster query process. All query clause will be transformed to physical
  * plan, physical plan will be executed by ClusterQueryRouter.
  */
-public class ClusterQueryRouter implements IEngineQueryRouter {
+public class ClusterQueryRouter extends AbstractQueryRouter {
 
   /**
    * Consistency level of reading data
@@ -128,7 +133,55 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
       IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
       QueryContext context)
       throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException {
-    throw new UnsupportedOperationException();
+
+    long jobId = context.getJobId();
+    ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+        .getSingleQuery(jobId);
+
+    //check the legitimacy of intervals
+    checkIntervals(intervals);
+
+    // merge intervals
+    List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals);
+
+    // construct groupBy intervals filter
+    BinaryExpression intervalFilter = null;
+    for (Pair<Long, Long> pair : mergedIntervalList) {
+      BinaryExpression pairFilter = BinaryExpression
+          .and(new GlobalTimeExpression(TimeFilter.gtEq(pair.left)),
+              new GlobalTimeExpression(TimeFilter.ltEq(pair.right)));
+      if (intervalFilter != null) {
+        intervalFilter = BinaryExpression.or(intervalFilter, pairFilter);
+      } else {
+        intervalFilter = pairFilter;
+      }
+    }
+
+    // merge interval filter and filtering conditions after where statements
+    if (expression == null) {
+      expression = intervalFilter;
+    } else {
+      expression = BinaryExpression.and(expression, intervalFilter);
+    }
+
+    IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+        .optimize(expression, selectedSeries);
+    try {
+      if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+        ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
+            jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
+        groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+        return groupByEngine;
+      } else {
+        queryManager.initQueryResource(QueryType.FILTER, getReadDataConsistencyLevel());
+        ClusterGroupByDataSetWithTimeGenerator groupByEngine = new ClusterGroupByDataSetWithTimeGenerator(
+            jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
+        groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+        return groupByEngine;
+      }
+    } catch (RaftConnectionException e) {
+      throw new FileNodeManagerException(e);
+    }
   }
 
   @Override
@@ -139,7 +192,8 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
     try {
       queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel());
 
-      ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths, queryTime,
+      ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths,
+          queryTime,
           fillType, queryManager);
       return fillEngineExecutor.execute(context);
     } catch (IOException | RaftConnectionException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 8799be2..25adbf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterGroupBySelectSeriesBatchReaderEntity;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity;
-import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReaderEntity;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
 import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator;
 import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
 import org.apache.iotdb.db.query.fill.IFill;
@@ -105,9 +106,14 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   private ClusterSelectSeriesBatchReaderEntity selectReaderEntity;
 
   /**
+   * Select reader entity of group by query, which handle group by query with only time filter
+   */
+  private ClusterGroupBySelectSeriesBatchReaderEntity groupBySelectReaderEntity;
+
+  /**
    * Filter reader entity
    */
-  private IClusterFilterSeriesBatchReaderEntity filterReaderEntity;
+  private ClusterFilterSeriesBatchReaderEntity filterReaderEntity;
 
   /**
    * Key is series full path, value is data type of series
@@ -146,7 +152,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       selectReaderEntity = new ClusterSelectSeriesBatchReaderEntity();
       QueryPlan plan = queryPlanMap.get(PathType.SELECT_PATH);
       if (plan instanceof GroupByPlan) {
-        throw new UnsupportedOperationException();
+        handleGroupBySeriesReader(plan, context, response);
       } else if (plan instanceof AggregationPlan) {
         handleAggreSeriesReader(plan, context, response);
       } else if (plan instanceof FillQueryPlan) {
@@ -206,6 +212,43 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
    *
    * @param queryPlan fill query plan
    */
+  private void handleGroupBySeriesReader(QueryPlan queryPlan, QueryContext context,
+      InitSeriesReaderResponse response)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException, QueryFilterOptimizationException {
+    if (queryPlan.getExpression() == null
+        || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+      handleGroupBySeriesReaderWithoutTimeGenerator(queryPlan, context, response);
+    } else {
+      handleSelectReaderWithTimeGenerator(queryPlan, context, response);
+    }
+  }
+
+
+  /**
+   * Handle aggregation series reader without value filter
+   *
+   * @param queryPlan fill query plan
+   */
+  private void handleGroupBySeriesReaderWithoutTimeGenerator(QueryPlan queryPlan,
+      QueryContext context,
+      InitSeriesReaderResponse response)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException, QueryFilterOptimizationException {
+    QueryDataSet queryDataSet = queryProcessExecutor.processQuery(queryPlan, context);
+    List<Path> paths = queryDataSet.getPaths();
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    for (int i = 0; i < paths.size(); i++) {
+      dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i));
+    }
+    groupBySelectReaderEntity = new ClusterGroupBySelectSeriesBatchReaderEntity(paths, dataTypes,
+        (GroupByWithOnlyTimeFilterDataSet) queryDataSet);
+    response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
+  }
+
+  /**
+   * Handle aggregation series reader
+   *
+   * @param queryPlan fill query plan
+   */
   private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context,
       InitSeriesReaderResponse response)
       throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
@@ -304,8 +347,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   private void handleFilterSeriesReader(QueryPlan plan, QueryContext context,
       InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType)
       throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException, ClassNotFoundException {
-    QueryDataSet queryDataSet = queryProcessExecutor
-        .processQuery(plan, context);
+    QueryDataSet queryDataSet = queryProcessExecutor.processQuery(plan, context);
     List<Path> paths = plan.getPaths();
     List<TSDataType> dataTypes = queryDataSet.getDataTypes();
     for (int i = 0; i < paths.size(); i++) {
@@ -353,7 +395,10 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       PathType pathType = request.getPathType();
       List<BatchData> batchDataList;
       if (pathType == PathType.SELECT_PATH) {
-        batchDataList = readSelectSeriesBatchData(request.getSeriesPathIndexs());
+        // check whether it's a group by query with only time filter
+        batchDataList =
+            groupBySelectReaderEntity != null ? groupBySelectReaderEntity.nextBatchList()
+                : readSelectSeriesBatchData(request.getSeriesPathIndexs());
       } else {
         batchDataList = readFilterSeriesBatchData();
       }
@@ -435,7 +480,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
     return selectReaderEntity;
   }
 
-  public IClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() {
+  public ClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() {
     return filterReaderEntity;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
index 65f8c1c..ddcb35d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 /**
  * Batch reader entity for all filter paths.
  */
-public class ClusterFilterSeriesBatchReaderEntity implements IClusterFilterSeriesBatchReaderEntity {
+public class ClusterFilterSeriesBatchReaderEntity implements IClusterSeriesBatchReaderEntity {
 
   private List<Path> allFilterPath;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
new file mode 100644
index 0000000..3b7fabe
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.querynode;
+
+import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader.CLUSTER_CONF;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class ClusterGroupBySelectSeriesBatchReaderEntity implements
+    IClusterSeriesBatchReaderEntity {
+
+  private List<Path> paths;
+  private List<TSDataType> dataTypes;
+
+  private GroupByWithOnlyTimeFilterDataSet queryDataSet;
+
+  public ClusterGroupBySelectSeriesBatchReaderEntity(
+      List<Path> paths,
+      List<TSDataType> dataTypes,
+      GroupByWithOnlyTimeFilterDataSet queryDataSet) {
+    this.paths = paths;
+    this.dataTypes = dataTypes;
+    this.queryDataSet = queryDataSet;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return queryDataSet.hasNext();
+  }
+
+  @Override
+  public List<BatchData> nextBatchList() throws IOException {
+    List<BatchData> batchDataList = new ArrayList<>(paths.size());
+    for (int i = 0; i < paths.size(); i++) {
+      batchDataList.add(new BatchData(dataTypes.get(i), true));
+    }
+    int dataPointCount = 0;
+    while (true) {
+      if (!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()) {
+        break;
+      }
+      dataPointCount++;
+      RowRecord rowRecord = queryDataSet.next();
+      long time = rowRecord.getTimestamp();
+      List<Field> fieldList = rowRecord.getFields();
+      for (int j = 0; j < paths.size(); j++) {
+        BatchData batchData = batchDataList.get(j);
+        Object value = fieldList.get(j).getObjectValue(dataTypes.get(j));
+        batchData.putTime(time);
+        batchData.putAnObject(value);
+      }
+    }
+    return batchDataList;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
index cfc43b8..28b6346 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
@@ -51,6 +51,9 @@ public class ClusterSelectSeriesBatchReader extends
     this.reader = reader;
   }
 
+  public ClusterSelectSeriesBatchReader() {
+  }
+
   @Override
   public boolean hasNext() throws IOException {
     return reader.hasNext();
@@ -62,7 +65,6 @@ public class ClusterSelectSeriesBatchReader extends
     for (int i = 0; i < CLUSTER_CONF.getBatchReadSize(); i++) {
       if (hasNext()) {
         TimeValuePair pair = reader.next();
-        System.out.println("reader value:" + pair);
         batchData.putTime(pair.getTimestamp());
         batchData.putAnObject(pair.getValue().getValue());
       } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
index f0dea38..484c423 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
@@ -25,15 +25,16 @@ import java.util.List;
  * Batch reader entity for all select paths.
  */
 public class ClusterSelectSeriesBatchReaderEntity {
+
   /**
    * All select paths
    */
-  List<String> paths;
+  private List<String> paths;
 
   /**
    * All select readers
    */
-  List<AbstractClusterSelectSeriesBatchReader> readers;
+  private List<AbstractClusterSelectSeriesBatchReader> readers;
 
   public ClusterSelectSeriesBatchReaderEntity() {
     paths = new ArrayList<>();
@@ -52,7 +53,7 @@ public class ClusterSelectSeriesBatchReaderEntity {
     return readers;
   }
 
-  public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index){
+  public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index) {
     return readers.get(index);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
similarity index 95%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
index a045e2a..e6e9e86 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * Batch reader for filter series which is used in query node.
  */
-public interface IClusterFilterSeriesBatchReaderEntity {
+public interface IClusterSeriesBatchReaderEntity {
 
   boolean hasNext() throws IOException;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index 3a2746f..03e4e7f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -55,10 +55,10 @@ public class QueryPlanPartitionUtils {
     QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan();
     if (queryPLan instanceof FillQueryPlan) {
       splitFillPlan(singleQueryManager);
-    } else if (queryPLan instanceof AggregationPlan) {
-      splitAggregationPlanBySelectPath(singleQueryManager);
     } else if (queryPLan instanceof GroupByPlan) {
       splitGroupByPlanBySelectPath(singleQueryManager);
+    } else if (queryPLan instanceof AggregationPlan) {
+      splitAggregationPlanBySelectPath(singleQueryManager);
     } else {
       splitQueryPlanBySelectPath(singleQueryManager);
     }
@@ -136,8 +136,38 @@ public class QueryPlanPartitionUtils {
    * Split group by plan by select path
    */
   private static void splitGroupByPlanBySelectPath(
-      ClusterRpcSingleQueryManager singleQueryManager) {
-    throw new UnsupportedOperationException();
+      ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+    GroupByPlan queryPlan = (GroupByPlan) singleQueryManager.getOriginQueryPlan();
+    List<Path> selectPaths = queryPlan.getPaths();
+    List<String> aggregations = queryPlan.getAggregations();
+    Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+        .getSelectSeriesGroupEntityMap();
+    Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
+    for (int i = 0; i < selectPaths.size(); i++) {
+      String aggregation = aggregations.get(i);
+      Path path = selectPaths.get(i);
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+      if (!selectGroupEntityMap.containsKey(groupId)) {
+        selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
+        selectAggregationByGroupId.put(groupId, new ArrayList<>());
+      }
+      selectGroupEntityMap.get(groupId).addSelectPaths(path);
+      selectAggregationByGroupId.get(groupId).add(aggregation);
+    }
+    for (Entry<String, SelectSeriesGroupEntity> entry : selectGroupEntityMap.entrySet()) {
+      String groupId = entry.getKey();
+      SelectSeriesGroupEntity entity = entry.getValue();
+      List<Path> paths = entity.getSelectPaths();
+      GroupByPlan subQueryPlan = new GroupByPlan();
+      subQueryPlan.setIntervals(queryPlan.getIntervals());
+      subQueryPlan.setOrigin(queryPlan.getOrigin());
+      subQueryPlan.setUnit(queryPlan.getUnit());
+      subQueryPlan.setProposer(queryPlan.getProposer());
+      subQueryPlan.setPaths(paths);
+      subQueryPlan.setExpression(queryPlan.getExpression());
+      subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId));
+      entity.setQueryPlan(subQueryPlan);
+    }
   }
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 99476f2..9bf87c2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -27,14 +27,13 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
+import org.apache.iotdb.db.query.executor.AbstractQueryRouter;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
@@ -43,7 +42,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
 
   protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
-  protected IEngineQueryRouter queryRouter = new EngineQueryRouter();
+  protected AbstractQueryRouter queryRouter = new EngineQueryRouter();
 
   @Override
   public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
index af74bf6..323bb82 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
@@ -51,7 +51,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
   protected List<IAggregateReader> sequenceReaderList;
   private List<BatchData> batchDataList;
   private List<Boolean> hasCachedSequenceDataList;
-  private Filter timeFilter;
+  protected Filter timeFilter;
 
   /**
    * constructor.
@@ -96,7 +96,6 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
       sequenceReaderList.add(sequenceReader);
       unSequenceReaderList.add(unSeqMergeReader);
     }
-
   }
 
   @Override
@@ -128,7 +127,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
    *
    * @param idx series id
    */
-  private AggreResultData nextSeries(int idx) throws IOException, ProcessorException {
+  protected AggreResultData nextSeries(int idx) throws IOException, ProcessorException {
     IPointReader unsequenceReader = unSequenceReaderList.get(idx);
     IAggregateReader sequenceReader = sequenceReaderList.get(idx);
     AggregateFunction function = functions.get(idx);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 2991ff7..528b378 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -40,8 +40,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
 
-  private List<EngineReaderByTimeStamp> allDataReaderList;
-  private TimeGenerator timestampGenerator;
+  protected List<EngineReaderByTimeStamp> allDataReaderList;
+  protected TimeGenerator timestampGenerator;
   /**
    * cached timestamp for next group by partition.
    */
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java
similarity index 59%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java
index 01c1aed..20f0f9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.executor;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -34,18 +35,18 @@ import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-public interface IEngineQueryRouter {
+public abstract class AbstractQueryRouter {
 
   /**
    * Execute physical plan.
    */
-  QueryDataSet query(QueryExpression queryExpression, QueryContext context)
+  public abstract QueryDataSet query(QueryExpression queryExpression, QueryContext context)
       throws FileNodeManagerException, PathErrorException;
 
   /**
    * Execute aggregation query.
    */
-  QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
+  public abstract QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, QueryContext context)
       throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException;
 
@@ -60,7 +61,7 @@ public interface IEngineQueryRouter {
    * each TimeUnit time from this point forward and backward.
    * @param intervals time intervals, closed interval.
    */
-  QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
+  public abstract QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
       QueryContext context)
       throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException,
@@ -72,7 +73,48 @@ public interface IEngineQueryRouter {
    * @param queryTime timestamp
    * @param fillType type IFill map
    */
-  QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
+  public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
       QueryContext context) throws FileNodeManagerException, PathErrorException, IOException;
 
+
+  /**
+   * sort intervals by start time and merge overlapping intervals.
+   *
+   * @param intervals time interval
+   */
+  protected List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) {
+    // sort by interval start time.
+    intervals.sort(((o1, o2) -> (int) (o1.left - o2.left)));
+
+    LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
+    for (Pair<Long, Long> interval : intervals) {
+      // if the list of merged intervals is empty or
+      // if the current interval does not overlap with the previous, simply append it.
+      if (merged.isEmpty() || merged.getLast().right < interval.left) {
+        merged.add(interval);
+      } else {
+        // otherwise, there is overlap, so we merge the current and previous intervals.
+        merged.getLast().right = Math.max(merged.getLast().right, interval.right);
+      }
+    }
+    return merged;
+  }
+
+  /**
+   * Check the legitimacy of intervals
+   */
+  protected void checkIntervals(List<Pair<Long, Long>> intervals) throws ProcessorException {
+    for (Pair<Long, Long> pair : intervals) {
+      if (!(pair.left > 0 && pair.right > 0)) {
+        throw new ProcessorException(
+            String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right));
+      }
+      if (pair.right < pair.left) {
+        throw new ProcessorException(String.format(
+            "Interval starting time must be greater than the interval ending time, "
+                + "found error interval<%d, %d>", pair.left, pair.right));
+      }
+    }
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 32fcbf7..c46c6f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.query.executor;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -47,7 +46,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
  * Query entrance class of IoTDB query process. All query clause will be transformed to physical
  * plan, physical plan will be executed by EngineQueryRouter.
  */
-public class EngineQueryRouter implements IEngineQueryRouter{
+public class EngineQueryRouter extends AbstractQueryRouter {
 
   @Override
   public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
@@ -110,18 +109,9 @@ public class EngineQueryRouter implements IEngineQueryRouter{
 
     long nextJobId = context.getJobId();
 
-    // check the legitimacy of intervals
-    for (Pair<Long, Long> pair : intervals) {
-      if (!(pair.left > 0 && pair.right > 0)) {
-        throw new ProcessorException(
-            String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right));
-      }
-      if (pair.right < pair.left) {
-        throw new ProcessorException(String.format(
-            "Interval starting time must be greater than the interval ending time, "
-                + "found error interval<%d, %d>", pair.left, pair.right));
-      }
-    }
+    //check the legitimacy of intervals
+    checkIntervals(intervals);
+
     // merge intervals
     List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals);
 
@@ -173,28 +163,4 @@ public class EngineQueryRouter implements IEngineQueryRouter{
     return fillEngineExecutor.execute(context);
   }
 
-  /**
-   * sort intervals by start time and merge overlapping intervals.
-   *
-   * @param intervals time interval
-   */
-  private List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) {
-    // sort by interval start time.
-    intervals.sort(((o1, o2) -> (int) (o1.left - o2.left)));
-
-    LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
-    for (Pair<Long, Long> interval : intervals) {
-      // if the list of merged intervals is empty or
-      // if the current interval does not overlap with the previous, simply append it.
-      if (merged.isEmpty() || merged.getLast().right < interval.left) {
-        merged.add(interval);
-      } else {
-        // otherwise, there is overlap, so we merge the current and previous intervals.
-        merged.getLast().right = Math.max(merged.getLast().right, interval.right);
-      }
-    }
-    return merged;
-  }
-
-
 }