You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:34 UTC

[incubator-iotdb] 07/11: add it test of aggregation function

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9487dbfc0a68fb62efa5270cc4fb080ea4e2a3a0
Author: lta <li...@163.com>
AuthorDate: Mon May 20 17:27:57 2019 +0800

    add it test of aggregation function
---
 .../query/{manager => }/common/FillBatchData.java  |   2 +-
 .../executor/ClusterAggregateEngineExecutor.java   |   1 -
 .../ClusterRpcSingleQueryManager.java              |  26 ++---
 .../IClusterRpcSingleQueryManager.java             |   4 +-
 .../coordinatornode/SelectSeriesGroupEntity.java   |   1 -
 .../querynode/ClusterLocalSingleQueryManager.java  |  76 +++++++++------
 .../coordinatornode/ClusterFilterSeriesReader.java |   2 +-
 .../ClusterFillSelectSeriesBatchReader.java        |   2 +-
 ...a => ClusterFilterSeriesBatchReaderEntity.java} |   6 +-
 ...a => ClusterSelectSeriesBatchReaderEntity.java} |  42 +++++++--
 ... => IClusterFilterSeriesBatchReaderEntity.java} |   2 +-
 .../query/utils/ClusterTimeValuePairUtils.java     |   2 +-
 .../QuerySeriesDataByTimestampRequest.java         |  17 +---
 .../request/querydata/QuerySeriesDataRequest.java  |  16 ++--
 .../integration/IoTDBAggregationLargeDataIT.java   |   3 +-
 .../integration/IoTDBAggregationSmallDataIT.java   |  94 ------------------
 .../query/manager/ClusterLocalManagerTest.java     | 105 +++++++++++----------
 17 files changed, 160 insertions(+), 241 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
index 3e128e3..2d17d0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query.manager.common;
+package org.apache.iotdb.cluster.query.common;
 
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index b34afa1..2cf4e87 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -69,7 +69,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
 
   private ClusterRpcSingleQueryManager queryManager;
-  private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
 
 
   public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 905ce1b..05bf9df 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -211,29 +211,26 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
 
   @Override
   public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException {
-    List<String> fetchDataSeries = new ArrayList<>();
-    List<Integer> selectSeriesIndexs = new ArrayList<>();
+    List<Integer> fetchDataSeriesIndexs = new ArrayList<>();
     List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths();
     List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId)
         .getSelectSeriesReaders();
     for (int i = 0; i < selectSeries.size(); i++) {
-      Path series = selectSeries.get(i);
       if (seriesReaders.get(i).enableFetchData()) {
-        fetchDataSeries.add(series.getFullPath());
-        selectSeriesIndexs.add(i);
+        fetchDataSeriesIndexs.add(i);
       }
     }
     BasicRequest request = QuerySeriesDataRequest
-        .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeries,
+        .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeriesIndexs,
             queryRounds++);
     QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils
         .handleQueryRequest(request, queryNodes.get(groupId), 0);
 
-    handleFetchDataResponseForSelectPaths(groupId, selectSeriesIndexs, response);
+    handleFetchDataResponseForSelectPaths(groupId, fetchDataSeriesIndexs, response);
   }
 
   @Override
-  public void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException {
+  public void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException {
     BasicRequest request = QuerySeriesDataRequest
         .createFetchDataRequest(groupId, taskId, PathType.FILTER_PATH, null, queryRounds++);
     QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils
@@ -248,27 +245,22 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
       throws RaftConnectionException {
     for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
       String groupId = entry.getKey();
-      List<String> fetchDataFilterSeries = new ArrayList<>();
-      entry.getValue().getSelectPaths()
-          .forEach(path -> fetchDataFilterSeries.add(path.getFullPath()));
       BasicRequest request = QuerySeriesDataByTimestampRequest
-          .createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries);
+          .createRequest(groupId, queryRounds++, taskId, batchTimestamp);
       QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils
           .handleQueryRequest(request, queryNodes.get(groupId), 0);
-      handleFetchDataByTimestampResponseForSelectPaths(groupId, fetchDataFilterSeries, response);
+      handleFetchDataByTimestampResponseForSelectPaths(groupId, response);
     }
   }
 
   /**
    * Handle response of fetching data, and add batch data to corresponding reader.
    */
