You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:30 UTC

[incubator-iotdb] 03/11: fix a severe bug in filter serialization

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit aa3aa0d943626e3864753dd93ff4585e6f60e3eb
Author: lta <li...@163.com>
AuthorDate: Fri May 17 15:08:50 2019 +0800

    fix a severe bug in filter serialization
---
 .../executor/ClusterAggregateEngineExecutor.java   | 140 ++++-
 .../ClusterRpcSingleQueryManager.java              |   4 +-
 .../querynode/ClusterLocalQueryManager.java        |   2 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |  25 +-
 .../querynode/IClusterLocalQueryManager.java       |   4 +-
 .../querynode/IClusterLocalSingleQueryManager.java |   5 +-
 .../querynode/ClusterFilterSeriesBatchReader.java  |   8 +-
 .../cluster/query/utils/ClusterRpcReaderUtils.java |  28 +-
 .../query/utils/QueryPlanPartitionUtils.java       |   2 +-
 .../request/querydata/InitSeriesReaderRequest.java |  72 ++-
 .../apache/iotdb/cluster/integration/Constant.java | 100 ++++
 .../cluster/integration/IoTDBAggregationIT.java    | 640 +++++++++++++++++++++
 .../cluster/integration/IoTDBFillQueryIT.java      |   1 -
 .../db/query/executor/AggregateEngineExecutor.java |  14 +-
 .../timegenerator/AbstractNodeConstructor.java     |   3 -
 15 files changed, 996 insertions(+), 52 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index b63b311..51113c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -20,10 +20,18 @@ package org.apache.iotdb.cluster.query.executor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -41,6 +49,7 @@ import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
 import org.apache.iotdb.db.query.factory.AggreFuncFactory;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
 import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -49,10 +58,16 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
+/**
+ * Handles aggregation queries and constructs the result dataset in the cluster.
+ */
 public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
 
   private ClusterRpcSingleQueryManager queryManager;
+  private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
+
 
   public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, ClusterRpcSingleQueryManager queryManager) {
@@ -80,7 +95,7 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
         paths.add(path);
         // construct AggregateFunction
         TSDataType tsDataType = MManager.getInstance()
-            .getSeriesType(selectedSeries.get(i).getFullPath());
+            .getSeriesType(path.getFullPath());
         AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
         function.init();
 
@@ -103,6 +118,8 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
 
         AggreResultData aggreResultData = aggregateWithoutTimeGenerator(function,
             sequenceReader, unSeqMergeReader, timeFilter);
+
+        dataTypes.add(aggreResultData.getDataType());
         readers.add(new AggreResultDataPointReader(aggreResultData));
       }
     }
@@ -111,4 +128,125 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
 
     return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, readers);
   }
