You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:29 UTC

[incubator-iotdb] 02/11: add aggre feature without timegenerator

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit fe9937a8cfe6db48dd4b4d2cff31b10ebbc91f77
Author: lta <li...@163.com>
AuthorDate: Thu May 16 18:41:26 2019 +0800

    add aggre feature without timegenerator
---
 .../executor/ClusterAggregateEngineExecutor.java   | 114 ++++++++++++++++
 .../cluster/query/executor/ClusterQueryRouter.java |  29 +++-
 .../querynode/ClusterLocalSingleQueryManager.java  |  85 ++++++++++--
 .../query/utils/QueryPlanPartitionUtils.java       | 146 +++++++++++++++------
 .../db/qp/executor/IQueryProcessExecutor.java      |   4 +-
 .../db/query/executor/AggregateEngineExecutor.java |  75 +++++++----
 .../iotdb/db/query/executor/EngineQueryRouter.java |   4 +-
 7 files changed, 378 insertions(+), 79 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
new file mode 100644
index 0000000..b63b311
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+/**
+ * Aggregate executor that takes the reader of a selected series from the query manager when one
+ * is registered for its path, and aggregates the series from local data sources otherwise.
+ */
+public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
+
+  private ClusterRpcSingleQueryManager queryManager;
+
+  public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
+      IExpression expression, ClusterRpcSingleQueryManager queryManager) {
+    super(selectedSeries, aggres, expression);
+    this.queryManager = queryManager;
+  }
+
+  /**
+   * Execute the aggregation with no filter or only a global time filter.
+   *
+   * @param context query context
+   */
+  @Override
+  public QueryDataSet executeWithoutTimeGenerator(QueryContext context)
+      throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
+    Filter timeFilter = expression != null ? ((GlobalTimeExpression) expression).getFilter() : null;
+    Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
+
+    List<Path> paths = new ArrayList<>();
+    List<IPointReader> readers = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      Path path = selectedSeries.get(i);
+      if (selectPathReaders.containsKey(path)) {
+        ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+        readers.add(reader);
+        dataTypes.add(reader.getDataType());
+      } else {
+        paths.add(path);
+        TSDataType tsDataType = MManager.getInstance().getSeriesType(path.getFullPath());
+        AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
+        function.init();
+        QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+            .getQueryDataSource(path, context);
+
+        // sequence reader for sealed tsfile, unsealed tsfile, memory
+        boolean maxTimeOrLast =
+            function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc;
+        SequenceDataReader sequenceReader = new SequenceDataReader(
+            queryDataSource.getSeqDataSource(), timeFilter, context, maxTimeOrLast);
+
+        // unseq reader for all chunk groups in unSeqFile, memory
+        PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+        AggreResultData aggreResultData = aggregateWithoutTimeGenerator(function,
+            sequenceReader, unSeqMergeReader, timeFilter);
+        // keep dataTypes index-aligned with readers: one entry per selected series
+        dataTypes.add(aggreResultData.getDataType());
+        readers.add(new AggreResultDataPointReader(aggreResultData));
+      }
+    }
+    QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), paths);
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, readers);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 2fa4576..672ca9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.FillEngineExecutor;
+import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
 import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -95,7 +95,32 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
   public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, QueryContext context)
       throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException {
-    throw new UnsupportedOperationException();
+
+    ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+        .getSingleQuery(context.getJobId());
+
+    try {
+      if (expression != null) {
+        IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+            .optimize(expression, selectedSeries);
+        AggregateEngineExecutor engineExecutor = new ClusterAggregateEngineExecutor(
+            selectedSeries, aggres, optimizedExpression, queryManager);
+        if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+          queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
+          return engineExecutor.executeWithoutTimeGenerator(context);
+        } else {
+          queryManager.initQueryResource(QueryType.FILTER, getReadDataConsistencyLevel());
+          return engineExecutor.executeWithTimeGenerator(context);
+        }
+      } else {
+        AggregateEngineExecutor engineExecutor = new ClusterAggregateEngineExecutor(
+            selectedSeries, aggres, null, queryManager);
+        queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel());
+        return engineExecutor.executeWithoutTimeGenerator(context);
+      }
+    } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
+      throw new FileNodeManagerException(e);
+    }
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index f776477..76a141e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
-import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator;
+import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.query.fill.PreviousFill;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -63,7 +64,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.slf4j.Logger;
@@ -134,16 +137,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       if (plan instanceof GroupByPlan) {
         throw new UnsupportedOperationException();
       } else if (plan instanceof AggregationPlan) {
-        throw new UnsupportedOperationException();
+        handleAggreSeriesReader(plan, context, response);
       } else if (plan instanceof FillQueryPlan) {
-        handleFillSeriesRerader(plan, context, response);
+        handleFillSeriesReader(plan, context, response);
       } else {
-        if (plan.getExpression() == null
-            || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
-          handleSelectReaderWithoutTimeGenerator(plan, context, response);
-        } else {
-          handleSelectReaderWithTimeGenerator(plan, context, response);
-        }
+        handleSelectSeriesReader(plan, context, response);
       }
     }
     if (queryPlanMap.containsKey(PathType.FILTER_PATH)) {
@@ -158,7 +156,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
    *
    * @param queryPlan fill query plan
    */
-  private void handleFillSeriesRerader(QueryPlan queryPlan, QueryContext context,
+  private void handleFillSeriesReader(QueryPlan queryPlan, QueryContext context,
       InitSeriesReaderResponse response)
       throws FileNodeManagerException, PathErrorException, IOException {
     FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
@@ -190,6 +188,73 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
     response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
   }
 
+
+  /**
+   * Handle aggregation series reader
+   *
+   * @param queryPlan aggregation query plan
+   */
+  private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context,
+      InitSeriesReaderResponse response)
+      throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+    if (queryPlan.getExpression() == null
+        || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+      handleAggreSeriesReaderWithoutTimeGenerator(queryPlan,context,response);
+    } else {
+      handleSelectReaderWithTimeGenerator(queryPlan, context, response);
+    }
+  }
+
+  /**
+   * Handle aggregation series reader without value filter
+   *
+   * @param queryPlan aggregation query plan
+   */
+  private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, QueryContext context,
+      InitSeriesReaderResponse response)
+      throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+    AggregationPlan fillQueryPlan = (AggregationPlan) queryPlan;
+
+    List<Path> selectedPaths = fillQueryPlan.getPaths();
+    QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths);
+
+    IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+        .optimize(fillQueryPlan.getExpression(), selectedPaths);
+    AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
+        selectedPaths, fillQueryPlan.getAggregations(), optimizedExpression);
+
+    List<IPointReader> readers = engineExecutor.constructAggreReadersWithoutTimeGenerator(context);
+
+    List<TSDataType> dataTypes = engineExecutor.getDataTypes();
+
+    for (int i =0 ; i < selectedPaths.size(); i ++) {
+      Path path = selectedPaths.get(i);
+      selectSeriesReaders.put(path.getFullPath(),
+          new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i)));
+      dataTypeMap.put(path.getFullPath(), dataTypes.get(i));
+    }
+
+    response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
+  }
+
+  /**
+   * Handle select series query
+   *
+   * @param plan query plan
+   * @param context query context
+   * @param response response for coordinator node
+   */
+  private void handleSelectSeriesReader(QueryPlan plan, QueryContext context,
+      InitSeriesReaderResponse response)
+      throws FileNodeManagerException, IOException, PathErrorException {
+    if (plan.getExpression() == null
+        || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+      handleSelectReaderWithoutTimeGenerator(plan, context, response);
+    } else {
+      handleSelectReaderWithTimeGenerator(plan, context, response);
+    }
+  }
+
   /**
    * Handle select series query with no filter or only global time filter
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index 546282a..fc0d401 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -32,8 +32,6 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -55,13 +53,32 @@ public class QueryPlanPartitionUtils {
       throws PathErrorException {
     QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan();
     if (queryPLan instanceof FillQueryPlan) {
-      splitFillPlan((FillQueryPlan)queryPLan, singleQueryManager);
+      splitFillPlan(singleQueryManager);
+    } else if (queryPLan instanceof AggregationPlan) {
+      splitAggregationPlanBySelectPath(singleQueryManager);
+    } else if (queryPLan instanceof GroupByPlan) {
+      splitGroupByPlanBySelectPath(singleQueryManager);
     } else {
       splitQueryPlanBySelectPath(singleQueryManager);
     }
   }
 
   /**
+   * Split query plan with filter.
+   */
+  public static void splitQueryPlanWithValueFilter(
+      ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+    QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
+    if (queryPlan instanceof GroupByPlan) {
+      splitGroupByPlanWithFilter(singleQueryManager);
+    } else if (queryPlan instanceof AggregationPlan) {
+      splitAggregationPlanWithFilter(singleQueryManager);
+    } else {
+      splitQueryPlanWithFilter(singleQueryManager);
+    }
+  }
+
+  /**
    * Split query plan by select paths
    */
   private static void splitQueryPlanBySelectPath(ClusterRpcSingleQueryManager singleQueryManager)