-  private void handleFetchDataByTimestampResponseForSelectPaths(String groupId,
-      List<String> fetchDataSeries,
-      BasicQueryDataResponse response) {
+  private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, BasicQueryDataResponse response) {
     List<BatchData> batchDataList = response.getSeriesBatchData();
     List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
         .getSelectSeriesReaders();
-    for (int i = 0; i < fetchDataSeries.size(); i++) {
+    for (int i = 0; i < selectSeriesReaders.size(); i++) {
       BatchData batchData = batchDataList.get(i);
       selectSeriesReaders.get(i).addBatchData(batchData, true);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
index d6ca0d7..19d8f25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
@@ -53,11 +53,11 @@ public interface IClusterRpcSingleQueryManager {
   void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException;
 
   /**
-   * Fetch data for filter path.
+   * Fetch data for all filter paths.
    *
    * @param groupId data group id
    */
-  void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException;
+  void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException;
 
   /**
    * Fetch batch data for all select paths by batch timestamp. If target data can be fetched, skip
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
index 9f35117..1de26bd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.cluster.query.manager.coordinatornode;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 0f2cf62..8799be2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -30,10 +30,11 @@ import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
-import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
-import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity;
+import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReaderEntity;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -79,6 +80,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   private String groupId;
 
   /**
+   * Marks whether this manager has already been initialized.
+   */
+  private boolean isInit = false;
+
+  /**
    * Timer of Query, if the time is up, close query resource.
    */
   private ScheduledFuture<?> queryTimer;
@@ -94,14 +100,14 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   private long queryRound = -1;
 
   /**
-   * Key is series full path, value is reader of select series
+   * Select reader entity
    */
-  private Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = new HashMap<>();
+  private ClusterSelectSeriesBatchReaderEntity selectReaderEntity;
 
   /**
-   * Filter reader
+   * Filter reader entity
    */
-  private IClusterFilterSeriesBatchReader filterReader;
+  private IClusterFilterSeriesBatchReaderEntity filterReaderEntity;
 
   /**
    * Key is series full path, value is data type of series
@@ -127,11 +133,17 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   @Override
   public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
       throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
+    if (isInit) {
+      throw new IOException(String
+          .format("ClusterLocalSingleQueryManager has already initialized. Job id = %s", jobId));
+    }
+    isInit = true;
     this.groupId = request.getGroupID();
     InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId);
     QueryContext context = new QueryContext(jobId);
     Map<PathType, QueryPlan> queryPlanMap = request.getAllQueryPlan();
     if (queryPlanMap.containsKey(PathType.SELECT_PATH)) {
+      selectReaderEntity = new ClusterSelectSeriesBatchReaderEntity();
       QueryPlan plan = queryPlanMap.get(PathType.SELECT_PATH);
       if (plan instanceof GroupByPlan) {
         throw new UnsupportedOperationException();
@@ -179,8 +191,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       fill.setDataType(dataType);
       fill.setQueryTime(fillQueryPlan.getQueryTime());
       fill.constructReaders(queryDataSource, context);
-      selectSeriesReaders.put(path.getFullPath(),
-          new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult()));
+      selectReaderEntity.addPath(path.getFullPath());
+      selectReaderEntity
+          .addReaders(new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult()));
       dataTypeMap.put(path.getFullPath(), dataType);
     }
 
@@ -195,7 +208,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
    */
   private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context,
       InitSeriesReaderResponse response)
-      throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
     if (queryPlan.getExpression() == null
         || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
       handleAggreSeriesReaderWithoutTimeGenerator(queryPlan, context, response);
@@ -227,7 +240,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
 
     for (int i = 0; i < selectedPaths.size(); i++) {
       Path path = selectedPaths.get(i);
-      selectSeriesReaders.put(path.getFullPath(),
+      selectReaderEntity.addPath(path.getFullPath());
+      selectReaderEntity.addReaders(
           new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i)));
       dataTypeMap.put(path.getFullPath(), dataTypes.get(i));
     }
@@ -275,8 +289,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       String fullPath = paths.get(i).getFullPath();
       IPointReader reader = AbstractExecutorWithoutTimeGenerator
           .createSeriesReader(context, paths.get(i), dataTypes, timeFilter);
-      selectSeriesReaders
-          .put(fullPath, new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader));
+      selectReaderEntity.addPath(fullPath);
+      selectReaderEntity.addReaders(new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader));
       dataTypeMap.put(fullPath, dataTypes.get(i));
     }
     response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
@@ -298,7 +312,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i));
     }
     response.getSeriesDataTypes().put(pathType, dataTypes);
