You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:05 UTC

[incubator-iotdb] 14/19: add single query manager

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e0439b68e6801d2969a2f9e9b3969e81ecbc207a
Author: lta <li...@163.com>
AuthorDate: Tue Apr 16 11:23:05 2019 +0800

    add single query manager
---
 .../cluster/qp/executor/AbstractQPExecutor.java    | 105 +-------------
 .../cluster/qp/executor/NonQueryExecutor.java      |  17 +--
 .../cluster/qp/executor/QueryMetadataExecutor.java |  17 +--
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |   3 +-
 ...xecutorWithTimeGenerator.java => PathType.java} |   6 +-
 .../executor/ClusterExecutorWithTimeGenerator.java |  64 +++++++++
 .../ClusterExecutorWithoutTimeGenerator.java       | 145 +++++++++++---------
 .../executor/ClusterQueryProcessExecutor.java      |   1 +
 .../executor/ClusterQueryRouter.java               |  39 +++++-
 .../ClusterRpcReaderFactory.java}                  |  12 +-
 .../manager/ClusterRpcQueryManager.java            |   4 +-
 .../manager/ClusterSingleQueryManager.java         | 152 +++++++++++++++++++--
 .../manager/IClusterSingleQueryManager.java        |   6 +-
 ...hReader.java => ClusterRpcBatchDataReader.java} |   9 +-
 ...AllDataReader.java => ClusterSeriesReader.java} |  37 ++++-
 .../cluster/service/TSServiceClusterImpl.java      |  40 +-----
 .../iotdb/cluster/utils/QPExecutorUtils.java       | 120 ++++++++++++++++
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  25 ++++
 .../apache/iotdb/cluster/utils/hash/Router.java    |   7 +
 .../db/query/executor/IEngineQueryRouter.java      |   2 +-
 20 files changed, 564 insertions(+), 247 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 8f635c4..e5df083 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -51,17 +51,8 @@ public abstract class AbstractQPExecutor {
 
   private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
 
-  /**
-   * Raft as client manager.
-   */
-  private static final RaftNodeAsClientManager CLIENT_MANAGER = RaftNodeAsClientManager
-      .getInstance();
-
   protected Router router = Router.getInstance();
 
-  private PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
-      CLUSTER_CONFIG.getPort());
-
   protected MManager mManager = MManager.getInstance();
 
   protected final Server server = Server.getInstance();
@@ -87,84 +78,6 @@ public abstract class AbstractQPExecutor {
   private int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
 
   /**
-   * Get Storage Group Name by device name
-   */
-  protected String getStroageGroupByDevice(String device) throws PathErrorException {
-    String storageGroup;
-    try {
-      storageGroup = MManager.getInstance().getFileNameByPath(device);
-    } catch (PathErrorException e) {
-      throw new PathErrorException(String.format("File level of %s doesn't exist.", device));
-    }
-    return storageGroup;
-  }
-
-  /**
-   * Get all Storage Group Names by path
-   */
-  public List<String> getAllStroageGroupsByPath(String path) throws PathErrorException {
-    List<String> storageGroupList;
-    try {
-      storageGroupList = mManager.getAllFileNamesByPath(path);
-    } catch (PathErrorException e) {
-      throw new PathErrorException(String.format("File level of %s doesn't exist.", path));
-    }
-    return storageGroupList;
-  }
-
-  /**
-   * Classify the input storage group list by which data group it belongs to.
-   *
-   * @return key is groupId, value is all SGs belong to this data group
-   */
-  protected Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
-    Map<String, Set<String>> map = new HashMap<>();
-    for (int i = 0; i < sgList.size(); i++) {
-      String sg = sgList.get(i);
-      String groupId = getGroupIdBySG(sg);
-      if (map.containsKey(groupId)) {
-        map.get(groupId).add(sg);
-      } else {
-        Set<String> set = new HashSet<>();
-        set.add(sg);
-        map.put(groupId, set);
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Get raft group id by storage group name
-   */
-  protected String getGroupIdBySG(String storageGroup) {
-    return router.getGroupID(router.routeGroup(storageGroup));
-  }
-
-  /**
-   * Check if the non query command can execute in local. 1. If this node belongs to the storage
-   * group 2. If this node is leader.
-   */
-  public boolean canHandleNonQueryByGroupId(String groupId) {
-    boolean canHandle = false;
-    if(groupId.equals(ClusterConfig.METADATA_GROUP_ID)){
-      canHandle = ((MetadataRaftHolder) (server.getMetadataHolder())).getFsm().isLeader();
-    }else {
-      if (router.containPhysicalNodeByGroupId(groupId, localNode) && RaftUtils
-          .getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode)) {
-        canHandle = true;
-      }
-    }
-    return canHandle;
-  }
-
-  /**
-   * Check if the query command can execute in local. Check if this node belongs to the group id
-   */
-  protected boolean canHandleQueryByGroupId(String groupId) {
-    return router.containPhysicalNodeByGroupId(groupId, localNode);
-  }
-
-  /**
    * Async handle QPTask by QPTask and leader id
    *
    * @param task request QPTask
@@ -185,32 +98,18 @@ public abstract class AbstractQPExecutor {
    * @param leader leader node of the group
    * @param taskRetryNum Retry time of the task
    */
-  public void asyncSendNonQuerySingleTask(SingleQPTask task, PeerId leader, int taskRetryNum)
+  protected void asyncSendNonQuerySingleTask(SingleQPTask task, PeerId leader, int taskRetryNum)
       throws RaftConnectionException {
     if (taskRetryNum >= TASK_MAX_RETRY) {
       throw new RaftConnectionException(String.format("QPTask retries reach the upper bound %s",
           TASK_MAX_RETRY));
     }
-    NodeAsClient client = getRaftNodeAsClient();
+    NodeAsClient client = RaftUtils.getRaftNodeAsClient();
     /** Call async method **/
     client.asyncHandleRequest(task.getRequest(), leader, task);
   }
 
   /**
-   * try to get raft rpc client
-   */
-  private NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
-    NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
-    if (client == null) {
-      throw new RaftConnectionException(String
-          .format("Raft inner rpc clients have reached the max numbers %s",
-              CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
-                  .getMaxQueueNumOfInnerRpcClient()));
-    }
-    return client;
-  }
-
-  /**
    * Synchronous get task response. If it's redirected or status is exception, the task needs to be
    * resent. Note: If status is Exception, it marks that an exception occurred during the task is
    * being sent instead of executed.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index f90bc66..ecafe4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -196,16 +197,16 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     switch (plan.getOperatorType()) {
       case DELETE:
         storageGroup = getStorageGroupFromDeletePlan((DeletePlan) plan);
-        groupId = getGroupIdBySG(storageGroup);
+        groupId = router.getGroupIdBySG(storageGroup);
         break;
       case UPDATE:
         Path path = ((UpdatePlan) plan).getPath();
-        storageGroup = getStroageGroupByDevice(path.getDevice());
-        groupId = getGroupIdBySG(storageGroup);
+        storageGroup = QPExecutorUtils.getStroageGroupByDevice(path.getDevice());
+        groupId = router.getGroupIdBySG(storageGroup);
         break;
       case INSERT:
-        storageGroup = getStroageGroupByDevice(((InsertPlan) plan).getDeviceId());
-        groupId = getGroupIdBySG(storageGroup);
+        storageGroup = QPExecutorUtils.getStroageGroupByDevice(((InsertPlan) plan).getDeviceId());
+        groupId = router.getGroupIdBySG(storageGroup);
         break;
       case CREATE_ROLE:
       case DELETE_ROLE:
@@ -284,8 +285,8 @@ public class NonQueryExecutor extends AbstractQPExecutor {
       case ADD_PATH:
       case DELETE_PATH:
         String deviceId = path.getDevice();
-        String storageGroup = getStroageGroupByDevice(deviceId);
-        groupId = getGroupIdBySG(storageGroup);
+        String storageGroup = QPExecutorUtils.getStroageGroupByDevice(deviceId);
+        groupId = router.getGroupIdBySG(storageGroup);
         break;
       case SET_FILE_LEVEL:
         boolean fileLevelExist = mManager.checkStorageLevelOfMTree(path.getFullPath());
@@ -319,7 +320,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     currentTask.set(qpTask);
 
     /** Check if the plan can be executed locally. **/
-    if (canHandleNonQueryByGroupId(groupId)) {
+    if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
       return handleNonQueryRequestLocally(groupId, qpTask);
     } else {
       PeerId leader = RaftUtils.getLeaderPeerID(groupId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 0702548..aea0656 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -81,7 +82,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     if (storageGroupList.isEmpty()) {
       return new ArrayList<>();
     } else {
-      Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+      Map<String, Set<String>> groupIdSGMap = QPExecutorUtils.classifySGByGroupId(storageGroupList);
       for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
         List<String> paths = getSubQueryPaths(entry.getValue(), path);
         String groupId = entry.getKey();
@@ -130,7 +131,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     LOGGER.debug("Execute show timeseries {} statement for group {}.", pathList, groupId);
     PeerId holder;
     /** Check if the plan can be executed locally. **/
-    if (canHandleQueryByGroupId(groupId)) {
+    if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
       LOGGER.debug("Execute show timeseries {} statement locally for group {} by sending request to local node.", pathList, groupId);
       holder = this.server.getServerId();
     } else {
@@ -158,7 +159,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       LOGGER.debug("Execute show metadata in string statement for group {}.", groupId);
       PeerId holder;
       /** Check if the plan can be executed locally. **/
-      if (canHandleQueryByGroupId(groupId)) {
+      if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
         LOGGER.debug("Execute show metadata in string statement locally for group {} by sending request to local node.", groupId);
         holder = this.server.getServerId();
       } else {
@@ -197,7 +198,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       LOGGER.debug("Execute query metadata statement for group {}.", groupId);
       PeerId holder;
       /** Check if the plan can be executed locally. **/
-      if (canHandleQueryByGroupId(groupId)) {
+      if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
         LOGGER.debug("Execute query metadata statement locally for group {} by sending request to local node.", groupId);
         holder = this.server.getServerId();
       } else {
@@ -232,7 +233,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     if (storageGroupList.size() != 1) {
       throw new PathErrorException("path " + path + " is not valid.");
     } else {
-      String groupId = getGroupIdBySG(storageGroupList.get(0));
+      String groupId = router.getGroupIdBySG(storageGroupList.get(0));
       QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
           readMetadataConsistencyLevel, path);
       SingleQPTask task = new SingleQPTask(false, request);
@@ -240,7 +241,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
       PeerId holder;
       /** Check if the plan can be executed locally. **/
-      if (canHandleQueryByGroupId(groupId)) {
+      if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
         LOGGER.debug("Execute get series type for {} statement locally for group {} by sending request to local node.", path, groupId);
         holder = this.server.getServerId();
       } else {
@@ -265,7 +266,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     if (storageGroupList.isEmpty()) {
       return new ArrayList<>();
     } else {
-      Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+      Map<String, Set<String>> groupIdSGMap = QPExecutorUtils.classifySGByGroupId(storageGroupList);
       for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
         List<String> paths = getSubQueryPaths(entry.getValue(), path);
         String groupId = entry.getKey();
@@ -289,7 +290,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
     PeerId holder;
     /** Check if the plan can be executed locally. **/
-    if (canHandleQueryByGroupId(groupId)) {
+    if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
       LOGGER.debug("Execute get paths for {} statement locally for group {} by sending request to local node.", pathList, groupId);
       holder = this.server.getServerId();
     } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 67c7362..d224bae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,7 +122,7 @@ public class BatchQPTask extends MultiQPTask {
       String groupId = entry.getKey();
       SingleQPTask subTask = entry.getValue();
       Future<?> taskThread;
-      if (executor.canHandleNonQueryByGroupId(groupId)) {
+      if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
         taskThread = QPTaskManager.getInstance()
             .submit(() -> executeLocalSubTask(subTask, groupId));
       } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
similarity index 87%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
index 8bbff0b..d25bb86 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query.coordinatornode.executor;
-
-public class ClusterExecutorWithTimeGenerator {
+package org.apache.iotdb.cluster.query;
 
+public enum PathType {
+  SELECT_PATH, FILTER_PATH
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
index 8bbff0b..d1ca472 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
@@ -18,6 +18,70 @@
  */
 package org.apache.iotdb.cluster.query.coordinatornode.executor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
 public class ClusterExecutorWithTimeGenerator {
 
+  private QueryExpression queryExpression;
+  private IClusterSingleQueryManager queryManager;
+
+  ClusterExecutorWithTimeGenerator(QueryExpression queryExpression, IClusterSingleQueryManager queryManager) {
+    this.queryExpression = queryExpression;
+    this.queryManager = queryManager;
+  }
+
+  /**
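+   * Executes the query with a time generator built from the query's filter expression.
+   *
+   * @param context query context supplying the job id used to register query resources
+   * @return data set built from the selected series, the time generator and by-timestamp readers
+   * @throws FileNodeManagerException on reader-creation IOException or unresolved series type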
+   */
+  public QueryDataSet execute(QueryContext context) throws FileNodeManagerException {
+
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenExpression(context.getJobId(), queryExpression.getExpression());
+
+    EngineTimeGenerator timestampGenerator;
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+    try {
+      timestampGenerator = new EngineTimeGenerator(queryExpression.getExpression(), context);
+      readersOfSelectedSeries = SeriesReaderFactory
+          .getByTimestampReadersOfSelectedPaths(queryExpression.getSelectedSeries(), context);
+    } catch (IOException ex) {
+      throw new FileNodeManagerException(ex);
+    }
+
+    List<TSDataType> dataTypes = new ArrayList<>();
+
+    for (Path path : queryExpression.getSelectedSeries()) {
+      try {
+        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+      } catch (PathErrorException e) {
+        throw new FileNodeManagerException(e);
+      }
+
+    }
+    return new EngineDataSetWithTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
+        timestampGenerator,
+        readersOfSelectedSeries);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
index 0392bce..7bd8008 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -42,9 +45,12 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 public class ClusterExecutorWithoutTimeGenerator {
   private QueryExpression queryExpression;
+  private ClusterSingleQueryManager queryManager;
 
-  public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression) {
+  public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression,
+      ClusterSingleQueryManager queryManager) {
     this.queryExpression = queryExpression;
+    this.queryManager = queryManager;
   }
 
   /**
@@ -62,37 +68,37 @@ public class ClusterExecutorWithoutTimeGenerator {
         .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
 
     for (Path path : queryExpression.getSelectedSeries()) {
-
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
-          context);
-
-      // add data type
-      try {
-        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // sequence reader for one sealed tsfile
-      SequenceDataReader tsFilesReader;
-      try {
-        tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            timeFilter, context);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // unseq reader for all chunk groups in unSeqFile
-      PriorityMergeReader unSeqMergeReader;
-      try {
-        unSeqMergeReader = SeriesReaderFactory.getInstance()
-            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // merge sequence data with unsequence data.
-      readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+//
+//      QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
+//          context);
+//
+//      // add data type
+//      try {
+//        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+//      } catch (PathErrorException e) {
+//        throw new FileNodeManagerException(e);
+//      }
+//
+//      // sequence reader for one sealed tsfile
+//      SequenceDataReader tsFilesReader;
+//      try {
+//        tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+//            timeFilter, context);
+//      } catch (IOException e) {
+//        throw new FileNodeManagerException(e);
+//      }
+//
+//      // unseq reader for all chunk groups in unSeqFile
+//      PriorityMergeReader unSeqMergeReader;
+//      try {
+//        unSeqMergeReader = SeriesReaderFactory.getInstance()
+//            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+//      } catch (IOException e) {
+//        throw new FileNodeManagerException(e);
+//      }
+//
+//      // merge sequence data with unsequence data.
+//      readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
     }
 
     try {
@@ -112,43 +118,54 @@ public class ClusterExecutorWithoutTimeGenerator {
     List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
 
-    QueryResourceManager.getInstance()
-        .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
+    Map<String, ClusterSeriesReader> selectPathReaders = queryManager.getSelectPathReaders();
+    List<Path> paths = new ArrayList<>();
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
-          context);
-
-      // add data type
-      try {
-        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
+      if(selectPathReaders.containsKey(path.toString())){
+        ClusterSeriesReader reader = selectPathReaders.get(path.toString());
+        readersOfSelectedSeries.add(reader);
+        dataTypes.add(reader.getDataType());
+      } else {
+        QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+            .getQueryDataSource(path,
+                context);
+
+        // add data type
+        try {
+          dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+        } catch (PathErrorException e) {
+          throw new FileNodeManagerException(e);
+        }
+
+        // reader over the sequence data source
+        SequenceDataReader tsFilesReader;
+        try {
+          tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+              null, context);
+        } catch (IOException e) {
+          throw new FileNodeManagerException(e);
+        }
+
+        // merge reader over the unsequence (overflow) data source
+        PriorityMergeReader unSeqMergeReader;
+        try {
+          unSeqMergeReader = SeriesReaderFactory.getInstance()
+              .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
+        } catch (IOException e) {
+          throw new FileNodeManagerException(e);
+        }
+
+        // merge sequence data with unsequence data.
+        readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+
+        paths.add(path);
       }
-
-      // sequence insert data
-      SequenceDataReader tsFilesReader;
-      try {
-        tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            null, context);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // unseq insert data
-      PriorityMergeReader unSeqMergeReader;
-      try {
-        unSeqMergeReader = SeriesReaderFactory.getInstance()
-            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // merge sequence data with unsequence data.
-      readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
     }
 
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), paths);
+
     try {
       return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
           readersOfSelectedSeries);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
index 7b51a4f..e7e58f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
@@ -110,6 +110,7 @@ public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
     this.fetchSize.set(fetchSize);
   }
 
+  @Override
   public IEngineQueryRouter getQueryRouter() {
     return queryRouter;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index c0a853b..8fa1a95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -30,8 +34,10 @@ import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -39,8 +45,37 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
 
   @Override
   public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
-      throws FileNodeManagerException {
-    return null;
+      throws FileNodeManagerException, PathErrorException {
+
+    ClusterSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+        .getSingleQuery(context.getJobId());
+    if (queryExpression.hasQueryFilter()) {
+      try {
+        IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+            .optimize(queryExpression.getExpression(), queryExpression.getSelectedSeries());
+        queryExpression.setExpression(optimizedExpression);
+
+        if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+          queryManager.init(QueryType.GLOBAL_TIME);
+          ClusterExecutorWithoutTimeGenerator engineExecutor =
+              new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
+          return engineExecutor.executeWithGlobalTimeFilter(context);
+        } else {
+          queryManager.init(QueryType.FILTER);
+          ClusterExecutorWithTimeGenerator engineExecutor = new ClusterExecutorWithTimeGenerator(
+              queryExpression, queryManager);
+          return engineExecutor.execute(context);
+        }
+
+      } catch (QueryFilterOptimizationException e) {
+        throw new FileNodeManagerException(e);
+      }
+    } else {
+      queryManager.init(QueryType.NO_FILTER);
+      ClusterExecutorWithoutTimeGenerator engineExecutor =
+          new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
+      return engineExecutor.executeWithoutFilter(context);
+    }
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
index 8bbff0b..27444af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
@@ -16,8 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query.coordinatornode.executor;
+package org.apache.iotdb.cluster.query.coordinatornode.factory;
 
-public class ClusterExecutorWithTimeGenerator {
+import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 
+public class ClusterRpcReaderFactory {
+  /** Placeholder, not implemented yet: always returns null. */
+  public static Map<String, ClusterSeriesReader> createClusterSeriesReader(String groupId, PeerId peerId, QueryPlan queryPlan){
+return null;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
index 3e9a68c..f33fe05 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
@@ -29,7 +29,7 @@ public class ClusterRpcQueryManager{
   /**
    * Key is group id, value is manager of a client query.
    */
-  ConcurrentHashMap<Long, IClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
+  ConcurrentHashMap<Long, ClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
 
   /**
    * Add a query
@@ -41,7 +41,7 @@ public class ClusterRpcQueryManager{
   /**
    * Get query manager by group id
    */
-  public IClusterSingleQueryManager getSingleQuery(long jobId) {
+  public ClusterSingleQueryManager getSingleQuery(long jobId) {
     return singleQueryManagerMap.get(jobId);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
index 156b9a5..90c1ac5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
@@ -19,10 +19,20 @@
 package org.apache.iotdb.cluster.query.coordinatornode.manager;
 
 import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.query.coordinatornode.factory.ClusterRpcReaderFactory;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
 
 public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
 
@@ -34,7 +44,7 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
   /**
    * Origin query plan parsed by QueryProcessor
    */
-  private QueryPlan originPhysicalPlan;
+  private QueryPlan queryPlan;
 
   /**
    * Represent selected reader nodes, key is group id and value is selected peer id
@@ -42,25 +52,107 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
   private Map<String, PeerId> readerNodes = new HashMap<>();
 
   /**
-   * Physical plans of select paths which are divided from originPhysicalPlan
+   * Query plans of select paths, divided from queryPlan and grouped by group id
    */
   private Map<String, QueryPlan> selectPathPlans = new HashMap<>();
 
+  private Map<String, ClusterSeriesReader> selectPathReaders = new HashMap<>();
+
   /**
-   * Physical plans of filter paths which are divided from originPhysicalPlan
+   * Query plans of filter paths, divided from queryPlan and grouped by group id
    */
   private Map<String, QueryPlan> filterPathPlans = new HashMap<>();
 
   public ClusterSingleQueryManager(long jobId,
-      QueryPlan originPhysicalPlan) {
+      QueryPlan queryPlan) {
     this.jobId = jobId;
-    this.originPhysicalPlan = originPhysicalPlan;
+    this.queryPlan = queryPlan;
   }
 
   @Override
-  public void dividePhysicalPlan() {
-//    List<Path>
-//    MManager.getInstance().getFileNameByPath()
+  public void init(QueryType queryType) throws PathErrorException {
+    switch (queryType) {
+      case NO_FILTER:
+        divideNoFilterPhysicalPlan();
+        break;
+      case GLOBAL_TIME:
+        divideGlobalTimePhysicalPlan();
+        break;
+      case FILTER:
+        divideFilterPhysicalPlan();
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+    initSelectedPathPlan();
+    initFilterPathPlan();
+  }
+
+  public enum QueryType {
+    NO_FILTER, GLOBAL_TIME, FILTER
+  }
+
+  /**
+   * Divide no-filter type query plan into one sub query plan per group id of its select paths
+   */
+  private void divideNoFilterPhysicalPlan() throws PathErrorException {
+    List<Path> selectPaths = queryPlan.getPaths();
+    Map<String, List<Path>> pathsByGroupId = new HashMap<>();
+    for (Path path : selectPaths) {
+      String storageGroup = QPExecutorUtils.getStroageGroupByDevice(path.getDevice());
+      String groupId = Router.getInstance().getGroupIdBySG(storageGroup);
+      // First path of this group: create its list before the add below dereferences it.
+      if (!pathsByGroupId.containsKey(groupId)) {
+        pathsByGroupId.put(groupId, new ArrayList<>());
+      }
+      pathsByGroupId.get(groupId).add(path);
+    }
+    for (Entry<String, List<Path>> entry : pathsByGroupId.entrySet()) {
+      // Each sub plan carries the proposer of the origin plan and the paths of one group.
+      QueryPlan subQueryPlan = new QueryPlan();
+      subQueryPlan.setProposer(queryPlan.getProposer());
+      subQueryPlan.setPaths(entry.getValue());
+      selectPathPlans.put(entry.getKey(), subQueryPlan);
+    }
+  }
+
+  private void divideGlobalTimePhysicalPlan() {
+
+  }
+
+  private void divideFilterPhysicalPlan() {
+
+  }
+
+  /**
+   * Init select path readers: for every group this node cannot handle locally, record the peer
+   * returned by RaftUtils.getRandomPeerID and register the readers of the group's select paths.
+   */
+  private void initSelectedPathPlan() {
+    for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
+      String groupId = entry.getKey();
+      QueryPlan subQueryPlan = entry.getValue();
+      if (!canHandleQueryLocally(groupId)) {
+        PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
+        readerNodes.put(groupId, randomPeer);
+        // Do not shadow the selectPathReaders field here, otherwise it is never filled.
+        Map<String, ClusterSeriesReader> groupReaders = ClusterRpcReaderFactory
+            .createClusterSeriesReader(groupId, randomPeer, subQueryPlan);
+        for (Path path : subQueryPlan.getPaths()) {
+          selectPathReaders.put(path.getFullPath(), groupReaders.get(path.getFullPath()));
+        }
+      }
+    }
+  }
+
+  private void initFilterPathPlan(){
+    if(!filterPathPlans.isEmpty()){
+
+    }
+  }
+
+  private boolean canHandleQueryLocally(String groupId){
+    return QPExecutorUtils.canHandleQueryByGroupId(groupId);
   }
 
   @Override
@@ -96,11 +188,47 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
     this.jobId = jobId;
   }
 
-  public PhysicalPlan getOriginPhysicalPlan() {
-    return originPhysicalPlan;
+  public PhysicalPlan getQueryPlan() {
+    return queryPlan;
+  }
+
+  public void setQueryPlan(QueryPlan queryPlan) {
+    this.queryPlan = queryPlan;
+  }
+
+  public Map<String, PeerId> getReaderNodes() {
+    return readerNodes;
+  }
+
+  public void setReaderNodes(
+      Map<String, PeerId> readerNodes) {
+    this.readerNodes = readerNodes;
+  }
+
+  public Map<String, QueryPlan> getSelectPathPlans() {
+    return selectPathPlans;
+  }
+
+  public void setSelectPathPlans(
+      Map<String, QueryPlan> selectPathPlans) {
+    this.selectPathPlans = selectPathPlans;
+  }
+
+  public Map<String, ClusterSeriesReader> getSelectPathReaders() {
+    return selectPathReaders;
+  }
+
+  public void setSelectPathReaders(
+      Map<String, ClusterSeriesReader> selectPathReaders) {
+    this.selectPathReaders = selectPathReaders;
+  }
+
+  public Map<String, QueryPlan> getFilterPathPlans() {
+    return filterPathPlans;
   }
 
-  public void setOriginPhysicalPlan(QueryPlan originPhysicalPlan) {
-    this.originPhysicalPlan = originPhysicalPlan;
+  public void setFilterPathPlans(
+      Map<String, QueryPlan> filterPathPlans) {
+    this.filterPathPlans = filterPathPlans;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
index c5fffbc..dc84d32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.cluster.query.coordinatornode.manager;
 
 import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 /**
@@ -28,8 +30,10 @@ public interface IClusterSingleQueryManager {
 
   /**
    * Divide physical plan into several sub physical plans according to timeseries full path.
+   * @param queryType type of the query (NO_FILTER, GLOBAL_TIME or FILTER), selects how the plan is divided
    */
-  void dividePhysicalPlan();
+  void init(
+      QueryType queryType) throws PathErrorException;
 
   /**
    * Get physical plan of select path
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
similarity index 81%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
index fc9eb8c..5b75c9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
@@ -19,10 +19,17 @@
 package org.apache.iotdb.cluster.query.coordinatornode.reader;
 
 import java.io.IOException;
+import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
-public class ClusterRpcBatchReader implements IBatchReader {
+public class ClusterRpcBatchDataReader implements IBatchReader {
+
+  private String PeerId;
+  private String jobId;
+  private PathType type;
+  private BatchData batchData;
 
   @Override
   public boolean hasNext() throws IOException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterAllDataReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
similarity index 60%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterAllDataReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
index 684ead0..438559e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterAllDataReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
@@ -22,10 +22,21 @@ import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IBatchReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
 
-public class ClusterAllDataReader implements IPointReader {
+public class ClusterSeriesReader implements IPointReader {
 
   private IBatchReader rpcBatchReader;
+  private String fullPath;
+  private TSDataType dataType;
+
+  public ClusterSeriesReader(IBatchReader rpcBatchReader, String fullPath,
+      TSDataType dataType) {
+    this.rpcBatchReader = rpcBatchReader;
+    this.fullPath = fullPath;
+    this.dataType = dataType;
+  }
 
   @Override
   public TimeValuePair current() throws IOException {
@@ -46,4 +57,28 @@ public class ClusterAllDataReader implements IPointReader {
   public void close() throws IOException {
 
   }
+
+  public IBatchReader getRpcBatchReader() {
+    return rpcBatchReader;
+  }
+
+  public void setRpcBatchReader(IBatchReader rpcBatchReader) {
+    this.rpcBatchReader = rpcBatchReader;
+  }
+
+  public String getFullPath() {
+    return fullPath;
+  }
+
+  public void setFullPath(String fullPath) {
+    this.fullPath = fullPath;
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index a11fae4..79b2acd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -38,10 +38,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.Metadata;
 import org.apache.iotdb.db.qp.QueryProcessor;
-import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -72,11 +70,6 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
   private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
-  /**
-   * Key is query statement, Value is corresponding job id which is assigned in <class>QueryResourceManager</class>
-   */
-  private ThreadLocal<HashMap<String, Long>> queryJobIdMap = new ThreadLocal<>();
-
   public TSServiceClusterImpl() throws IOException {
     super();
   }
@@ -278,18 +271,6 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   }
 
   @Override
-  public void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
-    long jobId = QueryResourceManager.getInstance().assignJobId();
-    queryStatus.get().put(statement, physicalPlan);
-    queryJobIdMap.get().put(statement, jobId);
-    queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
-    // refresh current queryRet for statement
-    if (queryRet.get().containsKey(statement)) {
-      queryRet.get().remove(statement);
-    }
-  }
-
-  @Override
   public void releaseQueryResource(TSCloseOperationReq req) throws FileNodeManagerException {
     Map<Long, QueryContext> contextMap = contextMapLocal.get();
     if (contextMap == null) {
@@ -310,35 +291,18 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   }
 
   @Override
-  public void clearAllStatusForCurrentRequest() {
-    if (this.queryRet.get() != null) {
-      this.queryRet.get().clear();
-    }
-
-    if (this.queryJobIdMap.get() != null) {
-      this.queryJobIdMap.get().clear();
-    }
-
-    if (this.queryStatus.get() != null) {
-      this.queryStatus.get().clear();
-    }
-  }
-
-
-  @Override
   public QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
       throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
       ProcessorException, IOException {
     PhysicalPlan physicalPlan = queryStatus.get().get(statement);
     processor.getExecutor().setFetchSize(fetchSize);
-    long jobId = queryJobIdMap.get().get(statement);
 
+    long jobId = QueryResourceManager.getInstance().assignJobId();
     QueryContext context = new QueryContext(jobId);
     initContextMap();
     contextMapLocal.get().put(req.queryId, context);
 
-    queryManager.getSingleQuery(jobId).dividePhysicalPlan();
-
+    queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
     QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
         context);
     queryRet.get().put(statement, queryDataSet);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
new file mode 100644
index 0000000..63a5be6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+public class QPExecutorUtils {
+
+  private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+
+  private static final Router router = Router.getInstance();
+
+  private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+      CLUSTER_CONFIG.getPort());
+
+  private static final  MManager mManager = MManager.getInstance();
+
+  private static final Server server = Server.getInstance();
+
+
+  /**
+   * Get the storage group name of a device, as returned by MManager#getFileNameByPath.
+   * @param device device name to look up
+   * @throws PathErrorException with a "File level ... doesn't exist" message if the lookup fails
+   */
+  public static String getStroageGroupByDevice(String device) throws PathErrorException {
+    try {
+      return MManager.getInstance().getFileNameByPath(device);
+    } catch (PathErrorException lookupFailure) {
+      throw new PathErrorException(String.format("File level of %s doesn't exist.", device));
+    }
+  }
+
+  /**
+   * Get all storage group names for a path, as returned by MManager#getAllFileNamesByPath.
+   * @param path path to look up
+   * @throws PathErrorException with a "File level ... doesn't exist" message if the lookup fails
+   */
+  public static List<String> getAllStroageGroupsByPath(String path) throws PathErrorException {
+    try {
+      return mManager.getAllFileNamesByPath(path);
+    } catch (PathErrorException lookupFailure) {
+      throw new PathErrorException(String.format("File level of %s doesn't exist.", path));
+    }
+  }
+
+  /**
+   * Classify the input storage group list by which data group it belongs to.
+   *
+   * @param sgList storage group names to classify
+   * @return key is groupId, value is all SGs belong to this data group
+   */
+  public static Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
+    Map<String, Set<String>> sgsByGroupId = new HashMap<>();
+    for (String storageGroup : sgList) {
+      String groupId = router.getGroupIdBySG(storageGroup);
+      Set<String> members = sgsByGroupId.get(groupId);
+      if (members == null) {
+        // First storage group seen for this data group.
+        members = new HashSet<>();
+        sgsByGroupId.put(groupId, members);
+      }
+      members.add(storageGroup);
+    }
+    return sgsByGroupId;
+  }
+
+  /**
+   * Check if the non query command can be executed locally. For the metadata group the
+   * metadata holder's fsm must report isLeader(); for any other group this node must belong
+   * to the group and the group's leader peer must map to this node.
+   *
+   * @param groupId raft group id to check
+   * @return true if the command can be handled by this node
+   */
+  public static boolean canHandleNonQueryByGroupId(String groupId) {
+    if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
+      return ((MetadataRaftHolder) (server.getMetadataHolder())).getFsm().isLeader();
+    }
+    boolean inGroup = router.containPhysicalNodeByGroupId(groupId, localNode);
+    return inGroup && RaftUtils
+        .getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode);
+  }
+
+  /**
+   * Check if the query command can be executed locally, i.e. if this node belongs to the group
+   */
+  public static boolean canHandleQueryByGroupId(String groupId) {
+    return router.containPhysicalNodeByGroupId(groupId, localNode);
+  }
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index c1a3270..5c3f3cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -40,7 +42,9 @@ import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
+import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
@@ -51,10 +55,17 @@ import org.slf4j.LoggerFactory;
 
 public class RaftUtils {
 
+  private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+
   private static final Logger LOGGER = LoggerFactory.getLogger(RaftUtils.class);
   private static final Server server = Server.getInstance();
   private static final Router router = Router.getInstance();
   private static final AtomicInteger requestId = new AtomicInteger(0);
+  /**
+   * Raft as client manager.
+   */
+  private static final RaftNodeAsClientManager CLIENT_MANAGER = RaftNodeAsClientManager
+      .getInstance();
 
   /**
    * The cache will be update in two case: 1. When @onLeaderStart() method of state machine is
@@ -302,4 +313,18 @@ public class RaftUtils {
     status.setCode(-1);
     return status;
   }
+
+  /**
+   * Try to get a raft rpc client; throws RaftConnectionException if the manager returns none
+   */
+  public static NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
+    NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
+    if (client == null) {
+      throw new RaftConnectionException(String
+          .format("Raft inner rpc clients have reached the max numbers %s",
+              CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
+                  .getMaxQueueNumOfInnerRpcClient()));
+    }
+    return client;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 7c7b2be..12d3b01 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -236,4 +236,11 @@ public class Router {
   public Set<String> getAllGroupId() {
     return groupIdMapNodeCache.keySet();
   }
+
+  /**
+   * Get raft group id by storage group name
+   */
+  public String getGroupIdBySG(String storageGroup) {
+    return getGroupID(routeGroup(storageGroup));
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
index 1d0b91b..01c1aed 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
@@ -40,7 +40,7 @@ public interface IEngineQueryRouter {
    * Execute physical plan.
    */
   QueryDataSet query(QueryExpression queryExpression, QueryContext context)
-      throws FileNodeManagerException;
+      throws FileNodeManagerException, PathErrorException;
 
   /**
    * Execute aggregation query.