You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:33 UTC

[incubator-iotdb] 06/11: fix some error bugs: add select series group entity, add query for all nodes features

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3ee37c3afc234b347c82b9ac984fb487988f8aeb
Author: lta <li...@163.com>
AuthorDate: Mon May 20 15:34:30 2019 +0800

    fix some error bugs: add select series group entity, add query for all nodes features
---
 .../executor/ClusterAggregateEngineExecutor.java   |  48 ++++----
 .../executor/ClusterExecutorWithTimeGenerator.java |  35 ++----
 .../ClusterExecutorWithoutTimeGenerator.java       |  21 +++-
 .../query/executor/ClusterFillEngineExecutor.java  |  15 ++-
 .../query/factory/ClusterSeriesReaderFactory.java  |  28 +++--
 .../ClusterRpcSingleQueryManager.java              | 137 +++++++++------------
 ...oupEntity.java => FilterSeriesGroupEntity.java} |   4 +-
 .../IClusterRpcSingleQueryManager.java             |   7 --
 ...oupEntity.java => SelectSeriesGroupEntity.java} |  55 +++------
 .../AbstractClusterPointReader.java                |   2 +
 .../coordinatornode/ClusterFilterSeriesReader.java |  17 +--
 .../coordinatornode/ClusterSelectSeriesReader.java |  25 +---
 .../timegenerator/ClusterNodeConstructor.java      |   4 +-
 .../query/utils/ClusterTimeValuePairUtils.java     |  18 +++
 .../iotdb/cluster/query/utils/ExpressionUtils.java |  12 +-
 .../query/utils/QueryPlanPartitionUtils.java       |  65 +++++-----
 .../query/manager/ClusterRpcManagerTest.java       |  46 ++-----
 .../query/utils/QueryPlanPartitionUtilsTest.java   |  56 +++++----
 18 files changed, 268 insertions(+), 327 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index 51113c9..b34afa1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.query.executor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +30,11 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -79,18 +82,25 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
   public QueryDataSet executeWithoutTimeGenerator(QueryContext context)
       throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
     Filter timeFilter = expression != null ? ((GlobalTimeExpression) expression).getFilter() : null;
-    Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
+    Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager
+        .getSelectSeriesGroupEntityMap();
 
     List<Path> paths = new ArrayList<>();
     List<IPointReader> readers = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
+    // Track the next select series reader index to use for each group id
+    Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       Path path = selectedSeries.get(i);
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
 
-      if (selectPathReaders.containsKey(path)) {
-        ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+      if (selectSeriesGroupEntityMap.containsKey(groupId)) {
+        int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+        ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId)
+            .getSelectSeriesReaders().get(index);
         readers.add(reader);
         dataTypes.add(reader.getDataType());
+        selectSeriesReaderIndex.put(groupId, index + 1);
       } else {
         paths.add(path);
         // construct AggregateFunction
@@ -140,15 +150,19 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
 
     /** add query token for query series which can handle locally **/
     List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
-    Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet();
+    Set<Path> remoteQuerySeries = new HashSet<>();
+    queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+        selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths()
+            .forEach(path -> remoteQuerySeries.add(path)));
     localQuerySeries.removeAll(remoteQuerySeries);
     QueryResourceManager.getInstance()
         .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
 
     /** add query token for filter series which can handle locally **/
     Set<String> deviceIdSet = new HashSet<>();
-    for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) {
-      List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths();
+    for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager
+        .getFilterSeriesGroupEntityMap().values()) {
+      List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths();
       remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
     }
     QueryResourceManager.getInstance()
@@ -156,32 +170,18 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
 
     ClusterTimeGenerator timestampGenerator;
     List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+    // origin data type of select paths
+    List<TSDataType> originDataTypes = new ArrayList<>();
     try {
       timestampGenerator = new ClusterTimeGenerator(expression, context,
           queryManager);
       readersOfSelectedSeries = ClusterSeriesReaderFactory
           .createReadersByTimestampOfSelectedPaths(selectedSeries, context,
-              queryManager);
+              queryManager, originDataTypes);
     } catch (IOException ex) {
       throw new FileNodeManagerException(ex);
     }
 
-    /** Get data type of select paths **/
-    List<TSDataType> originDataTypes = new ArrayList<>();
-    Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager
-        .getSelectSeriesReaders();
-    for (Path path : selectedSeries) {
-      try {
-        if (selectSeriesReaders.containsKey(path)) {
-          originDataTypes.add(selectSeriesReaders.get(path).getDataType());
-        } else {
-          originDataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-        }
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
-      }
-    }
-
     List<AggregateFunction> aggregateFunctions = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       TSDataType type = originDataTypes.get(i);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
index fed8c0d..fe2511a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
@@ -27,7 +27,7 @@ import java.util.Set;
 import org.apache.iotdb.cluster.query.dataset.ClusterDataSetWithTimeGenerator;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -71,15 +71,19 @@ public class ClusterExecutorWithTimeGenerator {
 
     /** add query token for query series which can handle locally **/
     List<Path> localQuerySeries = new ArrayList<>(queryExpression.getSelectedSeries());
-    Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet();
+    Set<Path> remoteQuerySeries = new HashSet<>();
+    queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+        selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths()
+            .forEach(path -> remoteQuerySeries.add(path)));
     localQuerySeries.removeAll(remoteQuerySeries);
     QueryResourceManager.getInstance()
         .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
 
     /** add query token for filter series which can handle locally **/
     Set<String> deviceIdSet = new HashSet<>();
-    for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) {
-      List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths();
+    for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager
+        .getFilterSeriesGroupEntityMap().values()) {
+      List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths();
       remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
     }
     QueryResourceManager.getInstance()