@@ -88,33 +105,100 @@ public class QueryPlanPartitionUtils {
     }
   }
 
+
   /**
-   * Split query plan with filter.
+   * Split query plan by filter paths
    */
-  public static void splitQueryPlanWithValueFilter(
-      ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+  private static void splitQueryPlanByFilterPath(ClusterRpcSingleQueryManager singleQueryManager)
+      throws PathErrorException {
     QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
-    if (queryPlan instanceof GroupByPlan) {
-      splitGroupByPlan((GroupByPlan) queryPlan, singleQueryManager);
-    } else if (queryPlan instanceof AggregationPlan) {
-      splitAggregationPlan((AggregationPlan) queryPlan, singleQueryManager);
-    } else {
-      splitQueryPlan(queryPlan, singleQueryManager);
+    // split query plan by filter path
+    Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager
+        .getFilterGroupEntityMap();
+    IExpression expression = queryPlan.getExpression();
+    ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap);
+    for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) {
+      List<Path> filterSeriesList = filterGroupEntity.getFilterPaths();
+      // create filter sub query plan
+      QueryPlan subQueryPlan = new QueryPlan();
+      subQueryPlan.setPaths(filterSeriesList);
+      IExpression subExpression = ExpressionUtils
+          .pruneFilterTree(expression.clone(), filterSeriesList);
+      if (subExpression.getType() != ExpressionType.TRUE) {
+        subQueryPlan.setExpression(subExpression);
+      }
+      filterGroupEntity.setQueryPlan(subQueryPlan);
     }
   }
 