+
+  /**
+   * Execute the aggregate functions of a query with a value filter: timestamps come from a
+   * ClusterTimeGenerator and the results are wrapped in an EngineDataSetWithoutTimeGenerator.
+   * @param context query context, its job id is used to register the query resources.
+   */
+  @Override
+  public QueryDataSet executeWithTimeGenerator(QueryContext context)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+
+    /** begin query for the select series that have no remote reader (handled locally) **/
+    List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
+    Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet();
+    localQuerySeries.removeAll(remoteQuerySeries);
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
+
+    /** begin query for the filter expression, passing the devices of remote filter series **/
+    Set<String> deviceIdSet = new HashSet<>();
+    for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) {
+      List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths();
+      remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
+    }
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenExpression(context.getJobId(), expression, deviceIdSet);
+
+    ClusterTimeGenerator timestampGenerator;
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+    try {
+      timestampGenerator = new ClusterTimeGenerator(expression, context,
+          queryManager);
+      readersOfSelectedSeries = ClusterSeriesReaderFactory
+          .createReadersByTimestampOfSelectedPaths(selectedSeries, context,
+              queryManager);
+    } catch (IOException ex) {
+      throw new FileNodeManagerException(ex);
+    }
+
+    /** data type of each select path: from its remote reader if any, else from MManager **/
+    List<TSDataType> originDataTypes = new ArrayList<>();
+    Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager
+        .getSelectSeriesReaders();
+    for (Path path : selectedSeries) {
+      try {
+        if (selectSeriesReaders.containsKey(path)) {
+          originDataTypes.add(selectSeriesReaders.get(path).getDataType());
+        } else {
+          originDataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+        }
+      } catch (PathErrorException e) {
+        throw new FileNodeManagerException(e);
+      }
+    }
+
+    List<AggregateFunction> aggregateFunctions = new ArrayList<>();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      TSDataType type = originDataTypes.get(i);
+      AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), type);
+      function.init();
+      aggregateFunctions.add(function);
+    }
+    List<AggreResultData> aggreResultDataList = aggregateWithTimeGenerator(aggregateFunctions,
+        timestampGenerator,
+        readersOfSelectedSeries);
+
+    List<IPointReader> resultDataPointReaders = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (AggreResultData resultData : aggreResultDataList) {
+      dataTypes.add(resultData.getDataType());
+      resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
+    }
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+  }
+
+  /**
+   * Calculate the aggregate results batch by batch, using timestamps from the time generator.
+   */
+  @Override
+  protected List<AggreResultData> aggregateWithTimeGenerator(
+      List<AggregateFunction> aggregateFunctions,
+      TimeGenerator timestampGenerator,
+      List<EngineReaderByTimeStamp> readersOfSelectedSeries)
+      throws IOException {
+
+    while (timestampGenerator.hasNext()) {
+
+      // collect up to aggregateFetchSize timestamps for this batch
+      long[] timeArray = new long[aggregateFetchSize];
+      List<Long> batchTimestamp = new ArrayList<>();
+      int timeArrayLength = 0;
+      for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
+        if (!timestampGenerator.hasNext()) {
+          break;
+        }
+        long time = timestampGenerator.next();
+        timeArray[timeArrayLength++] = time;
+        batchTimestamp.add(time);
+      }
+
+      // fetch the data of all remote select series for this batch of timestamps
+      if (!batchTimestamp.isEmpty()) {
+        try {
+          queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp);
+        } catch (RaftConnectionException e) {
+          throw new IOException(e);
+        }
+      }
+
+      // feed this batch of timestamps and the matching reader to each aggregate function
+      for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
+        aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray, timeArrayLength,
+            readersOfSelectedSeries.get(i));
+      }
+    }
+
+    List<AggreResultData> aggreResultDataArrayList = new ArrayList<>();
+    for (AggregateFunction function : aggregateFunctions) {
+      aggreResultDataArrayList.add(function.getResult());
+    }
+    return aggreResultDataArrayList;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index faeda22..6c4f2ad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -134,7 +134,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
    * group
    */
   private void initSeriesReader(int readDataConsistencyLevel)
-      throws RaftConnectionException {
+      throws RaftConnectionException, IOException {
     // Init all series with data group of select series,if filter series has the same data group, init them together.
     for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
       String groupId = entry.getKey();
@@ -144,7 +144,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
         queryNodes.put(groupId, randomPeer);
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
         allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
-        List<Filter> filterList = null;
+        List<Filter> filterList = new ArrayList<>();
         if (filterGroupEntityMap.containsKey(groupId)) {
           FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
           allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index fe3ac52..4e09af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -53,7 +53,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
 
   @Override
   public InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request)
-      throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException {
+      throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
     long jobId = QueryResourceManager.getInstance().assignJobId();
     String taskId = request.getTaskId();
     TASK_ID_MAP_JOB_ID.put(taskId, jobId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 76a141e..0f2cf62 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -64,9 +64,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
-import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.slf4j.Logger;
@@ -75,7 +73,8 @@ import org.slf4j.LoggerFactory;
 
 public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterLocalSingleQueryManager.class);
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(ClusterLocalSingleQueryManager.class);
 
   private String groupId;
 
@@ -127,7 +126,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
 
   @Override
   public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
-      throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException {
+      throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
     this.groupId = request.getGroupID();
     InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId);
     QueryContext context = new QueryContext(jobId);
@@ -199,7 +198,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
     if (queryPlan.getExpression() == null
         || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
-      handleAggreSeriesReaderWithoutTimeGenerator(queryPlan,context,response);
+      handleAggreSeriesReaderWithoutTimeGenerator(queryPlan, context, response);
     } else {
       handleSelectReaderWithTimeGenerator(queryPlan, context, response);
     }
@@ -210,24 +209,23 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
    *
    * @param queryPlan fill query plan
    */
-  private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, QueryContext context,
+  private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan,
+      QueryContext context,
       InitSeriesReaderResponse response)
-      throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
     AggregationPlan fillQueryPlan = (AggregationPlan) queryPlan;
 
     List<Path> selectedPaths = fillQueryPlan.getPaths();
     QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths);
 
-    IExpression optimizedExpression = ExpressionOptimizer.getInstance()
-        .optimize(fillQueryPlan.getExpression(), selectedPaths);
     AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
-        selectedPaths, fillQueryPlan.getAggregations(), optimizedExpression);
+        selectedPaths, fillQueryPlan.getAggregations(), fillQueryPlan.getExpression());
 
     List<IPointReader> readers = engineExecutor.constructAggreReadersWithoutTimeGenerator(context);
 
     List<TSDataType> dataTypes = engineExecutor.getDataTypes();
 