-    filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, request.getFilterList());
+    filterReaderEntity = new ClusterFilterSeriesBatchReaderEntity(queryDataSet, paths,
+        request.getFilterList());
   }
 
   /**
@@ -318,9 +333,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       EngineReaderByTimeStamp readerByTimeStamp = ClusterSeriesReaderFactory
           .createReaderByTimeStamp(path, context);
       TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
-      selectSeriesReaders
-          .put(path.getFullPath(),
-              new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
+      selectReaderEntity.addPath(path.getFullPath());
+      selectReaderEntity
+          .addReaders(new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
       dataTypeMap.put(path.getFullPath(), dataType);
       dataTypeList.add(dataType);
     }
@@ -336,10 +351,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
     if (targetQueryRounds != this.queryRound) {
       this.queryRound = targetQueryRounds;
       PathType pathType = request.getPathType();
-      List<String> paths = request.getSeriesPaths();
       List<BatchData> batchDataList;
       if (pathType == PathType.SELECT_PATH) {
-        batchDataList = readSelectSeriesBatchData(paths);
+        batchDataList = readSelectSeriesBatchData(request.getSeriesPathIndexs());
       } else {
         batchDataList = readFilterSeriesBatchData();
       }
@@ -355,13 +369,12 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
       throws IOException {
     resetQueryTimer();
     QuerySeriesDataByTimestampResponse response = new QuerySeriesDataByTimestampResponse(groupId);
-    List<String> fetchDataSeries = request.getFetchDataSeries();
     long targetQueryRounds = request.getQueryRounds();
     if (targetQueryRounds != this.queryRound) {
       this.queryRound = targetQueryRounds;
+      List<AbstractClusterSelectSeriesBatchReader> readers = selectReaderEntity.getAllReaders();
       List<BatchData> batchDataList = new ArrayList<>();
-      for (String series : fetchDataSeries) {
-        AbstractClusterSelectSeriesBatchReader reader = selectSeriesReaders.get(series);
+      for (AbstractClusterSelectSeriesBatchReader reader : readers) {
         batchDataList.add(reader.nextBatch(request.getBatchTimestamp()));
       }
       cachedBatchDataResult = batchDataList;
@@ -378,14 +391,15 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   }
 
   /**
-   * Read batch data of select series
+   * Read batch data of select series by series index
    *
-   * @param paths all series to query
+   * @param seriesIndexs indexes of the select series to query
    */