-  private static void splitGroupByPlan(GroupByPlan groupByPlan,
+  /**
+   * Split group by plan by select path
+   */
+  private static void splitGroupByPlanBySelectPath(
       ClusterRpcSingleQueryManager singleQueryManager) {
     throw new UnsupportedOperationException();
   }
 
-  private static void splitAggregationPlan(AggregationPlan aggregationPlan,
-      ClusterRpcSingleQueryManager singleQueryManager) {
-    throw new UnsupportedOperationException();
+  /**
+   * Split group by plan with filter path
+   */
+  private static void splitGroupByPlanWithFilter(ClusterRpcSingleQueryManager singleQueryManager)
+      throws PathErrorException {
+    splitGroupByPlanBySelectPath(singleQueryManager);
+    splitQueryPlanByFilterPath(singleQueryManager);
   }
 
-  private static void splitFillPlan(FillQueryPlan fillQueryPlan,
-      ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+  /**
+   * Split aggregation plan by select path: create one aggregation sub plan per group id, holding
+   * the select paths of that group together with their aggregations.
+   */
+  private static void splitAggregationPlanBySelectPath(
+      ClusterRpcSingleQueryManager singleQueryManager)
+      throws PathErrorException {
+    AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan();
+    List<Path> selectPaths = queryPlan.getPaths();
+    // aggregations of the origin plan, read by the same index as selectPaths
+    List<String> aggregations = queryPlan.getAggregations();
+    Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
+    Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
+    Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+    for (int i = 0; i < selectPaths.size(); i++) {
+      Path path = selectPaths.get(i);
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+      if (!selectSeriesByGroupId.containsKey(groupId)) {
+        selectSeriesByGroupId.put(groupId, new ArrayList<>());
+        selectAggregationByGroupId.put(groupId, new ArrayList<>());
+      }
+      selectAggregationByGroupId.get(groupId).add(aggregations.get(i));
+      selectSeriesByGroupId.get(groupId).add(path);
+    }
+    for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+      String groupId = entry.getKey();
+      AggregationPlan subQueryPlan = new AggregationPlan();
+      subQueryPlan.setProposer(queryPlan.getProposer());
+      subQueryPlan.setPaths(entry.getValue());
+      subQueryPlan.setExpression(queryPlan.getExpression());
+      subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId));
+      selectPathPlans.put(groupId, subQueryPlan);
+    }
+  }
+
+  /**
+   * Split aggregation plan with filter path
+   */
+  private static void splitAggregationPlanWithFilter(
+      ClusterRpcSingleQueryManager singleQueryManager)
+      throws PathErrorException {
+    splitAggregationPlanBySelectPath(singleQueryManager);
+    splitQueryPlanByFilterPath(singleQueryManager);
+  }
+
+  /**
+   * Split fill plan which only contains select paths.
+   */
+  private static void splitFillPlan(ClusterRpcSingleQueryManager singleQueryManager)
+      throws PathErrorException {
+    FillQueryPlan fillQueryPlan = (FillQueryPlan) singleQueryManager.getOriginQueryPlan();
     List<Path> selectPaths = fillQueryPlan.getPaths();
     Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
     Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
@@ -138,25 +222,13 @@ public class QueryPlanPartitionUtils {
     }
   }
 