@@ -88,33 +92,18 @@ public class ClusterExecutorWithTimeGenerator {
 
     ClusterTimeGenerator timestampGenerator;
     List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+    /** Get data type of select paths **/
+    List<TSDataType> dataTypes = new ArrayList<>();
     try {
       timestampGenerator = new ClusterTimeGenerator(queryExpression.getExpression(), context,
           queryManager);
       readersOfSelectedSeries = ClusterSeriesReaderFactory
           .createReadersByTimestampOfSelectedPaths(queryExpression.getSelectedSeries(), context,
-              queryManager);
-    } catch (IOException ex) {
+              queryManager, dataTypes);
+    } catch (IOException | PathErrorException ex) {
       throw new FileNodeManagerException(ex);
     }
 
-    /** Get data type of select paths **/
-    List<TSDataType> dataTypes = new ArrayList<>();
-    Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager
-        .getSelectSeriesReaders();
-    for (Path path : queryExpression.getSelectedSeries()) {
-      try {
-        if (selectSeriesReaders.containsKey(path)) {
-          dataTypes.add(selectSeriesReaders.get(path).getDataType());
-        } else {
-          dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-        }
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-    }
-
     EngineReaderByTimeStamp[] readersOfSelectedSeriesArray = new EngineReaderByTimeStamp[readersOfSelectedSeries
         .size()];
     int index = 0;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java
index 8f42c9f..95e5f1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -20,11 +20,15 @@ package org.apache.iotdb.cluster.query.executor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
@@ -62,7 +66,7 @@ public class ClusterExecutorWithoutTimeGenerator extends AbstractExecutorWithout
    * Execute query without filter or with only global time filter.
    */
   public QueryDataSet execute(QueryContext context)
-      throws FileNodeManagerException {
+      throws FileNodeManagerException, PathErrorException {
 
     Filter timeFilter = null;
     if (queryExpression.getExpression() != null) {
@@ -72,15 +76,22 @@ public class ClusterExecutorWithoutTimeGenerator extends AbstractExecutorWithout
     List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
 
-    Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
+    Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager
+        .getSelectSeriesGroupEntityMap();
     List<Path> paths = new ArrayList<>();
+    // Track the next select series reader index to use for each group id
+    Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      if (selectPathReaders.containsKey(path)) {
-        ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+
+      if (selectSeriesGroupEntityMap.containsKey(groupId)) {
+        int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+        ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId)
+            .getSelectSeriesReaders().get(index);
         readersOfSelectedSeries.add(reader);
         dataTypes.add(reader.getDataType());
-
+        selectSeriesReaderIndex.put(groupId, index + 1);
       } else {
         IPointReader reader = createSeriesReader(context, path, dataTypes, timeFilter);
         readersOfSelectedSeries.add(reader);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
index 771637e..608a479 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
@@ -20,10 +20,13 @@ package org.apache.iotdb.cluster.query.executor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -58,16 +61,22 @@ public class ClusterFillEngineExecutor implements IFillEngineExecutor {
   @Override
   public QueryDataSet execute(QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException {
-    Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
     List<Path> paths = new ArrayList<>();
     List<IFill> fillList = new ArrayList<>();
     List<TSDataType> dataTypeList = new ArrayList<>();
     List<IPointReader> readers = new ArrayList<>();
+    Map<String, SelectSeriesGroupEntity> selectSeriesEntityMap = queryManager.getSelectSeriesGroupEntityMap();
+    // Track the next select series reader index to use for each group id
+    Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
     for (Path path : selectedSeries) {
-      if (selectPathReaders.containsKey(path)) {
-        ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+
+      if (selectSeriesEntityMap.containsKey(groupId)) {
+        int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+        ClusterSelectSeriesReader reader = selectSeriesEntityMap.get(groupId).getSelectSeriesReaders().get(index);
         readers.add(reader);
         dataTypeList.add(reader.getDataType());
+        selectSeriesReaderIndex.put(groupId, index + 1);
       } else {
         QueryDataSource queryDataSource = QueryResourceManager.getInstance()
             .getQueryDataSource(path, context);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
index d65ed58..a9ee032 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
@@ -20,18 +20,24 @@ package org.apache.iotdb.cluster.query.factory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestamp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 /**
@@ -43,27 +49,35 @@ public class ClusterSeriesReaderFactory {
   }
 
   /**
-   * Construct ReaderByTimestamp , include sequential data and unsequential data.
+   * Construct a ReaderByTimestamp for each selected path, including sequential and unsequential data, and append the data type of each series to dataTypes.
    *
    * @param paths selected series path
    * @param context query context
    * @return the list of EngineReaderByTimeStamp
    */
   public static List<EngineReaderByTimeStamp> createReadersByTimestampOfSelectedPaths(
-      List<Path> paths, QueryContext context, ClusterRpcSingleQueryManager queryManager)
-      throws IOException, FileNodeManagerException {
+      List<Path> paths, QueryContext context, ClusterRpcSingleQueryManager queryManager, List<TSDataType> dataTypes)
+      throws IOException, FileNodeManagerException, PathErrorException {
 
-    Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager.getSelectSeriesReaders();
+    Map<String, SelectSeriesGroupEntity> selectSeriesEntityMap = queryManager
+        .getSelectSeriesGroupEntityMap();
     List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
+    // Track the next select series reader index to use for each group id
+    Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
 
     for (Path path : paths) {
-
-      if (selectSeriesReaders.containsKey(path)) {
-        readersOfSelectedSeries.add(selectSeriesReaders.get(path));
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+      if (selectSeriesEntityMap.containsKey(groupId)) {
+        int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+        ClusterSelectSeriesReader reader = selectSeriesEntityMap.get(groupId).getSelectSeriesReaders().get(index);
+        readersOfSelectedSeries.add(reader);
+        dataTypes.add(reader.getDataType());
+        selectSeriesReaderIndex.put(groupId, index + 1);
       } else {
         /** can handle series query locally **/
         EngineReaderByTimeStamp readerByTimeStamp = createReaderByTimeStamp(path, context);
         readersOfSelectedSeries.add(readerByTimeStamp);
+        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
       }
     }
     return readersOfSelectedSeries;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index af4db31..905ce1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -86,27 +86,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
 
   // select path resource
   /**
-   * Query plans of select paths which are divided from queryPlan group by group id, it contains all
-   * group id ,including local data group if it involves.
+   * Select series group entity group by data group, key is group id(only contain remote group id)
    */
-  private Map<String, QueryPlan> selectPathPlans = new HashMap<>();
-
-  /**
-   * Key is group id (only contains remote group id), value is all select series in group id.
-   */
-  private Map<String, List<Path>> selectSeriesByGroupId = new HashMap<>();
-
-  /**
-   * Series reader of select paths (only contains remote series), key is series path , value is
-   * reader
-   */
-  private Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = new HashMap<>();
+  private Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = new HashMap<>();
 
   // filter path resource
   /**
-   * Filter group entity group by data group, key is group id(only contain remote group id)
+   * Filter series group entities, grouped by data group; the key is the group id (only remote group ids are contained).
    */
-  private Map<String, FilterGroupEntity> filterGroupEntityMap = new HashMap<>();
+  private Map<String, FilterSeriesGroupEntity> filterSeriesGroupEntityMap = new HashMap<>();
 
   private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
 
@@ -140,17 +128,18 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   private void initSeriesReader(int readDataConsistencyLevel)
       throws RaftConnectionException, IOException {
     // Init all series with data group of select series,if filter series has the same data group, init them together.
-    for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
+    for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
       String groupId = entry.getKey();
-      QueryPlan queryPlan = entry.getValue();
+      SelectSeriesGroupEntity selectEntity = entry.getValue();
+      QueryPlan queryPlan = selectEntity.getQueryPlan();
       if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
         allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
         List<Filter> filterList = new ArrayList<>();
-        if (filterGroupEntityMap.containsKey(groupId)) {
-          FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
-          allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
-          filterList = filterGroupEntity.getFilters();
+        if (filterSeriesGroupEntityMap.containsKey(groupId)) {
+          FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId);
+          allQueryPlan.put(PathType.FILTER_PATH, filterSeriesGroupEntity.getQueryPlan());
+          filterList = filterSeriesGroupEntity.getFilters();
         }
         /** create request **/
         BasicRequest request = InitSeriesReaderRequest
@@ -161,27 +150,29 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
         handleInitReaderResponse(groupId, allQueryPlan, response);
       } else {
         dataGroupUsage.add(groupId);
-        selectSeriesByGroupId.remove(groupId);
-        if (filterGroupEntityMap.containsKey(groupId)) {
-          filterGroupEntityMap.remove(groupId);
-        }
+        selectSeriesGroupEntityMap.remove(groupId);
+        filterSeriesGroupEntityMap.remove(groupId);
       }
     }
 
     //Init series reader with data groups of filter series, which don't exist in data groups list of select series.
-    for (Entry<String, FilterGroupEntity> entry : filterGroupEntityMap.entrySet()) {
+    for (Entry<String, FilterSeriesGroupEntity> entry : filterSeriesGroupEntityMap.entrySet()) {
       String groupId = entry.getKey();
-      if (!selectPathPlans.containsKey(groupId)) {
+      if (!selectSeriesGroupEntityMap.containsKey(groupId) && !QPExecutorUtils
+          .canHandleQueryByGroupId(groupId)) {
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
-        FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
-        allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
-        List<Filter> filterList = filterGroupEntity.getFilters();
+        FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId);
+        allQueryPlan.put(PathType.FILTER_PATH, filterSeriesGroupEntity.getQueryPlan());
+        List<Filter> filterList = filterSeriesGroupEntity.getFilters();
         BasicRequest request = InitSeriesReaderRequest
             .createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel,
                 allQueryPlan, filterList);
         InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils
             .createClusterSeriesReader(groupId, request, this);
         handleInitReaderResponse(groupId, allQueryPlan, response);
+      } else if (!selectSeriesGroupEntityMap.containsKey(groupId)) {
+        dataGroupUsage.add(groupId);
+        filterSeriesGroupEntityMap.remove(groupId);
       }
     }
   }
@@ -201,7 +192,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
         TSDataType dataType = seriesType.get(i);
         ClusterSelectSeriesReader seriesReader = new ClusterSelectSeriesReader(groupId, seriesPath,
             dataType, this);
-        selectSeriesReaders.put(seriesPath, seriesReader);
+        selectSeriesGroupEntityMap.get(groupId).addSelectSeriesReader(seriesReader);
       }
     }
     if (allQueryPlan.containsKey(PathType.FILTER_PATH)) {
@@ -213,10 +204,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
         TSDataType dataType = seriesType.get(i);
         ClusterFilterSeriesReader seriesReader = new ClusterFilterSeriesReader(groupId, seriesPath,
             dataType, this);
-        if (!filterGroupEntityMap.containsKey(groupId)) {
-          filterGroupEntityMap.put(groupId, new FilterGroupEntity(groupId));
-        }
-        filterGroupEntityMap.get(groupId).addFilterSeriesReader(seriesReader);
+        filterSeriesGroupEntityMap.get(groupId).addFilterSeriesReader(seriesReader);
       }
     }
   }
@@ -224,16 +212,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   @Override
   public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException {
     List<String> fetchDataSeries = new ArrayList<>();
-    Map<String, List<Path>> seriesByGroupId;
-    Map<Path, ClusterSelectSeriesReader> seriesReaders;
-    seriesByGroupId = selectSeriesByGroupId;
-    seriesReaders = selectSeriesReaders;
-    if (seriesByGroupId.containsKey(groupId)) {
-      List<Path> allFilterSeries = seriesByGroupId.get(groupId);
-      for (Path series : allFilterSeries) {
-        if (seriesReaders.get(series).enableFetchData()) {
-          fetchDataSeries.add(series.getFullPath());
-        }
+    List<Integer> selectSeriesIndexs = new ArrayList<>();
+    List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths();
+    List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId)
+        .getSelectSeriesReaders();
+    for (int i = 0; i < selectSeries.size(); i++) {
+      Path series = selectSeries.get(i);
+      if (seriesReaders.get(i).enableFetchData()) {
+        fetchDataSeries.add(series.getFullPath());
+        selectSeriesIndexs.add(i);
       }
     }
     BasicRequest request = QuerySeriesDataRequest
@@ -241,7 +228,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
             queryRounds++);
     QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils
         .handleQueryRequest(request, queryNodes.get(groupId), 0);
-    handleFetchDataResponseForSelectPaths(fetchDataSeries, response);
+
+    handleFetchDataResponseForSelectPaths(groupId, selectSeriesIndexs, response);
   }
 
   @Override
@@ -258,42 +246,45 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   @Override
   public void fetchBatchDataByTimestampForAllSelectPaths(List<Long> batchTimestamp)
       throws RaftConnectionException {
-    for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+    for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
       String groupId = entry.getKey();
       List<String> fetchDataFilterSeries = new ArrayList<>();
-      entry.getValue().forEach(path -> fetchDataFilterSeries.add(path.getFullPath()));
+      entry.getValue().getSelectPaths()
+          .forEach(path -> fetchDataFilterSeries.add(path.getFullPath()));
       BasicRequest request = QuerySeriesDataByTimestampRequest
           .createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries);
       QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils
           .handleQueryRequest(request, queryNodes.get(groupId), 0);
-      handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, response);
+      handleFetchDataByTimestampResponseForSelectPaths(groupId, fetchDataFilterSeries, response);
     }
   }
 
   /**
    * Handle response of fetching data, and add batch data to corresponding reader.
    */
-  private void handleFetchDataByTimestampResponseForSelectPaths(List<String> fetchDataSeries,
+  private void handleFetchDataByTimestampResponseForSelectPaths(String groupId,
+      List<String> fetchDataSeries,
       BasicQueryDataResponse response) {
     List<BatchData> batchDataList = response.getSeriesBatchData();
+    List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
+        .getSelectSeriesReaders();
     for (int i = 0; i < fetchDataSeries.size(); i++) {
-      String series = fetchDataSeries.get(i);
       BatchData batchData = batchDataList.get(i);
-      selectSeriesReaders.get(new Path(series))
-          .addBatchData(batchData, true);
+      selectSeriesReaders.get(i).addBatchData(batchData, true);
     }
   }
 
   /**
    * Handle response of fetching data, and add batch data to corresponding reader.
    */
-  private void handleFetchDataResponseForSelectPaths(List<String> fetchDataSeries,
-      BasicQueryDataResponse response) {
+  private void handleFetchDataResponseForSelectPaths(String groupId,
+      List<Integer> selectSeriesIndexs, BasicQueryDataResponse response) {
     List<BatchData> batchDataList = response.getSeriesBatchData();
-    for (int i = 0; i < fetchDataSeries.size(); i++) {
-      String series = fetchDataSeries.get(i);
+    List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
+        .getSelectSeriesReaders();
+    for (int i = 0; i < selectSeriesIndexs.size(); i++) {
       BatchData batchData = batchDataList.get(i);
-      selectSeriesReaders.get(new Path(series))
+      selectSeriesReaders.get(selectSeriesIndexs.get(i))
           .addBatchData(batchData, batchData.length() < CLUSTER_CONF.getBatchReadSize());
     }
   }
@@ -303,10 +294,11 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
    */
   private void handleFetchDataResponseForFilterPaths(String groupId,
       QuerySeriesDataResponse response) {
-    FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
-    List<Path> fetchDataSeries = filterGroupEntity.getFilterPaths();
+    FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId);
+    List<Path> fetchDataSeries = filterSeriesGroupEntity.getFilterPaths();
     List<BatchData> batchDataList = response.getSeriesBatchData();
-    List<ClusterFilterSeriesReader> filterReaders = filterGroupEntity.getFilterSeriesReaders();
+    List<ClusterFilterSeriesReader> filterReaders = filterSeriesGroupEntity
+        .getFilterSeriesReaders();
     boolean remoteDataFinish = true;
     for (int i = 0; i < batchDataList.size(); i++) {
       if (batchDataList.get(i).length() != 0) {
@@ -323,11 +315,6 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   }
 
   @Override
-  public QueryPlan getSelectPathQueryPlan(String fullPath) {
-    return selectPathPlans.get(fullPath);
-  }
-
-  @Override
   public void setDataGroupReaderNode(String groupId, PeerId readerNode) {
     queryNodes.put(groupId, readerNode);
   }
@@ -375,19 +362,11 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
     this.queryNodes.put(groupID, peerId);
   }
 
-  public Map<String, QueryPlan> getSelectPathPlans() {
-    return selectPathPlans;
-  }
-
-  public Map<String, List<Path>> getSelectSeriesByGroupId() {
-    return selectSeriesByGroupId;
-  }
-
-  public Map<Path, ClusterSelectSeriesReader> getSelectSeriesReaders() {
-    return selectSeriesReaders;
+  public Map<String, SelectSeriesGroupEntity> getSelectSeriesGroupEntityMap() {
+    return selectSeriesGroupEntityMap;
   }
 
-  public Map<String, FilterGroupEntity> getFilterGroupEntityMap() {
-    return filterGroupEntityMap;
+  public Map<String, FilterSeriesGroupEntity> getFilterSeriesGroupEntityMap() {
+    return filterSeriesGroupEntityMap;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java
similarity index 97%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java
index 326af11..19407a0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 /**
  * Filter entities of a data group, concluding QueryPlan, filters, all filter paths and filter readers
  */
-public class FilterGroupEntity {
+public class FilterSeriesGroupEntity {
 
   /**
    * Group id
@@ -62,7 +62,7 @@ public class FilterGroupEntity {
    */
   private List<ClusterFilterSeriesReader> filterSeriesReaders;
 
-  public FilterGroupEntity(String groupId) {
+  public FilterSeriesGroupEntity(String groupId) {
     this.groupId = groupId;
     this.filterPaths = new ArrayList<>();
     this.filters = new ArrayList<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
index c4aec9c..d6ca0d7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
@@ -69,13 +69,6 @@ public interface IClusterRpcSingleQueryManager {
       throws RaftConnectionException;
 
   /**
-   * Get query plan of select path
-   *
-   * @param fullPath Timeseries full path in select paths
-   */
-  QueryPlan getSelectPathQueryPlan(String fullPath);
-
-  /**
    * Set reader node of a data group
    *
    * @param groupId data group id
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
similarity index 55%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
index 326af11..9f35117 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
@@ -21,15 +21,14 @@ package org.apache.iotdb.cluster.query.manager.coordinatornode;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader;
+import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
- * Filter entities of a data group, concluding QueryPlan, filters, all filter paths and filter readers
+ * Select series entities of a data group, including the QueryPlan, all select paths and select series readers
  */
-public class FilterGroupEntity {
-
+public class SelectSeriesGroupEntity {
   /**
    * Group id
    */
@@ -41,32 +40,24 @@ public class FilterGroupEntity {
   private QueryPlan queryPlan;
 
   /**
-   * Filters of filter path.
-   */
-  private List<Filter> filters;
-
-  /**
    *
-   * all filter series
+   * all select series
    * <p>
-   * Note: It may contain multiple series in a complicated tree
-   * for example: select * from root.vehicle where d0.s0 > 10 and d0.s0 < 101 or time = 12,
-   * filter tree: <code>[[[[root.vehicle.d0.s0:time == 12] || [root.vehicle.d0.s1:time == 12]] || [root.vehicle.d1.s2:time == 12]] || [root.vehicle.d1.s3:time == 12]]</code>
+   * Note: It may contain multiple series in a query
+   * for example: select sum(s0), max(s0) from root.vehicle.d0 where s0 > 10
    * </p>
    */
-  private List<Path> filterPaths;
-
+  private List<Path> selectPaths;
 
   /**
    * Series reader of filter paths (only contains remote series)
    */
-  private List<ClusterFilterSeriesReader> filterSeriesReaders;
+  private List<ClusterSelectSeriesReader> selectSeriesReaders;
 
-  public FilterGroupEntity(String groupId) {
+  public SelectSeriesGroupEntity(String groupId) {
     this.groupId = groupId;
-    this.filterPaths = new ArrayList<>();
-    this.filters = new ArrayList<>();
-    this.filterSeriesReaders = new ArrayList<>();
+    this.selectPaths = new ArrayList<>();
+    this.selectSeriesReaders = new ArrayList<>();
   }
 
   public String getGroupId() {
@@ -85,27 +76,19 @@ public class FilterGroupEntity {
     this.queryPlan = queryPlan;
   }
 
-  public List<Filter> getFilters() {
-    return filters;
-  }
-
-  public void addFilter(Filter filter) {
-    this.filters.add(filter);
-  }
-
-  public List<Path> getFilterPaths() {
-    return filterPaths;
+  public List<Path> getSelectPaths() {
+    return selectPaths;
   }
 
-  public void addFilterPaths(Path filterPath) {
-    this.filterPaths.add(filterPath);
+  public void addSelectPaths(Path selectPath) {
+    this.selectPaths.add(selectPath);
   }
 
-  public List<ClusterFilterSeriesReader> getFilterSeriesReaders() {
-    return filterSeriesReaders;
+  public List<ClusterSelectSeriesReader> getSelectSeriesReaders() {
+    return selectSeriesReaders;
   }
 
-  public void addFilterSeriesReader(ClusterFilterSeriesReader filterSeriesReader) {
-    this.filterSeriesReaders.add(filterSeriesReader);
+  public void addSelectSeriesReader(ClusterSelectSeriesReader selectSeriesReader) {
+    this.selectSeriesReaders.add(selectSeriesReader);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
index 3f73160..c0012a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
@@ -71,4 +71,6 @@ public abstract class AbstractClusterPointReader implements IPointReader {
     }
     return null;
   }
+
+  public abstract void addBatchData(BatchData batchData, boolean remoteDataFinish);
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
index 805d3af..0c0287e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
@@ -95,14 +95,6 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader {
     //Do nothing
   }
 
-  public Path getSeriesPath() {
-    return seriesPath;
-  }
-
-  public void setSeriesPath(Path seriesPath) {
-    this.seriesPath = seriesPath;
-  }
-
   public TSDataType getDataType() {
     return dataType;
   }
@@ -111,14 +103,7 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader {
     this.dataType = dataType;
   }
 
-  public BatchData getCurrentBatchData() {
-    return currentBatchData;
-  }
-
-  public void setCurrentBatchData(BatchData currentBatchData) {
-    this.currentBatchData = currentBatchData;
-  }
-
+  @Override
   public void addBatchData(BatchData batchData, boolean remoteDataFinish) {
     batchDataList.addLast(batchData);
     this.remoteDataFinish = remoteDataFinish;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
index 0a507d5..c640b53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
@@ -119,14 +119,6 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem
     batchDataList = null;
   }
 
-  public Path getSeriesPath() {
-    return seriesPath;
-  }
-
-  public void setSeriesPath(Path seriesPath) {
-    this.seriesPath = seriesPath;
-  }
-
   public TSDataType getDataType() {
     return dataType;
   }
@@ -135,27 +127,12 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem
     this.dataType = dataType;
   }
 
-  public BatchData getCurrentBatchData() {
-    return currentBatchData;
-  }
-
-  public void setCurrentBatchData(BatchData currentBatchData) {
-    this.currentBatchData = currentBatchData;
-  }
-
+  @Override
   public void addBatchData(BatchData batchData, boolean remoteDataFinish) {
     batchDataList.addLast(batchData);
     this.remoteDataFinish = remoteDataFinish;
   }
 
-  public boolean isRemoteDataFinish() {
-    return remoteDataFinish;
-  }
-
-  public void setRemoteDataFinish(boolean remoteDataFinish) {
-    this.remoteDataFinish = remoteDataFinish;
-  }
-
   /**
    * Check if this series need to fetch data from remote query node
    */
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java
index 639dce8..2b3ab18 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -65,7 +65,7 @@ public class ClusterNodeConstructor extends AbstractNodeConstructor {
    * Init filter series reader
    */
   private void init(ClusterRpcSingleQueryManager queryManager) {
-    Map<String, FilterGroupEntity> filterGroupEntityMap = queryManager.getFilterGroupEntityMap();
+    Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = queryManager.getFilterSeriesGroupEntityMap();
     filterGroupEntityMap.forEach(
         (key, value) -> filterSeriesReadersByGroupId.put(key, value.getFilterSeriesReaders()));
     filterSeriesReadersByGroupId.forEach((key, value) -> filterSeriesReaderIndex.put(key, 0));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index a0ee256..0f05cf2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.cluster.query.utils;
 
 import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java
index 0024138..4089e9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java
@@ -26,7 +26,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.TRUE;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.query.expression.TrueExpression;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -46,17 +46,17 @@ public class ExpressionUtils {
    * Get all series path of expression group by group id
    */
   public static void getAllExpressionSeries(IExpression expression,
-      Map<String, FilterGroupEntity> filterGroupEntityMap)
+      Map<String, FilterSeriesGroupEntity> filterGroupEntityMap)
       throws PathErrorException {
     if (expression.getType() == ExpressionType.SERIES) {
       Path path = ((SingleSeriesExpression) expression).getSeriesPath();
       String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
       if (!filterGroupEntityMap.containsKey(groupId)) {
-        filterGroupEntityMap.put(groupId, new FilterGroupEntity(groupId));
+        filterGroupEntityMap.put(groupId, new FilterSeriesGroupEntity(groupId));
       }
-      FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
-      filterGroupEntity.addFilterPaths(path);
-      filterGroupEntity.addFilter(((SingleSeriesExpression) expression).getFilter());
+      FilterSeriesGroupEntity filterSeriesGroupEntity = filterGroupEntityMap.get(groupId);
+      filterSeriesGroupEntity.addFilterPaths(path);
+      filterSeriesGroupEntity.addFilter(((SingleSeriesExpression) expression).getFilter());
     } else if (expression.getType() == OR || expression.getType() == AND) {
       getAllExpressionSeries(((IBinaryExpression) expression).getLeft(), filterGroupEntityMap);
       getAllExpressionSeries(((IBinaryExpression) expression).getRight(), filterGroupEntityMap);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index 5fbd30c..3a2746f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -25,7 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -84,24 +85,24 @@ public class QueryPlanPartitionUtils {
   private static void splitQueryPlanBySelectPath(ClusterRpcSingleQueryManager singleQueryManager)
       throws PathErrorException {
     QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
-    Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
-    Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+    // split query plan by select path
+    Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+        .getSelectSeriesGroupEntityMap();
     List<Path> selectPaths = queryPlan.getPaths();
     for (Path path : selectPaths) {
       String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
-      if (!selectSeriesByGroupId.containsKey(groupId)) {
-        selectSeriesByGroupId.put(groupId, new ArrayList<>());
+      if (!selectGroupEntityMap.containsKey(groupId)) {
+        selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
       }
-      selectSeriesByGroupId.get(groupId).add(path);
+      selectGroupEntityMap.get(groupId).addSelectPaths(path);
     }
-    for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
-      String groupId = entry.getKey();
-      List<Path> paths = entry.getValue();
+    for (SelectSeriesGroupEntity entity : selectGroupEntityMap.values()) {
+      List<Path> paths = entity.getSelectPaths();
       QueryPlan subQueryPlan = new QueryPlan();
       subQueryPlan.setProposer(queryPlan.getProposer());
       subQueryPlan.setPaths(paths);
       subQueryPlan.setExpression(queryPlan.getExpression());
-      selectPathPlans.put(groupId, subQueryPlan);
+      entity.setQueryPlan(subQueryPlan);
     }
   }
 
@@ -113,12 +114,12 @@ public class QueryPlanPartitionUtils {
       throws PathErrorException {
     QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
     // split query plan by filter path
-    Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager
-        .getFilterGroupEntityMap();
+    Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleQueryManager
+        .getFilterSeriesGroupEntityMap();
     IExpression expression = queryPlan.getExpression();
     ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap);
-    for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) {
-      List<Path> filterSeriesList = filterGroupEntity.getFilterPaths();
+    for (FilterSeriesGroupEntity filterSeriesGroupEntity : filterGroupEntityMap.values()) {
+      List<Path> filterSeriesList = filterSeriesGroupEntity.getFilterPaths();
       // create filter sub query plan
       QueryPlan subQueryPlan = new QueryPlan();
       subQueryPlan.setPaths(filterSeriesList);
@@ -127,7 +128,7 @@ public class QueryPlanPartitionUtils {
       if (subExpression.getType() != ExpressionType.TRUE) {
         subQueryPlan.setExpression(subExpression);
       }
-      filterGroupEntity.setQueryPlan(subQueryPlan);
+      filterSeriesGroupEntity.setQueryPlan(subQueryPlan);
     }
   }
 
@@ -157,29 +158,30 @@ public class QueryPlanPartitionUtils {
     AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan();
     List<Path> selectPaths = queryPlan.getPaths();
     List<String> aggregations = queryPlan.getAggregations();
-    Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
     Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
-    Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+    Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+        .getSelectSeriesGroupEntityMap();
     for (int i = 0; i < selectPaths.size(); i++) {
       Path path = selectPaths.get(i);
       String aggregation = aggregations.get(i);
       String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
-      if (!selectSeriesByGroupId.containsKey(groupId)) {
-        selectSeriesByGroupId.put(groupId, new ArrayList<>());
+      if (!selectGroupEntityMap.containsKey(groupId)) {
+        selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
         selectAggregationByGroupId.put(groupId, new ArrayList<>());
       }
       selectAggregationByGroupId.get(groupId).add(aggregation);
-      selectSeriesByGroupId.get(groupId).add(path);
+      selectGroupEntityMap.get(groupId).addSelectPaths(path);
     }
-    for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+    for (Entry<String, SelectSeriesGroupEntity> entry : selectGroupEntityMap.entrySet()) {
       String groupId = entry.getKey();
-      List<Path> paths = entry.getValue();
+      SelectSeriesGroupEntity entity = entry.getValue();
+      List<Path> paths = entity.getSelectPaths();
       AggregationPlan subQueryPlan = new AggregationPlan();
       subQueryPlan.setProposer(queryPlan.getProposer());
       subQueryPlan.setPaths(paths);
       subQueryPlan.setExpression(queryPlan.getExpression());
       subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId));
-      selectPathPlans.put(groupId, subQueryPlan);
+      entity.setQueryPlan(subQueryPlan);
     }
   }
 
@@ -200,25 +202,24 @@ public class QueryPlanPartitionUtils {
       throws PathErrorException {
     FillQueryPlan fillQueryPlan = (FillQueryPlan) singleQueryManager.getOriginQueryPlan();
     List<Path> selectPaths = fillQueryPlan.getPaths();
-    Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
-    Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+    Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+        .getSelectSeriesGroupEntityMap();
     for (Path path : selectPaths) {
       String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
-      if (!selectSeriesByGroupId.containsKey(groupId)) {
-        selectSeriesByGroupId.put(groupId, new ArrayList<>());
+      if (!selectGroupEntityMap.containsKey(groupId)) {
+        selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
       }
-      selectSeriesByGroupId.get(groupId).add(path);
+      selectGroupEntityMap.get(groupId).addSelectPaths(path);
     }
-    for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
-      String groupId = entry.getKey();
-      List<Path> paths = entry.getValue();
+    for (SelectSeriesGroupEntity entity : selectGroupEntityMap.values()) {
+      List<Path> paths = entity.getSelectPaths();
       FillQueryPlan subQueryPlan = new FillQueryPlan();
       subQueryPlan.setProposer(fillQueryPlan.getProposer());
       subQueryPlan.setPaths(paths);
       subQueryPlan.setExpression(fillQueryPlan.getExpression());
       subQueryPlan.setQueryTime(fillQueryPlan.getQueryTime());
       subQueryPlan.setFillType(new EnumMap<>(fillQueryPlan.getFillType()));
-      selectPathPlans.put(groupId, subQueryPlan);
+      entity.setQueryPlan(subQueryPlan);
     }
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java
index b800fbf..4745553 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java
@@ -39,7 +39,8 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.utils.EnvironmentUtils;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.jdbc.Config;
@@ -261,25 +262,8 @@ public class ClusterRpcManagerTest {
         assertEquals(taskId, singleManager.getTaskId());
 
         // select path plans
-        Map<String, QueryPlan> selectPathPlans = singleManager.getSelectPathPlans();
-        assertEquals(1, selectPathPlans.size());
-        for (QueryPlan queryPlan : selectPathPlans.values()) {
-          List<Path> paths = queryPlan.getPaths();
-          List<Path> correctPaths = new ArrayList<>();
-          correctPaths.add(new Path("root.vehicle.d0.s0"));
-          correctPaths.add(new Path("root.vehicle.d0.s1"));
-          correctPaths.add(new Path("root.vehicle.d0.s3"));
-          assertEquals(correctPaths, paths);
-          assertNull(queryPlan.getExpression());
-        }
-
-        // select series by group id
-        assertEquals(0, singleManager.getSelectSeriesByGroupId().size());
-
-        // select series reader
-        assertTrue(singleManager
-            .getSelectSeriesReaders().isEmpty());
-
+        Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleManager.getSelectSeriesGroupEntityMap();
+        assertTrue(selectSeriesGroupEntityMap.isEmpty());
       }
       statement.close();
     }
@@ -304,27 +288,11 @@ public class ClusterRpcManagerTest {
         assertEquals(taskId, singleManager.getTaskId());
 
         // select path plans
-        Map<String, QueryPlan> selectPathPlans = singleManager.getSelectPathPlans();
-        assertEquals(1, selectPathPlans.size());
-        for (QueryPlan queryPlan : selectPathPlans.values()) {
-          List<Path> paths = queryPlan.getPaths();
-          List<Path> correctPaths = new ArrayList<>();
-          correctPaths.add(new Path("root.vehicle.d0.s0"));
-          correctPaths.add(new Path("root.vehicle.d0.s1"));
-          correctPaths.add(new Path("root.vehicle.d0.s3"));
-          assertEquals(correctPaths, paths);
-          assertNotNull(queryPlan.getExpression());
-        }
-
-        // select series by group id
-        assertTrue(singleManager.getSelectSeriesByGroupId().isEmpty());
-
-        // select series reader
-        assertTrue(singleManager
-            .getSelectSeriesReaders().isEmpty());
+        Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleManager.getSelectSeriesGroupEntityMap();
+        assertTrue(selectSeriesGroupEntityMap.isEmpty());
 
         // filter path plans
-        Map<String, FilterGroupEntity> filterGroupEntityMap = singleManager.getFilterGroupEntityMap();
+        Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleManager.getFilterSeriesGroupEntityMap();
         assertTrue(filterGroupEntityMap.isEmpty());
 
       }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
index a0d409b..363ef98 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
@@ -39,8 +39,11 @@ import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
 import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.db.qp.QueryProcessor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.jdbc.Config;
@@ -60,6 +63,9 @@ public class QueryPlanPartitionUtilsTest {
   private static ClusterRpcQueryManager manager = ClusterRpcQueryManager.getInstance();
   private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor();
   private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor);
+  private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+      CLUSTER_CONFIG.getPort());
+
 
   private static final String URL = "127.0.0.1:6667/";
 
@@ -105,6 +111,7 @@ public class QueryPlanPartitionUtilsTest {
     EnvironmentUtils.cleanEnv();
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
     CLUSTER_CONFIG.createAllPath();
     server = Server.getInstance();
     server.start();
@@ -115,6 +122,7 @@ public class QueryPlanPartitionUtilsTest {
   @After
   public void tearDown() throws Exception {
     server.stop();
+    QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
     EnvironmentUtils.cleanEnv();
   }
 
@@ -230,14 +238,14 @@ public class QueryPlanPartitionUtilsTest {
   }
 
   @Test
-  public void splitQueryPlanWithoutValueFilter() throws Exception{
+  public void splitQueryPlanWithoutValueFilter() throws Exception {
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) {
       insertData(connection, createSQLs, insertSQLs);
       initCorrectResult();
-      for(int i = 0 ; i < queryStatementsWithoutFilters.length; i++) {
+      for (int i = 0; i < queryStatementsWithoutFilters.length; i++) {
         String queryStatementsWithoutFilter = queryStatementsWithoutFilters[i];
-        try(Statement statement = connection.createStatement()) {
+        try (Statement statement = connection.createStatement()) {
           boolean hasResultSet = statement.execute(queryStatementsWithoutFilter);
           assertTrue(hasResultSet);
           ResultSet resultSet = statement.getResultSet();
@@ -256,14 +264,15 @@ public class QueryPlanPartitionUtilsTest {
             assertEquals(taskId, String.format("%s:%d", LOCAL_ADDR, jobId));
             ClusterRpcSingleQueryManager singleQueryManager = ClusterRpcQueryManager.getInstance()
                 .getSingleQuery(jobId);
-            assertTrue(singleQueryManager.getFilterGroupEntityMap().isEmpty());
-            Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
-            assertFalse(selectPathPlans.isEmpty());
-            for(Entry<String, QueryPlan> entry1: selectPathPlans.entrySet()){
-              QueryPlan queryPlan = entry1.getValue();
+            assertTrue(singleQueryManager.getFilterSeriesGroupEntityMap().isEmpty());
+            Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleQueryManager
+                .getSelectSeriesGroupEntityMap();
+            assertFalse(selectSeriesGroupEntityMap.isEmpty());
+            for (SelectSeriesGroupEntity entity : selectSeriesGroupEntityMap.values()) {
+              QueryPlan queryPlan = entity.getQueryPlan();
               QueryPlan correctQueryPlan = withoutFilterResults.get(i + 1);
               assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths()));
-              assertEquals(correctQueryPlan.getExpression(),queryPlan.getExpression());
+              assertEquals(correctQueryPlan.getExpression(), queryPlan.getExpression());
               assertEquals(correctQueryPlan.isQuery(), queryPlan.isQuery());
               assertEquals(correctQueryPlan.getOperatorType(), queryPlan.getOperatorType());
               assertEquals(correctQueryPlan.getAggregations(), queryPlan.getAggregations());
@@ -275,14 +284,14 @@ public class QueryPlanPartitionUtilsTest {
   }
 
   @Test
-  public void splitQueryPlanWithValueFilter() throws Exception{
+  public void splitQueryPlanWithValueFilter() throws Exception {
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) {
       insertData(connection, createSQLs, insertSQLs);
       initCorrectResult();
-      for(int i = 0 ; i < queryStatementsWithFilters.length; i++) {
+      for (int i = 0; i < queryStatementsWithFilters.length; i++) {
         String queryStatementsWithoutFilter = queryStatementsWithFilters[i];
-        try(Statement statement = connection.createStatement()) {
+        try (Statement statement = connection.createStatement()) {
           boolean hasResultSet = statement.execute(queryStatementsWithoutFilter);
           assertTrue(hasResultSet);
           ResultSet resultSet = statement.getResultSet();
@@ -301,21 +310,24 @@ public class QueryPlanPartitionUtilsTest {
             assertEquals(taskId, String.format("%s:%d", LOCAL_ADDR, jobId));
             ClusterRpcSingleQueryManager singleQueryManager = ClusterRpcQueryManager.getInstance()
                 .getSingleQuery(jobId);
-            assertTrue(singleQueryManager.getFilterGroupEntityMap().isEmpty());
-            Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
-            assertFalse(selectPathPlans.isEmpty());
-            for(Entry<String, QueryPlan> entry1 : selectPathPlans.entrySet()) {
-              QueryPlan queryPlan = entry1.getValue();
+            assertFalse(singleQueryManager.getFilterSeriesGroupEntityMap().isEmpty());
+            Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleQueryManager
+                .getSelectSeriesGroupEntityMap();
+            assertFalse(selectSeriesGroupEntityMap.isEmpty());
+            for (SelectSeriesGroupEntity entity : selectSeriesGroupEntityMap.values()) {
+              QueryPlan queryPlan = entity.getQueryPlan();
               QueryPlan correctQueryPlan = withFilterSelectResults.get(i + 1);
               assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths()));
-              assertEquals(correctQueryPlan.getExpression().getType(), queryPlan.getExpression().getType());
+              assertEquals(correctQueryPlan.getExpression().getType(),
+                  queryPlan.getExpression().getType());
               assertEquals(correctQueryPlan.isQuery(), queryPlan.isQuery());
               assertEquals(correctQueryPlan.getOperatorType(), queryPlan.getOperatorType());
               assertEquals(correctQueryPlan.getAggregations(), queryPlan.getAggregations());
             }
-            Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager.getFilterGroupEntityMap();
-            for (FilterGroupEntity filterGroupEntity:filterGroupEntityMap.values()) {
-              QueryPlan queryPlan = filterGroupEntity.getQueryPlan();
+            Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleQueryManager
+                .getFilterSeriesGroupEntityMap();
+            for (FilterSeriesGroupEntity filterSeriesGroupEntity : filterGroupEntityMap.values()) {
+              QueryPlan queryPlan = filterSeriesGroupEntity.getQueryPlan();
               QueryPlan correctQueryPlan = withFilterFilterResults.get(i + 1);
               assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths()));
               assertEquals(correctQueryPlan.getExpression().getType(),