-    for (int i =0 ; i < selectedPaths.size(); i ++) {
+    for (int i = 0; i < selectedPaths.size(); i++) {
       Path path = selectedPaths.get(i);
       selectSeriesReaders.put(path.getFullPath(),
           new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i)));
@@ -291,7 +289,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
    */
   private void handleFilterSeriesReader(QueryPlan plan, QueryContext context,
       InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType)
-      throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException {
+      throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException, ClassNotFoundException {
     QueryDataSet queryDataSet = queryProcessExecutor
         .processQuery(plan, context);
     List<Path> paths = plan.getPaths();
@@ -321,7 +319,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
           .createReaderByTimeStamp(path, context);
       TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
       selectSeriesReaders
-          .put(path.getFullPath(), new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
+          .put(path.getFullPath(),
+              new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
       dataTypeMap.put(path.getFullPath(), dataType);
       dataTypeList.add(dataType);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
index cc0f103..1105bb2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
@@ -42,7 +42,7 @@ public interface IClusterLocalQueryManager {
    * @param request request for query data from coordinator node
    */
   InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request)
-      throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException;
+      throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException;
 
   /**
    * Read batch data of all querying series in request and set response.
@@ -54,8 +54,8 @@ public interface IClusterLocalQueryManager {
 
   /**
    * Read batch data of select series by batch timestamp which is used in query with value filter
-   *  @param request request of querying select paths
    *
+   * @param request request of querying select paths
    */
   QuerySeriesDataByTimestampResponse readBatchDataByTimestamp(
       QuerySeriesDataByTimestampRequest request) throws IOException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
index 318772f..1d89c5c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
@@ -40,18 +40,19 @@ public interface IClusterLocalSingleQueryManager {
 
   /**
    * Initially create corresponding series readers.
+   *
    * @param request request of querying series data
    */
   InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
-      throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException;
+      throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException;
 
   /**
    * <p>
    * Read batch data If query round in cache is equal to target query round, it means that batch
    * data in query node transfer to coordinator fail and return cached batch data.
    * </p>
-   *  @param request request of querying series data
    *
+   * @param request request of querying series data
    */
   QuerySeriesDataResponse readBatchData(QuerySeriesDataRequest request)
       throws IOException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
index 3f21835..1cd357e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
@@ -68,12 +68,12 @@ public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatch
       batchDataList.add(new BatchData(dataTypeList.get(i), true));
     }
     int dataPointCount = 0;
-    while(true){
-      if(!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()){
+    while (true) {
+      if (!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()) {
         break;
       }
-      if(hasNext() && addTimeValuePair(batchDataList, dataTypeList)){
-          dataPointCount++;
+      if (hasNext() && addTimeValuePair(batchDataList, dataTypeList)) {
+        dataPointCount++;
       }
     }
     return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index 424ba95..dca2d30 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.cluster.query.utils;
 
 import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -62,7 +63,7 @@ public class ClusterRpcReaderUtils {
    */
   public static BasicResponse createClusterSeriesReader(String groupId, PeerId peerId,
       int readDataConsistencyLevel, Map<PathType, QueryPlan> allQueryPlan, String taskId,
-      List<Filter> filterList) throws RaftConnectionException {
+      List<Filter> filterList) throws RaftConnectionException, IOException {
 
     /** handle request **/
     BasicRequest request = InitSeriesReaderRequest
@@ -71,14 +72,35 @@ public class ClusterRpcReaderUtils {
     return handleQueryRequest(request, peerId, 0);
   }
 
-  public static QuerySeriesDataResponse fetchBatchData(String groupID, PeerId peerId, String taskId,
+  /**
+   * Fetch batch data for select series in a query without value filter or filter series.
+   *
+   * @param groupId data group id
+   * @param peerId query node id
+   * @param taskId task id of query task
+   * @param pathType type of path
+   * @param fetchDataSeries series list which need to fetch data
+   * @param queryRounds query rounds
+   */
+  public static QuerySeriesDataResponse fetchBatchData(String groupId, PeerId peerId, String taskId,
       PathType pathType, List<String> fetchDataSeries, long queryRounds)
       throws RaftConnectionException {
     BasicRequest request = QuerySeriesDataRequest
-        .createFetchDataRequest(groupID, taskId, pathType, fetchDataSeries, queryRounds);
+        .createFetchDataRequest(groupId, taskId, pathType, fetchDataSeries, queryRounds);
     return (QuerySeriesDataResponse) handleQueryRequest(request, peerId, 0);
   }
 
+  /**
+   * Fetch batch data corresponding to a given list of timestamps for select series in a query
+   * with a value filter.
+   *
+   * @param groupId data group id
+   * @param peerId query node id
+   * @param taskId task id of query task
+   * @param queryRounds query rounds
+   * @param batchTimestamp list of valid timestamp
+   * @param fetchDataSeries series list which need to fetch data
+   */
   public static QuerySeriesDataByTimestampResponse fetchBatchDataByTimestamp(String groupId,
       PeerId peerId, String taskId, long queryRounds, List<Long> batchTimestamp,
       List<String> fetchDataSeries)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index fc0d401..5fbd30c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -156,7 +156,7 @@ public class QueryPlanPartitionUtils {
       throws PathErrorException {
     AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan();
     List<Path> selectPaths = queryPlan.getPaths();
-    List<String> aggregations = new ArrayList<>();
+    List<String> aggregations = queryPlan.getAggregations();
     Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
     Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
     Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java
index c974e2f..e28ac15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java
@@ -18,10 +18,16 @@
  */
 package org.apache.iotdb.cluster.rpc.raft.request.querydata;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -42,12 +48,12 @@ public class InitSeriesReaderRequest extends BasicQueryRequest {
   /**
    * Key is series type, value is query plan
    */
-  private Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
+  private Map<PathType, byte[]> allQueryPlan = new EnumMap<>(PathType.class);
 
   /**
    * Represent all filter of leaf node in filter tree while executing a query with value filter.
    */
-  private List<Filter> filterList = new ArrayList<>();
+  private List<byte[]> filterList = new ArrayList<>();
 
 
   private InitSeriesReaderRequest(String groupID, String taskId) {
@@ -55,12 +61,17 @@ public class InitSeriesReaderRequest extends BasicQueryRequest {
     this.taskId = taskId;
   }
 
-  public static InitSeriesReaderRequest createInitialQueryRequest(String groupId, String taskId, int readConsistencyLevel,
-      Map<PathType, QueryPlan> allQueryPlan, List<Filter> filterList){
+  public static InitSeriesReaderRequest createInitialQueryRequest(String groupId, String taskId,
+      int readConsistencyLevel,
+      Map<PathType, QueryPlan> allQueryPlan, List<Filter> filterList) throws IOException {
     InitSeriesReaderRequest request = new InitSeriesReaderRequest(groupId, taskId);
     request.setReadConsistencyLevel(readConsistencyLevel);
-    request.allQueryPlan = allQueryPlan;
-    request.filterList = filterList;
+    for (Entry<PathType, QueryPlan> entry : allQueryPlan.entrySet()) {
+      request.allQueryPlan.put(entry.getKey(), toByteArray(entry.getValue()));
+    }
+    for (Filter filter : filterList) {
+      request.filterList.add(toByteArray(filter));
+    }
     return request;
   }
 
@@ -72,20 +83,51 @@ public class InitSeriesReaderRequest extends BasicQueryRequest {
     this.taskId = taskId;
   }
 
-  public Map<PathType, QueryPlan> getAllQueryPlan() {
-    return allQueryPlan;
+  public Map<PathType, QueryPlan> getAllQueryPlan() throws IOException, ClassNotFoundException {
+    Map<PathType, QueryPlan> queryPlanMap = new EnumMap<>(PathType.class);
+    for (Entry<PathType, byte[]> entry : allQueryPlan.entrySet()) {
+      queryPlanMap.put(entry.getKey(), (QueryPlan) toObject(entry.getValue()));
+    }
+    return queryPlanMap;
   }
 
-  public void setAllQueryPlan(
-      Map<PathType, QueryPlan> allQueryPlan) {
-    this.allQueryPlan = allQueryPlan;
+  public List<Filter> getFilterList() throws IOException, ClassNotFoundException {
+    List<Filter> filters = new ArrayList<>();
+    for (byte[] filterBytes : filterList) {
+      filters.add((Filter) toObject(filterBytes));
+    }
+    return filters;
   }
 
-  public List<Filter> getFilterList() {
-    return filterList;
+  /**
+   * Serialize an object to a byte array using Java object serialization.
+   *
+   * @param obj object to serialize, which must implement Serializable
+   * @return the serialized bytes of the object
+   */
+  private static byte[] toByteArray(Object obj) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(bos);
+    oos.writeObject(obj);
+    oos.flush();
+    byte[] bytes = bos.toByteArray();
+    oos.close();
+    bos.close();
+    return bytes;
   }
 
-  public void setFilterList(List<Filter> filterList) {
-    this.filterList = filterList;
+  /**
+   * Deserialize an object from a byte array using Java object serialization.
+   *
+   * @param bytes serialized bytes of the object
+   * @return the deserialized object
+   */
+  private static Object toObject(byte[] bytes) throws IOException, ClassNotFoundException {
+    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+    ObjectInputStream ois = new ObjectInputStream(bis);
+    Object obj = ois.readObject();
+    ois.close();
+    bis.close();
+    return obj;
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java
new file mode 100644
index 0000000..71cf523
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+
+/** Series paths, DDL statements and SQL-text helpers shared by the integration tests. */
+public class Constant {
+
+  public static final String d0s0 = "root.vehicle.d0.s0";
+  public static final String d0s1 = "root.vehicle.d0.s1";
+  public static final String d0s2 = "root.vehicle.d0.s2";
+  public static final String d0s3 = "root.vehicle.d0.s3";
+  public static final String d0s4 = "root.vehicle.d0.s4";
+  public static final String d0s5 = "root.vehicle.d0.s5";
+  public static final String d1s0 = "root.vehicle.d1.s0";
+  public static final String d1s1 = "root.vehicle.d1.s1";
+  public static final String TIMESTAMP_STR = "Time";
+  public static boolean testFlag = true;
+  public static String[] stringValue = {"A", "B", "C", "D", "E"};
+  public static String[] booleanValue = {"true", "false"};
+
+  public static String[] create_sql = {
+      "SET STORAGE GROUP TO root.vehicle",
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s5 WITH DATATYPE=DOUBLE, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+  };
+
+  public static String insertTemplate = "insert into %s(timestamp%s) values(%d%s)";
+
+  public static String first(String path) {
+    return "first(" + path + ")";
+  }
+
+  public static String last(String path) {
+    return "last(" + path + ")";
+  }
+
+  public static String sum(String path) {
+    return "sum(" + path + ")";
+  }
+
+  public static String mean(String path) {
+    return "mean(" + path + ")";
+  }
+
+  public static String count(String path) {
+    return "count(" + path + ")";
+  }
+
+  public static String max_time(String path) {
+    return "max_time(" + path + ")";
+  }
+
+  public static String min_time(String path) {
+    return "min_time(" + path + ")";
+  }
+
+  public static String max_value(String path) {
+    return "max_value(" + path + ")";
+  }
+
+  public static String min_value(String path) {
+    return "min_value(" + path + ")";
+  }
+
+  /** Builds the insert statement for {@code record} by filling in {@link #insertTemplate}. */
+  public static String recordToInsert(TSRecord record) {
+    StringBuilder columns = new StringBuilder();
+    StringBuilder literals = new StringBuilder();
+    for (DataPoint point : record.dataPointList) {
+      columns.append(',').append(point.getMeasurementId());
+      literals.append(',').append(point.getValue());
+    }
+    return String.format(insertTemplate, record.deviceId, columns, record.time, literals);
+  }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java
new file mode 100644
index 0000000..bf7c4da
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java
@@ -0,0 +1,640 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import static org.apache.iotdb.cluster.integration.Constant.count;
+import static org.apache.iotdb.cluster.integration.Constant.first;
+import static org.apache.iotdb.cluster.integration.Constant.last;
+import static org.apache.iotdb.cluster.integration.Constant.max_time;
+import static org.apache.iotdb.cluster.integration.Constant.max_value;
+import static org.apache.iotdb.cluster.integration.Constant.mean;
+import static org.apache.iotdb.cluster.integration.Constant.min_time;
+import static org.apache.iotdb.cluster.integration.Constant.min_value;
+import static org.apache.iotdb.cluster.integration.Constant.sum;
+import static org.apache.iotdb.cluster.utils.Utils.insertData;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBAggregationIT {
+
+  private Server server;
+  private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+  private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+      CLUSTER_CONFIG.getPort());
+
+  private static String[] creationSqls = new String[]{
+      "SET STORAGE GROUP TO root.vehicle.d0",
+      "SET STORAGE GROUP TO root.vehicle.d1",
+
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN"
+  };
+
+  private static String[] dataSet2 = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(1, 1.1, false, 11)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(2, 2.2, true, 22)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(3, 3.3, false, 33 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(4, 4.4, false, 44)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(5, 5.5, false, 55)"
+  };
+
+  private String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3)"
+      + " VALUES(%d,%d,%d,%f,%s)";
+
+  private static final String TIMESTAMP_STR = "Time";
+  private final String d0s0 = "root.vehicle.d0.s0";
+  private final String d0s1 = "root.vehicle.d0.s1";
+  private final String d0s2 = "root.vehicle.d0.s2";
+  private final String d0s3 = "root.vehicle.d0.s3";
+  private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    CLUSTER_CONFIG.createAllPath();
+    server = Server.getInstance();
+    server.start();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
+    QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() throws SQLException {
+    String[] retArray = new String[]{
+        "0,2",
+        "0,4",
+        "0,3"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute(
+          "select count(temperature) from root.ln.wf01.wt01 where time > 3");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+            resultSet.getString(count(TEMPERATURE_STR));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select min_time(temperature) from root.ln.wf01.wt01 where time > 3");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+            resultSet.getString(min_time(TEMPERATURE_STR));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select min_time(temperature) from root.ln.wf01.wt01 where temperature > 3");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+            resultSet.getString(min_time(TEMPERATURE_STR));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(3, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void remoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    test();
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void countTest() throws SQLException {
+    String[] retArray = new String[]{
+        "0,2001,2001,2001,2001",
+        "0,7500,7500,7500,7500"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3) " +
+          "from root.vehicle.d0 where time >= 6000 and time <= 9000");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
+            + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
+            + "," + resultSet.getString(count(d0s3));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3) " +
+          "from root.vehicle.d0");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
+            + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
+            + "," + resultSet.getString(count(d0s3));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void countRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    countTest();
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+
+  @Test
+  public void firstTest() throws SQLException {
+    String[] retArray = new String[]{
+        "0,2000,2000,2000.0,2000",
+        "0,500,500,500.0,500"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select first(s0),first(s1),first(s2),first(s3) " +
+          "from root.vehicle.d0 where time >= 1500 and time <= 9000");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0))
+            + "," + resultSet.getString(first(d0s1)) + "," + resultSet.getString(first(d0s2))
+            + "," + resultSet.getString(first(d0s3));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select first(s0),first(s1),first(s2),first(s3) " +
+          "from root.vehicle.d0");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0))
+            + "," + resultSet.getString(first(d0s1)) + "," + resultSet.getString(first(d0s2))
+            + "," + resultSet.getString(first(d0s3));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void firstRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    firstTest();
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void lastTest() throws SQLException {
+    String[] retArray = new String[]{
+        "0,8499,8499.0",
+        "0,1499,1499.0",
+        "0,2200,2200.0"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select last(s0),last(s2) " +
+          "from root.vehicle.d0 where time >= 1500 and time < 9000");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0))
+            + "," + resultSet.getString(last(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select last(s0),last(s2) " +
+          "from root.vehicle.d0 where time <= 1600");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0))
+            + "," + resultSet.getString(last(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select last(s0),last(s2) " +
+          "from root.vehicle.d0 where time <= 2200");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0))
+            + "," + resultSet.getString(last(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(3, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void lastRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    lastTest();
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void maxminTimeTest() throws SQLException {
+    String[] retArray = new String[]{
+        "0,8499,500",
+        "0,2499,2000"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select max_time(s0),min_time(s2) " +
+          "from root.vehicle.d0 where time >= 100 and time < 9000");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
+            + "," + resultSet.getString(min_time(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select max_time(s0),min_time(s2) " +
+          "from root.vehicle.d0 where time <= 2500 and time > 1800");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
+            + "," + resultSet.getString(min_time(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void maxminTimeRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    maxminTimeTest();
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void maxminValueTest() throws SQLException {
+    String[] retArray = new String[]{
+        "0,8499,500.0",
+        "0,2499,500.0"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+          "from root.vehicle.d0 where time >= 100 and time < 9000");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_value(d0s0))
+            + "," + resultSet.getString(min_value(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+          "from root.vehicle.d0 where time < 2500");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_value(d0s0))
+            + "," + resultSet.getString(min_value(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void maxminValueRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    maxminValueTest(); // run the value checks with the local node address overridden
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void meanSumTest() throws SQLException {
+    String[] retArray = new String[]{
+        "0,1.4508E7,7250.374812593703",
+        "0,626750.0,1250.998003992016"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select sum(s0),mean(s2)" +
+          "from root.vehicle.d0 where time >= 6000 and time <= 9000");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
+            + "," + resultSet.getString(mean(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(1, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select sum(s0),mean(s2)" +
+          "from root.vehicle.d0 where time >= 1000 and time <= 2000");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
+            + "," + resultSet.getString(mean(d0s2));
+        Assert.assertEquals(retArray[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(2, cnt);
+      statement.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void meanSumRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    meanSumTest();
+    try {
+      Thread.sleep(200);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Creates the schema and loads the rows that the aggregation tests assert on.
+   *
+   * <p>Rows for root.vehicle.d0 are written in several batches, with "flush" and "merge"
+   * statements issued between some of them; the section comments below name the storage
+   * area each batch is meant to end up in.
+   *
+   * @throws AssertionError if any step fails, so that no test runs against partial data
+   */
+  private void prepareData() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      insertData(connection, creationSqls, dataSet2);
+      // prepare BufferWrite file
+      for (int i = 5000; i < 7000; i++) {
+        statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("flush");
+      for (int i = 7500; i < 8500; i++) {
+        statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("flush");
+
+      // prepare Unseq-File
+      for (int i = 500; i < 1500; i++) {
+        statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("flush");
+      for (int i = 3000; i < 6500; i++) {
+        statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("merge");
+
+      // prepare BufferWrite cache
+      for (int i = 9000; i < 10000; i++) {
+        statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      // prepare Overflow cache
+      for (int i = 2000; i < 2500; i++) {
+        statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+    } catch (Exception e) {
+      throw new AssertionError("failed to prepare test data", e);
+    }
+  }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
index f5bf17f..ba8746d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
@@ -40,7 +40,6 @@ import org.junit.Test;
 
 public class IoTDBFillQueryIT {
 
-
   private Server server;
   private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
   private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 6c458a5..401b056 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -50,18 +50,22 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
+/**
+ * Handle aggregation query and construct dataset
+ */
 public class AggregateEngineExecutor {
 
   protected List<Path> selectedSeries;
   protected List<String> aggres;
   protected IExpression expression;
-  protected List<TSDataType> dataTypes;
+  private List<TSDataType> dataTypes;
 
   /**
    * aggregation batch calculation size.
    **/
-  private int aggregateFetchSize;
+  protected int aggregateFetchSize;
 
   /**
    * constructor.
@@ -317,9 +321,9 @@ public class AggregateEngineExecutor {
   /**
    * calculation aggregate result with value filter.
    */
-  private List<AggreResultData> aggregateWithTimeGenerator(
+  protected List<AggreResultData> aggregateWithTimeGenerator(
       List<AggregateFunction> aggregateFunctions,
-      EngineTimeGenerator timestampGenerator,
+      TimeGenerator timestampGenerator,
       List<EngineReaderByTimeStamp> readersOfSelectedSeries)
       throws IOException {
 
@@ -335,6 +339,8 @@ public class AggregateEngineExecutor {
         timeArray[timeArrayLength++] = timestampGenerator.next();
       }
 
+
+
       // cal part of aggregate result
       for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
         aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray, timeArrayLength,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
index c23c2b9..5d83314 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
@@ -58,10 +58,7 @@ public abstract class AbstractNodeConstructor {
 
   /**
    * Construct not series type node.
-<<<<<<< HEAD
-=======
    *
->>>>>>> master
    * @param expression expression
    * @return Node object
    * @throws FileNodeManagerException FileNodeManagerException