-  private static void splitQueryPlan(QueryPlan queryPlan,
-      ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException {
+  /**
+   * Split query plan with filter
+   */
+  private static void splitQueryPlanWithFilter(ClusterRpcSingleQueryManager singleQueryManager)
+      throws PathErrorException {
     splitQueryPlanBySelectPath(singleQueryManager);
-    // split query plan by filter path
-    Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager
-        .getFilterGroupEntityMap();
-    IExpression expression = queryPlan.getExpression();
-    ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap);
-    for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) {
-      List<Path> filterSeriesList = filterGroupEntity.getFilterPaths();
-      // create filter sub query plan
-      QueryPlan subQueryPlan = new QueryPlan();
-      subQueryPlan.setPaths(filterSeriesList);
-      IExpression subExpression = ExpressionUtils
-          .pruneFilterTree(expression.clone(), filterSeriesList);
-      if (subExpression.getType() != ExpressionType.TRUE) {
-        subQueryPlan.setExpression(subExpression);
-      }
-      filterGroupEntity.setQueryPlan(subQueryPlan);
-    }
+    splitQueryPlanByFilterPath(singleQueryManager);
   }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 42a2e8b..920aeef 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -56,9 +56,7 @@ public interface IQueryProcessExecutor {
       QueryFilterOptimizationException, ProcessorException;
 
   /**
-   * process aggregate plan of qp layer, construct queryDataSet. <<<<<<< HEAD
-   *
-   * ======= >>>>>>> master
+   * process aggregate plan of qp layer, construct queryDataSet.
    */
   QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
       QueryContext context)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 508a787..6c458a5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.query.executor;
 
 import java.io.IOException;
@@ -28,7 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.factory.AggreFuncFactory;
 import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