-  private List<BatchData> readSelectSeriesBatchData(List<String> paths) throws IOException {
+  private List<BatchData> readSelectSeriesBatchData(List<Integer> seriesIndexs) throws IOException {
     List<BatchData> batchDataList = new ArrayList<>();
-    for (String fullPath : paths) {
-      batchDataList.add(selectSeriesReaders.get(fullPath).nextBatch());
+    for (int index : seriesIndexs) {
+      AbstractClusterSelectSeriesBatchReader reader = selectReaderEntity.getReaderByIndex(index);
+      batchDataList.add(reader.nextBatch());
     }
     return batchDataList;
   }
@@ -396,7 +410,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
    * @return batch data of all filter series
    */
   private List<BatchData> readFilterSeriesBatchData() throws IOException {
-    return filterReader.nextBatchList();
+    return filterReaderEntity.nextBatchList();
   }
 
   public String getGroupId() {
@@ -417,12 +431,12 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
     return queryRound;
   }
 
-  public Map<String, AbstractClusterSelectSeriesBatchReader> getSelectSeriesReaders() {
-    return selectSeriesReaders;
+  public ClusterSelectSeriesBatchReaderEntity getSelectReaderEntity() {
+    return selectReaderEntity;
   }
 
-  public IClusterFilterSeriesBatchReader getFilterReader() {
-    return filterReader;
+  public IClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() {
+    return filterReaderEntity;
   }
 
   public Map<String, TSDataType> getDataTypeMap() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
index 0c0287e..9d60ae2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
@@ -83,7 +83,7 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader {
   @Override
   protected void updateCurrentBatchData() throws RaftConnectionException {
     if (batchDataList.isEmpty() && !remoteDataFinish) {
-      queryManager.fetchBatchDataForFilterPaths(groupId);
+      queryManager.fetchBatchDataForAllFilterPaths(groupId);
     }
     if (!batchDataList.isEmpty()) {
       currentBatchData = batchDataList.removeFirst();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index 55639a1..fadd92f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.cluster.query.reader.querynode;
 
 import java.io.IOException;
-import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.FillBatchData;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
similarity index 93%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
index 1cd357e..65f8c1c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
@@ -32,9 +32,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 /**
- * Batch reader for all filter paths.
+ * Batch reader entity for all filter paths.
  */
-public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatchReader {
+public class ClusterFilterSeriesBatchReaderEntity implements IClusterFilterSeriesBatchReaderEntity {
 
   private List<Path> allFilterPath;
 
@@ -44,7 +44,7 @@ public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatch
 
   private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
 
-  public ClusterFilterSeriesBatchReader(QueryDataSet queryDataSet, List<Path> allFilterPath,
+  public ClusterFilterSeriesBatchReaderEntity(QueryDataSet queryDataSet, List<Path> allFilterPath,
       List<Filter> filters) {
     this.queryDataSet = queryDataSet;
     this.allFilterPath = allFilterPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
similarity index 52%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
index 218d68b..f0dea38 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
@@ -18,19 +18,45 @@
  */
 package org.apache.iotdb.cluster.query.reader.querynode;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 
 /**
- * Batch reader for filter series which is used in query node.
+ * Batch reader entity for all select paths.
  */
-public interface IClusterFilterSeriesBatchReader {
-
-  boolean hasNext() throws IOException;
+public class ClusterSelectSeriesBatchReaderEntity {
+  /**
+   * All select paths
+   */
+  List<String> paths;
 
   /**
-   * Get next batch data of all filter series.
+   * All select readers
    */
-  List<BatchData> nextBatchList() throws IOException;
+  List<AbstractClusterSelectSeriesBatchReader> readers;
+
+  public ClusterSelectSeriesBatchReaderEntity() {
+    paths = new ArrayList<>();
+    readers = new ArrayList<>();
+  }
+
+  public void addPath(String path) {
+    this.paths.add(path);
+  }
+
+  public void addReaders(AbstractClusterSelectSeriesBatchReader reader) {
+    this.readers.add(reader);
+  }
+
+  public List<AbstractClusterSelectSeriesBatchReader> getAllReaders() {
+    return readers;
+  }
+
+  public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index){
+    return readers.get(index);
+  }
+
+  public List<String> getAllPaths() {
+    return paths;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
similarity index 95%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
index 218d68b..a045e2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * Batch reader for filter series which is used in query node.
  */
-public interface IClusterFilterSeriesBatchReader {
+public interface IClusterFilterSeriesBatchReaderEntity {
 
   boolean hasNext() throws IOException;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index 0f05cf2..7525368 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.cluster.query.utils;
 
-import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.FillBatchData;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java
index 351e6eb..cbcef15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java
@@ -39,21 +39,16 @@ public class QuerySeriesDataByTimestampRequest extends BasicQueryRequest {
    */
   private List<Long> batchTimestamp;
 
-  /**
-   * Series to fetch data from remote query node
-   */
-  private List<String> fetchDataSeries;
-
   private QuerySeriesDataByTimestampRequest(String groupID) {
     super(groupID);
   }
 
-  public static QuerySeriesDataByTimestampRequest createRequest(String groupId, long queryRounds, String taskId, List<Long> batchTimestamp, List<String> fetchDataSeries){
+  public static QuerySeriesDataByTimestampRequest createRequest(String groupId, long queryRounds,
+      String taskId, List<Long> batchTimestamp) {
     QuerySeriesDataByTimestampRequest request = new QuerySeriesDataByTimestampRequest(groupId);
     request.queryRounds = queryRounds;
     request.taskId = taskId;
     request.batchTimestamp = batchTimestamp;
-    request.fetchDataSeries = fetchDataSeries;
     return request;
   }
 
@@ -80,12 +75,4 @@ public class QuerySeriesDataByTimestampRequest extends BasicQueryRequest {
   public void setBatchTimestamp(List<Long> batchTimestamp) {
     this.batchTimestamp = batchTimestamp;
   }
-
-  public List<String> getFetchDataSeries() {
-    return fetchDataSeries;
-  }
-
-  public void setFetchDataSeries(List<String> fetchDataSeries) {
-    this.fetchDataSeries = fetchDataSeries;
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
index 554b8c1..e0fc23c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
@@ -46,9 +46,9 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
   private PathType pathType;
 
   /**
-   * Key is series type, value is series list
+   * List of indexes of the series paths to fetch.
    */
-  private List<String> seriesPaths = new ArrayList<>();
+  private List<Integer> seriesPathIndexs = new ArrayList<>();
 
   private QuerySeriesDataRequest(String groupID, String taskId) {
     super(groupID);
@@ -56,10 +56,10 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
   }
 
   public static QuerySeriesDataRequest createFetchDataRequest(String groupId, String taskId,
-      PathType pathType, List<String> seriesPaths, long queryRounds) {
+      PathType pathType, List<Integer> seriesPathIndexs, long queryRounds) {
     QuerySeriesDataRequest request = new QuerySeriesDataRequest(groupId, taskId);
     request.pathType = pathType;
-    request.seriesPaths = seriesPaths;
+    request.seriesPathIndexs = seriesPathIndexs;
     request.queryRounds = queryRounds;
     return request;
   }
@@ -88,11 +88,7 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
     this.pathType = pathType;
   }
 
-  public List<String> getSeriesPaths() {
-    return seriesPaths;
-  }
-
-  public void setSeriesPaths(List<String> seriesPaths) {
-    this.seriesPaths = seriesPaths;
+  public List<Integer> getSeriesPathIndexs() {
+    return seriesPathIndexs;
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
index 494029c..45ab923 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
@@ -163,7 +163,6 @@ public class IoTDBAggregationLargeDataIT {
   }
 
   @Test
-  @Ignore
   public void remoteTest() throws ClassNotFoundException, SQLException {
     QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
     insertSQL();
@@ -183,7 +182,7 @@ public class IoTDBAggregationLargeDataIT {
     maxTimeAggreWithMultiFilterTest();
     minValueAggreWithMultiFilterTest();
     maxValueAggreWithMultiFilterTest();
-//    meanAggreWithMultiFilterTest();
+    meanAggreWithMultiFilterTest();
     sumAggreWithMultiFilterTest();
     firstAggreWithMultiFilterTest();
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 02dc01f..162c5ac 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -181,13 +181,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void countOnlyTimeFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    countOnlyTimeFilterTest();
-  }
-
-  @Test
   public void functionsNoFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,4,0,6,1",
@@ -286,13 +279,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void functionsNoFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    functionsNoFilterTest();
-  }
-
-  @Test
   public void lastAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,22222,55555"
@@ -322,13 +308,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void lastAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    lastAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void firstAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,99,180"
@@ -358,13 +337,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void firstAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    firstAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void sumAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,22321.0,55934.0,1029"
@@ -394,13 +366,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void sumAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    sumAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void meanAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,11160.5,18645,206"
@@ -430,13 +395,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void meanAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    meanAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void countAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,2,3,5,1,0"
@@ -468,13 +426,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void countAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    countAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void minTimeAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,104,1,2,101,100"
@@ -507,13 +458,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void minTimeAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    minTimeAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void maxTimeAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,105,105,105,102,100"
@@ -546,13 +490,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void maxTimeAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    maxTimeAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void minValueAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,90,180,2.22,ddddd,true"
@@ -588,14 +525,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void minValueAggreWithSingleFilterRemoteTest()
-      throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    minValueAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void maxValueAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
     String[] retArray = new String[]{
         "0,99,50000,11.11,fffff,true"
@@ -630,14 +559,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void maxValueAggreWithSingleFilterRemoteTest()
-      throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    maxValueAggreWithSingleFilterTest();
-  }
-
-  @Test
   public void countAggreWithMultiMultiFilterTest() {
     String[] retArray = new String[]{
         "0,2",
@@ -667,14 +588,6 @@ public class IoTDBAggregationSmallDataIT {
   }
 
   @Test
-  @Ignore
-  public void countAggreWithMultiMultiFilterRemoteTest()
-      throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    countAggreWithMultiMultiFilterTest();
-  }
-
-  @Test
   public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
     //d0s0,d0s1,d0s2,d0s3,d1s0
     String[] retArray = new String[]{
@@ -739,13 +652,6 @@ public class IoTDBAggregationSmallDataIT {
     }
   }
 
-  @Test
-  @Ignore
-  public void selectAllSQLRemoteTest() throws ClassNotFoundException, SQLException {
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
-    selectAllSQLTest();
-  }
-
   private static void insertSQL() {
     try (Connection connection = DriverManager.getConnection
         (Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
index b822831..e71e489 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
@@ -42,7 +42,8 @@ import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalSingleQueryM
 import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
-import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity;
 import org.apache.iotdb.cluster.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -219,15 +220,15 @@ public class ClusterLocalManagerTest {
         assertNotNull(singleQueryManager);
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(0, singleQueryManager.getQueryRound());
-        assertNull(singleQueryManager.getFilterReader());
-        Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
-            .getSelectSeriesReaders();
-        assertEquals(3, selectSeriesReaders.size());
+        assertNull(singleQueryManager.getFilterReaderEntity());
+        ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+        assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
-          String path = entry.getKey();
-          TSDataType dataType = typeMap.get(path);
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+        for (int i =0 ; i < readers.size(); i++) {
+          TSDataType dataType = typeMap.get(paths.get(i));
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -246,15 +247,15 @@ public class ClusterLocalManagerTest {
         assertNotNull(singleQueryManager);
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(0, singleQueryManager.getQueryRound());
-        assertNull(singleQueryManager.getFilterReader());
-        Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
-            .getSelectSeriesReaders();
-        assertEquals(3, selectSeriesReaders.size());
+        assertNull(singleQueryManager.getFilterReaderEntity());
+        ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+        assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
-          String path = entry.getKey();
-          TSDataType dataType = typeMap.get(path);
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+        for (int i =0 ; i < readers.size(); i++) {
+          TSDataType dataType = typeMap.get(paths.get(i));
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -273,15 +274,15 @@ public class ClusterLocalManagerTest {
         assertNotNull(singleQueryManager);
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(0, singleQueryManager.getQueryRound());
-        assertNull(singleQueryManager.getFilterReader());
-        Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
-            .getSelectSeriesReaders();
-        assertEquals(3, selectSeriesReaders.size());
+        assertNull(singleQueryManager.getFilterReaderEntity());
+        ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+        assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
-          String path = entry.getKey();
-          TSDataType dataType = typeMap.get(path);
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+        for (int i =0 ; i < readers.size(); i++) {
+          TSDataType dataType = typeMap.get(paths.get(i));
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -310,22 +311,22 @@ public class ClusterLocalManagerTest {
         assertNotNull(singleQueryManager);
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(3, singleQueryManager.getQueryRound());
-        ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader();
+        ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity();
         assertNotNull(filterReader);
         List<Path> allFilterPaths = new ArrayList<>();
         allFilterPaths.add(new Path("root.vehicle.d0.s0"));
         assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
         assertNotNull(filterReader.getQueryDataSet());
 
-        Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
-            .getSelectSeriesReaders();
-        assertNotNull(selectSeriesReaders);
-        assertEquals(3, selectSeriesReaders.size());
+        ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+        assertNotNull(selectSeriesBatchReaderEntity.getAllReaders());
+        assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
-          String path = entry.getKey();
-          TSDataType dataType = typeMap.get(path);
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+        for (int i =0 ; i < readers.size(); i++) {
+          TSDataType dataType = typeMap.get(paths.get(i));
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
@@ -344,22 +345,22 @@ public class ClusterLocalManagerTest {
         assertNotNull(singleQueryManager);
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(3, singleQueryManager.getQueryRound());
-        ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader();
+        ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity();
         assertNotNull(filterReader);
         List<Path> allFilterPaths = new ArrayList<>();
         allFilterPaths.add(new Path("root.vehicle.d0.s0"));
         assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
         assertNotNull(filterReader.getQueryDataSet());
 
-        Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
-            .getSelectSeriesReaders();
-        assertNotNull(selectSeriesReaders);
-        assertEquals(3, selectSeriesReaders.size());
+        ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        assertNotNull(readers);
+        assertEquals(3, readers.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
-          String path = entry.getKey();
-          TSDataType dataType = typeMap.get(path);
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+        List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+        for (int i =0 ; i < readers.size(); i++) {
+          TSDataType dataType = typeMap.get(paths.get(i));
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
@@ -378,22 +379,22 @@ public class ClusterLocalManagerTest {
         assertNotNull(singleQueryManager);
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(3, singleQueryManager.getQueryRound());
-        ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader();
+        ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity();
         assertNotNull(filterReader);
         List<Path> allFilterPaths = new ArrayList<>();
         allFilterPaths.add(new Path("root.vehicle.d0.s0"));
         assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
         assertNotNull(filterReader.getQueryDataSet());
 
-        Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
-            .getSelectSeriesReaders();
-        assertNotNull(selectSeriesReaders);
-        assertEquals(3, selectSeriesReaders.size());
+        ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        assertNotNull(readers);
+        assertEquals(3, readers.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
-          String path = entry.getKey();
-          TSDataType dataType = typeMap.get(path);
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+        List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+        for (int i =0 ; i < readers.size(); i++) {
+          TSDataType dataType = typeMap.get(paths.get(i));
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());