@@ -37,6 +35,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -54,9 +53,10 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 public class AggregateEngineExecutor {
 
-  private List<Path> selectedSeries;
-  private List<String> aggres;
-  private IExpression expression;
+  protected List<Path> selectedSeries;
+  protected List<String> aggres;
+  protected IExpression expression;
+  protected List<TSDataType> dataTypes;
 
   /**
    * aggregation batch calculation size.
@@ -72,6 +72,7 @@ public class AggregateEngineExecutor {
     this.aggres = aggres;
     this.expression = expression;
     this.aggregateFetchSize = 10 * IoTDBDescriptor.getInstance().getConfig().getFetchSize();
+    this.dataTypes = new ArrayList<>();
   }
 
   /**
@@ -79,8 +80,19 @@ public class AggregateEngineExecutor {
    *
    * @param context query context
    */
-  public QueryDataSet executeWithOutTimeGenerator(QueryContext context)
+  public QueryDataSet executeWithoutTimeGenerator(QueryContext context)
       throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
+    List<IPointReader> resultDataPointReaders = constructAggreReadersWithoutTimeGenerator(context);
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+  }
+
+  /**
+   * Construct aggregate readers with only time filter or no filter.
+   *
+   * @param context query context
+   */
+  public List<IPointReader> constructAggreReadersWithoutTimeGenerator(QueryContext context)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
     Filter timeFilter = null;
     if (expression != null) {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
@@ -121,11 +133,17 @@ public class AggregateEngineExecutor {
     List<AggreResultData> aggreResultDataList = new ArrayList<>();
     //TODO use multi-thread
     for (int i = 0; i < selectedSeries.size(); i++) {
-      AggreResultData aggreResultData = aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
+      AggreResultData aggreResultData = aggregateWithoutTimeGenerator(aggregateFunctions.get(i),
           readersOfSequenceData.get(i), readersOfUnSequenceData.get(i), timeFilter);
       aggreResultDataList.add(aggreResultData);
     }
-    return constructDataSet(aggreResultDataList);
+
+    List<IPointReader> resultDataPointReaders = new ArrayList<>();
+    for (AggreResultData resultData : aggreResultDataList) {
+      dataTypes.add(resultData.getDataType());
+      resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
+    }
+    return resultDataPointReaders;
   }
 
   /**
@@ -137,7 +155,7 @@ public class AggregateEngineExecutor {
    * @param filter time filter or null
    * @return one series aggregate result data
    */
-  private AggreResultData aggregateWithOutTimeGenerator(AggregateFunction function,
+  protected AggreResultData aggregateWithoutTimeGenerator(AggregateFunction function,
       SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter filter)
       throws IOException, ProcessorException {
     if (function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc) {
@@ -256,6 +274,18 @@ public class AggregateEngineExecutor {
    */
   public QueryDataSet executeWithTimeGenerator(QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+    List<IPointReader> resultDataPointReaders = constructAggreReadersWithTimeGenerator(context);
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+  }
+
+
+  /**
+   * Construct aggregate readers with value filter.
+   *
+   * @param context query context
+   */
+  private List<IPointReader> constructAggreReadersWithTimeGenerator(QueryContext context)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
     QueryResourceManager
         .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries);
     QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression);
@@ -271,12 +301,19 @@ public class AggregateEngineExecutor {
       function.init();
       aggregateFunctions.add(function);
     }
-    List<AggreResultData> batchDataList = aggregateWithTimeGenerator(aggregateFunctions,
+    List<AggreResultData> aggreResultDataList = aggregateWithTimeGenerator(aggregateFunctions,
         timestampGenerator,
         readersOfSelectedSeries);
-    return constructDataSet(batchDataList);
+
+    List<IPointReader> resultDataPointReaders = new ArrayList<>();
+    for (AggreResultData resultData : aggreResultDataList) {
+      dataTypes.add(resultData.getDataType());
+      resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
+    }
+    return resultDataPointReaders;
   }
 
+
   /**
    * calculation aggregate result with value filter.
    */
@@ -312,19 +349,7 @@ public class AggregateEngineExecutor {
     return aggreResultDataArrayList;
   }
 
-  /**
-   * using aggregate result data list construct QueryDataSet.
-   *
-   * @param aggreResultDataList aggregate result data list
-   */
-  private QueryDataSet constructDataSet(List<AggreResultData> aggreResultDataList)
-      throws IOException {
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<IPointReader> resultDataPointReaders = new ArrayList<>();
-    for (AggreResultData resultData : aggreResultDataList) {
-      dataTypes.add(resultData.getDataType());
-      resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
-    }
-    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 03c600d..32fcbf7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -90,14 +90,14 @@ public class EngineQueryRouter implements IEngineQueryRouter{
       AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
           selectedSeries, aggres, optimizedExpression);
       if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-        return engineExecutor.executeWithOutTimeGenerator(context);
+        return engineExecutor.executeWithoutTimeGenerator(context);
       } else {
         return engineExecutor.executeWithTimeGenerator(context);
       }
     } else {
       AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
           selectedSeries, aggres, null);
-      return engineExecutor.executeWithOutTimeGenerator(context);
+      return engineExecutor.executeWithoutTimeGenerator(context);
     }
   }