You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:06 UTC

[incubator-iotdb] 15/19: add ClusterRpcReaderUtils

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d4863aad7f439c31bd967780aff9094ede628fde
Author: lta <li...@163.com>
AuthorDate: Tue Apr 16 16:17:00 2019 +0800

    add ClusterRpcReaderUtils
---
 .../org/apache/iotdb/cluster/entity/Server.java    | 14 +--
 .../cluster/entity/raft/DataStateMachine.java      |  2 +-
 .../cluster/entity/raft/MetadataStateManchine.java |  2 +-
 .../cluster/qp/executor/AbstractQPExecutor.java    | 17 ++--
 .../executor/ClusterQueryProcessExecutor.java      | 57 ++++++++++---
 .../cluster/qp/executor/NonQueryExecutor.java      |  8 +-
 .../cluster/qp/executor/QueryMetadataExecutor.java | 38 ++++-----
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  2 +-
 .../executor/ClusterExecutorWithTimeGenerator.java |  5 +-
 .../ClusterExecutorWithoutTimeGenerator.java       |  6 +-
 .../executor/ClusterQueryRouter.java               | 41 ++++++---
 .../factory/ClusterRpcReaderFactory.java           | 31 -------
 .../manager/ClusterSingleQueryManager.java         | 37 ++++----
 .../manager/IClusterSingleQueryManager.java        |  5 +-
 .../reader/ClusterRpcBatchDataReader.java          | 29 ++++++-
 .../reader/ClusterSeriesReader.java                |  2 +-
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |  8 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 13 ++-
 .../DataGroupNonQueryAsyncProcessor.java           |  7 +-
 .../MetaGroupNonQueryAsyncProcessor.java           |  7 +-
 .../QueryMetadataAsyncProcessor.java               |  7 +-
 .../QueryMetadataInStringAsyncProcessor.java       |  7 +-
 .../QueryPathsAsyncProcessor.java                  |  7 +-
 .../QuerySeriesTypeAsyncProcessor.java             |  7 +-
 .../QueryTimeSeriesAsyncProcessor.java             |  7 +-
 .../rpc/raft/request/BasicQueryRequest.java        |  4 +
 .../{ => nonquery}/DataGroupNonQueryRequest.java   |  5 +-
 .../{ => nonquery}/MetaGroupNonQueryRequest.java   |  5 +-
 .../QuerySeriesDataRequest.java}                   | 27 ++++--
 .../Stage.java}                                    | 13 +--
 .../QueryMetadataInStringRequest.java              |  6 +-
 .../{ => querymetadata}/QueryMetadataRequest.java  |  6 +-
 .../{ => querymetadata}/QueryPathsRequest.java     |  6 +-
 .../QuerySeriesTypeRequest.java                    |  6 +-
 .../QueryStorageGroupRequest.java                  |  6 +-
 .../QueryTimeSeriesRequest.java                    |  6 +-
 .../{ => nonquery}/DataGroupNonQueryResponse.java  |  4 +-
 .../{ => nonquery}/MetaGroupNonQueryResponse.java  |  4 +-
 .../querydata/QuerySeriesDataResponse.java         | 69 +++++++++++++++
 .../QueryMetadataInStringResponse.java             |  4 +-
 .../{ => querymetadata}/QueryMetadataResponse.java |  3 +-
 .../{ => querymetadata}/QueryPathsResponse.java    |  3 +-
 .../QuerySeriesTypeResponse.java                   |  3 +-
 .../QueryStorageGroupResponse.java                 |  3 +-
 .../QueryTimeSeriesResponse.java                   |  3 +-
 .../cluster/service/TSServiceClusterImpl.java      | 18 ++--
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  2 +-
 .../cluster/utils/query/ClusterRpcReaderUtils.java | 99 ++++++++++++++++++++++
 .../query/QueryTask.java}                          | 39 ++++-----
 .../apache/iotdb/cluster/utils/RaftUtilsTest.java  |  2 +-
 .../db/qp/executor/IQueryProcessExecutor.java      |  1 -
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |  6 +-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |  9 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |  4 +-
 54 files changed, 490 insertions(+), 242 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index a4c02cd..1f0e4e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,13 +32,13 @@ import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
-import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataInStringAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QuerySeriesTypeAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryTimeSeriesAsyncProcessor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index ebac074..e74db12 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 9592718..b45ab9c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
-import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.MetaGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index e5df083..810cc6e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -70,12 +70,17 @@ public abstract class AbstractQPExecutor {
   /**
    * ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
    */
-  protected int readMetadataConsistencyLevel = CLUSTER_CONFIG.getReadMetadataConsistencyLevel();
+  private ThreadLocal<Integer> readMetadataConsistencyLevel = new ThreadLocal<>();
 
   /**
    * ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
    */
-  private int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
+  private ThreadLocal<Integer> readDataConsistencyLevel = new ThreadLocal<>();
+
+  public AbstractQPExecutor() {
+    readMetadataConsistencyLevel.set(CLUSTER_CONFIG.getReadMetadataConsistencyLevel());
+    readDataConsistencyLevel.set(CLUSTER_CONFIG.getReadDataConsistencyLevel());
+  }
 
   /**
    * Async handle QPTask by QPTask and leader id
@@ -141,7 +146,7 @@ public abstract class AbstractQPExecutor {
 
   public void setReadMetadataConsistencyLevel(int level) throws ConsistencyLevelException {
     if (level <= ClusterConstant.MAX_CONSISTENCY_LEVEL) {
-      this.readMetadataConsistencyLevel = level;
+      readMetadataConsistencyLevel.set(level);
     } else {
       throw new ConsistencyLevelException(String.format("Consistency level %d not support", level));
     }
@@ -149,17 +154,17 @@ public abstract class AbstractQPExecutor {
 
   public void setReadDataConsistencyLevel(int level) throws ConsistencyLevelException {
     if (level <= ClusterConstant.MAX_CONSISTENCY_LEVEL) {
-      this.readDataConsistencyLevel = level;
+      readDataConsistencyLevel.set(level);
     } else {
       throw new ConsistencyLevelException(String.format("Consistency level %d not support", level));
     }
   }
 
   public int getReadMetadataConsistencyLevel() {
-    return readMetadataConsistencyLevel;
+    return readMetadataConsistencyLevel.get();
   }
 
   public int getReadDataConsistencyLevel() {
-    return readDataConsistencyLevel;
+    return readDataConsistencyLevel.get();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
similarity index 67%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
index e7e58f8..1b82583 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query.coordinatornode.executor;
+package org.apache.iotdb.cluster.qp.executor;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
+import org.apache.iotdb.cluster.query.coordinatornode.executor.ClusterQueryRouter;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
@@ -35,34 +39,64 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
+public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements IQueryProcessExecutor {
 
-  private IEngineQueryRouter queryRouter = new ClusterQueryRouter();
+  private ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
+  private ClusterQueryRouter clusterQueryRouter = new ClusterQueryRouter();
 
   private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
   @Override
+  public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+      throws IOException, FileNodeManagerException, PathErrorException,
+      QueryFilterOptimizationException, ProcessorException {
+
+    QueryExpression queryExpression = QueryExpression.create().setSelectSeries(queryPlan.getPaths())
+        .setExpression(queryPlan.getExpression());
+    clusterQueryRouter.setReadDataConsistencyLevel(getReadDataConsistencyLevel());
+    if (queryPlan instanceof GroupByPlan) {
+      GroupByPlan groupByPlan = (GroupByPlan) queryPlan;
+      return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
+          groupByPlan.getExpression(), groupByPlan.getUnit(), groupByPlan.getOrigin(),
+          groupByPlan.getIntervals(), context);
+    }
+
+    if (queryPlan instanceof AggregationPlan) {
+      return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
+          queryPlan.getExpression(), context);
+    }
+
+    if (queryPlan instanceof FillQueryPlan) {
+      FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+      return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
+          fillQueryPlan.getFillType(), context);
+    }
+    return clusterQueryRouter.query(queryExpression, context);
+  }
+
+  @Override
   public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
-    return queryRouter.aggregate(paths, aggres, expression, context);
+    return clusterQueryRouter.aggregate(paths, aggres, expression, context);
   }
 
   @Override
   public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
       long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
-    return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
+    return clusterQueryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
   }
 
   @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
-    return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
+    return clusterQueryRouter.fill(fillPaths, queryTime, fillTypes, context);
   }
 
   @Override
@@ -111,11 +145,6 @@ public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
   }
 
   @Override
-  public IEngineQueryRouter getQueryRouter() {
-    return queryRouter;
-  }
-
-  @Override
   public boolean update(Path path, long startTime, long endTime, String value)
       throws ProcessorException {
     throw new UnsupportedOperationException();
@@ -144,6 +173,6 @@ public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
 
   @Override
   public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
-    return false;
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index ecafe4e..b6247c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -38,11 +38,11 @@ import org.apache.iotdb.cluster.qp.task.BatchQPTask;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.MetaGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index aea0656..82325e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -32,19 +32,19 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryStorageGroupRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryTimeSeriesRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QuerySeriesTypeResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryStorageGroupResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryTimeSeriesResponse;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -125,7 +125,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
   private void handleTimseriesQuery(String groupId, List<String> pathList, List<List<String>> res)
       throws ProcessorException, InterruptedException {
     QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId,
-        readMetadataConsistencyLevel, pathList);
+        getReadMetadataConsistencyLevel(), pathList);
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute show timeseries {} statement for group {}.", pathList, groupId);
@@ -152,7 +152,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     List<SingleQPTask> taskList = new ArrayList<>();
     for (String groupId : groupIdSet) {
       QueryMetadataInStringRequest request = new QueryMetadataInStringRequest(groupId,
-          readMetadataConsistencyLevel);
+          getReadMetadataConsistencyLevel());
       SingleQPTask task = new SingleQPTask(false, request);
       taskList.add(task);
 
@@ -191,7 +191,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     List<SingleQPTask> taskList = new ArrayList<>();
     for (String groupId : groupIdSet) {
       QueryMetadataRequest request = new QueryMetadataRequest(groupId,
-          readMetadataConsistencyLevel);
+          getReadMetadataConsistencyLevel());
       SingleQPTask task = new SingleQPTask(false, request);
       taskList.add(task);
 
@@ -235,7 +235,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     } else {
       String groupId = router.getGroupIdBySG(storageGroupList.get(0));
       QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
-          readMetadataConsistencyLevel, path);
+          getReadMetadataConsistencyLevel(), path);
       SingleQPTask task = new SingleQPTask(false, request);
 
       LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
@@ -284,7 +284,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
   private void handlePathsQuery(String groupId, List<String> pathList, List<String> res)
       throws ProcessorException, InterruptedException {
     QueryPathsRequest request = new QueryPathsRequest(groupId,
-        readMetadataConsistencyLevel, pathList);
+        getReadMetadataConsistencyLevel(), pathList);
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
@@ -325,10 +325,10 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
   private Set<String> queryStorageGroupLocally() throws InterruptedException {
     final byte[] reqContext = RaftUtils.createRaftRequestContext();
     QueryStorageGroupRequest request = new QueryStorageGroupRequest(
-        ClusterConfig.METADATA_GROUP_ID, readMetadataConsistencyLevel);
+        ClusterConfig.METADATA_GROUP_ID, getReadMetadataConsistencyLevel());
     SingleQPTask task = new SingleQPTask(false, request);
     MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
-    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+    if (getReadMetadataConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryStorageGroupResponse response;
       try {
         response = QueryStorageGroupResponse
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index d224bae..43edd67 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.cluster.concurrent.pool.QPTaskManager;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
index d1ca472..68a14c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
@@ -40,10 +40,13 @@ public class ClusterExecutorWithTimeGenerator {
 
   private QueryExpression queryExpression;
   private IClusterSingleQueryManager queryManager;
+  private int readDataConsistencyLevel;
 
-  ClusterExecutorWithTimeGenerator(QueryExpression queryExpression, IClusterSingleQueryManager queryManager) {
+  ClusterExecutorWithTimeGenerator(QueryExpression queryExpression,
+      IClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
     this.queryExpression = queryExpression;
     this.queryManager = queryManager;
+    this.readDataConsistencyLevel = readDataConsistencyLevel;
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
index 7bd8008..65b7d8f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -46,11 +46,13 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 public class ClusterExecutorWithoutTimeGenerator {
   private QueryExpression queryExpression;
   private ClusterSingleQueryManager queryManager;
+  private int readDataConsistencyLevel;
 
   public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression,
-      ClusterSingleQueryManager queryManager) {
+      ClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
     this.queryExpression = queryExpression;
     this.queryManager = queryManager;
+    this.readDataConsistencyLevel = readDataConsistencyLevel;
   }
 
   /**
@@ -126,7 +128,9 @@ public class ClusterExecutorWithoutTimeGenerator {
         ClusterSeriesReader reader = selectPathReaders.get(path.toString());
         readersOfSelectedSeries.add(reader);
         dataTypes.add(reader.getDataType());
+
       } else {
+        // can read series locally.
         QueryDataSource queryDataSource = QueryResourceManager.getInstance()
             .getQueryDataSource(path,
                 context);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index 8fa1a95..da5a395 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -43,38 +43,43 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 public class ClusterQueryRouter implements IEngineQueryRouter {
 
+  private ThreadLocal<Integer> readDataConsistencyLevel = new ThreadLocal<>();
+
   @Override
   public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
       throws FileNodeManagerException, PathErrorException {
 
     ClusterSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
         .getSingleQuery(context.getJobId());
-    if (queryExpression.hasQueryFilter()) {
-      try {
+    try {
+      if (queryExpression.hasQueryFilter()) {
+
         IExpression optimizedExpression = ExpressionOptimizer.getInstance()
             .optimize(queryExpression.getExpression(), queryExpression.getSelectedSeries());
         queryExpression.setExpression(optimizedExpression);
 
         if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-          queryManager.init(QueryType.GLOBAL_TIME);
+          queryManager.init(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
           ClusterExecutorWithoutTimeGenerator engineExecutor =
-              new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
+              new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager,
+                  getReadDataConsistencyLevel());
           return engineExecutor.executeWithGlobalTimeFilter(context);
         } else {
-          queryManager.init(QueryType.FILTER);
+          queryManager.init(QueryType.FILTER, getReadDataConsistencyLevel());
           ClusterExecutorWithTimeGenerator engineExecutor = new ClusterExecutorWithTimeGenerator(
-              queryExpression, queryManager);
+              queryExpression, queryManager, getReadDataConsistencyLevel());
           return engineExecutor.execute(context);
         }
 
-      } catch (QueryFilterOptimizationException e) {
-        throw new FileNodeManagerException(e);
+      } else {
+        queryManager.init(QueryType.NO_FILTER, getReadDataConsistencyLevel());
+        ClusterExecutorWithoutTimeGenerator engineExecutor =
+            new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager,
+                getReadDataConsistencyLevel());
+        return engineExecutor.executeWithoutFilter(context);
       }
-    } else {
-      queryManager.init(QueryType.NO_FILTER);
-      ClusterExecutorWithoutTimeGenerator engineExecutor =
-          new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
-      return engineExecutor.executeWithoutFilter(context);
+    } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
+      throw new FileNodeManagerException(e);
     }
   }
 
@@ -98,4 +103,12 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
       QueryContext context) throws FileNodeManagerException, PathErrorException, IOException {
     return null;
   }
+
+  public int getReadDataConsistencyLevel() {
+    return readDataConsistencyLevel.get();
+  }
+
+  public void setReadDataConsistencyLevel(int readDataConsistencyLevel) {
+    this.readDataConsistencyLevel.set(readDataConsistencyLevel);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
deleted file mode 100644
index 27444af..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.query.coordinatornode.factory;
-
-import com.alipay.sofa.jraft.entity.PeerId;
-import java.util.Map;
-import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-
-public class ClusterRpcReaderFactory {
-
-  public static Map<String, ClusterSeriesReader> createClusterSeriesReader(String groupId, PeerId peerId, QueryPlan queryPlan){
-return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
index 90c1ac5..d350a7c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
@@ -19,16 +19,19 @@
 package org.apache.iotdb.cluster.query.coordinatornode.manager;
 
 import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.cluster.query.coordinatornode.factory.ClusterRpcReaderFactory;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.Router;
+import org.apache.iotdb.cluster.utils.query.ClusterRpcReaderUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -37,7 +40,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
 
   /**
-   * Query job id assigned by QueryResourceManager.
+   * Query job id assigned by QueryResourceManager of coordinator node.
    */
   private long jobId;
 
@@ -63,6 +66,8 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
    */
   private Map<String, QueryPlan> filterPathPlans = new HashMap<>();
 
+  private Map<String, ClusterSeriesReader> filterPathReaders = new HashMap<>();
+
   public ClusterSingleQueryManager(long jobId,
       QueryPlan queryPlan) {
     this.jobId = jobId;
@@ -70,7 +75,8 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
   }
 
   @Override
-  public void init(QueryType queryType) throws PathErrorException {
+  public void init(QueryType queryType, int readDataConsistencyLevel)
+      throws PathErrorException, IOException, RaftConnectionException {
     switch (queryType) {
       case NO_FILTER:
         divideNoFilterPhysicalPlan();
@@ -84,8 +90,8 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
       default:
         throw new UnsupportedOperationException();
     }
-    initSelectedPathPlan();
-    initFilterPathPlan();
+    initSelectedPathPlan(readDataConsistencyLevel);
+    initFilterPathPlan(readDataConsistencyLevel);
   }
 
   public enum QueryType {
@@ -127,26 +133,25 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
   /**
    * Init select path
    */
-  private void initSelectedPathPlan(){
-    if(!selectPathPlans.isEmpty()){
-      for(Entry<String, QueryPlan> entry: selectPathPlans.entrySet()){
+  private void initSelectedPathPlan(int readDataConsistencyLevel)
+      throws IOException, RaftConnectionException {
+    if (!selectPathPlans.isEmpty()) {
+      for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
         String groupId = entry.getKey();
         QueryPlan queryPlan = entry.getValue();
-        if(!canHandleQueryLocally(groupId)) {
+        if (!canHandleQueryLocally(groupId)) {
           PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
           readerNodes.put(groupId, randomPeer);
-          Map<String, ClusterSeriesReader> selectPathReaders = ClusterRpcReaderFactory
-              .createClusterSeriesReader(groupId, randomPeer, queryPlan);
-          for (Path path : queryPlan.getPaths()) {
-            selectPathReaders.put(path.getFullPath(), selectPathReaders.get(path.getFullPath()));
-          }
+          this.selectPathReaders = ClusterRpcReaderUtils
+              .createClusterSeriesReader(groupId, randomPeer, queryPlan, readDataConsistencyLevel,
+                  PathType.SELECT_PATH);
         }
       }
     }
   }
 
-  private void initFilterPathPlan(){
-    if(!filterPathPlans.isEmpty()){
+  private void initFilterPathPlan(int readDataConsistencyLevel) {
+    if (!filterPathPlans.isEmpty()) {
 
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
index dc84d32..bc70e65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.cluster.query.coordinatornode.manager;
 
 import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -33,7 +35,8 @@ public interface IClusterSingleQueryManager {
    * @param queryType
    */
   void init(
-      QueryType queryType) throws PathErrorException;
+      QueryType queryType, int readDataConsistencyLevel)
+      throws PathErrorException, IOException, RaftConnectionException;
 
   /**
    * Get physical plan of select path
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
index 5b75c9d..ba36416 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
@@ -18,19 +18,42 @@
  */
 package org.apache.iotdb.cluster.query.coordinatornode.reader;
 
+import com.alipay.sofa.jraft.entity.PeerId;
 import java.io.IOException;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.db.query.reader.IBatchReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
 public class ClusterRpcBatchDataReader implements IBatchReader {
 
-  private String PeerId;
-  private String jobId;
+  /**
+   * Remote query node
+   */
+  private PeerId peerId;
+
+  /**
+   * Job id in remote query node
+   */
+  private long jobId;
+
+  /**
+   * Path type
+   */
   private PathType type;
+
+  /**
+   * Batch data
+   */
   private BatchData batchData;
 
+  public ClusterRpcBatchDataReader(PeerId peerId, long jobId,
+      PathType type, BatchData batchData) {
+    this.peerId = peerId;
+    this.jobId = jobId;
+    this.type = type;
+    this.batchData = batchData;
+  }
+
   @Override
   public boolean hasNext() throws IOException {
     return false;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
index 438559e..3d022f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
@@ -40,7 +40,7 @@ public class ClusterSeriesReader implements IPointReader {
 
   @Override
   public TimeValuePair current() throws IOException {
-    return null;
+    throw new IOException("current() in ClusterSeriesReader is an empty method.");
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index ed38584..cc89abb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import org.apache.iotdb.cluster.utils.query.QueryTask;
 
 /**
  * Handle the request and process the result as a client with the current node
@@ -38,11 +39,10 @@ public interface NodeAsClient {
 
   /**
    * Synchronous processing requests
-   *  @param clientService client rpc service handle
-   * @param leader leader node of the target group
-   * @param qpTask single QPTask to be executed
+   * @param request request to be sent
+   * @param peerId node of the target group that the request is sent to
    */
-  void syncHandleRequest(BasicRequest request, PeerId leader, SingleQPTask qpTask)
+  QueryTask syncHandleRequest(BasicRequest request, PeerId peerId)
       throws RaftConnectionException;
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index af77a1c..658b7a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.utils.query.QueryTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -214,17 +215,13 @@ public class RaftNodeAsClientManager {
     }
 
     @Override
-    public void syncHandleRequest(BasicRequest request, PeerId leader,
-        SingleQPTask qpTask)
-        throws RaftConnectionException {
+    public QueryTask syncHandleRequest(BasicRequest request, PeerId peerId) {
       try {
         BasicResponse response = (BasicResponse) boltClientService.getRpcClient()
-            .invokeSync(leader.getEndpoint().toString(), request, TASK_TIMEOUT_MS);
-        qpTask.run(response);
+            .invokeSync(peerId.getEndpoint().toString(), request, TASK_TIMEOUT_MS);
+        return new QueryTask(response, TaskState.FINISH);
       } catch (RemotingException | InterruptedException e) {
-        qpTask.setTaskState(TaskState.EXCEPTION);
-        qpTask.run(null);
-        throw new RaftConnectionException(e);
+        return new QueryTask(null, TaskState.EXCEPTION);
       } finally {
         releaseClient();
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/DataGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
similarity index 90%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/DataGroupNonQueryAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
index fb00c0d..de2d2ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/DataGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.nonquery;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/MetaGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
similarity index 89%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/MetaGroupNonQueryAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
index d6f6270..9f09bbb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/MetaGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.nonquery;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.MetaGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
index 176fa33..36e657c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
similarity index 90%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
index b80f4ae..8771eea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataInStringResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.metadata.MManager;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
index f54aba0..8e1e47b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryPathsResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
index f0a4fc6..9e4b1c7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QuerySeriesTypeResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
index c41fdcf..593f99d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryTimeSeriesRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryTimeSeriesResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java
index 2cf613f..0e4ea6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java
@@ -30,6 +30,10 @@ public abstract class BasicQueryRequest extends BasicRequest {
     this.readConsistencyLevel = readConsistencyLevel;
   }
 
+  public BasicQueryRequest(String groupID) {
+    super(groupID);
+  }
+
   public int getReadConsistencyLevel() {
     return readConsistencyLevel;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/DataGroupNonQueryRequest.java
similarity index 86%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/DataGroupNonQueryRequest.java
index c1bcf5f..9a5df22 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/DataGroupNonQueryRequest.java
@@ -16,17 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.nonquery;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 /**
  * Handle request to data group
  */
-public class DataGroupNonQueryRequest extends BasicRequest implements Serializable {
+public class DataGroupNonQueryRequest extends BasicRequest {
 
 
   public DataGroupNonQueryRequest(String groupID, List<PhysicalPlan> physicalPlanBytes)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/MetaGroupNonQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/MetaGroupNonQueryRequest.java
similarity index 86%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/MetaGroupNonQueryRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/MetaGroupNonQueryRequest.java
index 69625ff..7b90e68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/MetaGroupNonQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/MetaGroupNonQueryRequest.java
@@ -16,17 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.nonquery;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 /**
  * Handle request to metadata group leader
  */
-public class MetaGroupNonQueryRequest extends BasicRequest implements Serializable {
+public class MetaGroupNonQueryRequest extends BasicRequest {
 
   public MetaGroupNonQueryRequest(String groupID, List<PhysicalPlan> plans)
       throws IOException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
similarity index 56%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
index c1bcf5f..fd27997 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
@@ -16,23 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querydata;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
-/**
- * Handle request to data group
- */
-public class DataGroupNonQueryRequest extends BasicRequest implements Serializable {
+public class QuerySeriesDataRequest extends BasicQueryRequest {
 
+  private Stage stage;
+  private PathType pathType;
+  private List<String> paths;
 
-  public DataGroupNonQueryRequest(String groupID, List<PhysicalPlan> physicalPlanBytes)
+  public QuerySeriesDataRequest(String groupID, int readConsistencyLevel,
+      List<PhysicalPlan> physicalPlanBytes, PathType pathType)
       throws IOException {
-    super(groupID);
+    super(groupID, readConsistencyLevel);
     init(physicalPlanBytes);
+    stage = Stage.INITIAL;
+    this.pathType = pathType;
   }
 
+  public QuerySeriesDataRequest(String groupID, List<String> paths, PathType pathType)
+      throws IOException {
+    super(groupID);
+    this.paths = paths;
+    stage = Stage.READ_DATA;
+    this.pathType = pathType;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
similarity index 73%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
index 2628fb6..53d2e2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
@@ -16,13 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querydata;
 
-import java.io.Serializable;
-
-public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
-
-  public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
-    super(groupID, readConsistencyLevel);
-  }
-}
\ No newline at end of file
+/**
+ * Stage tag carried by a QuerySeriesDataRequest.
+ */
+public enum Stage {
+  /** Set by the QuerySeriesDataRequest constructor that takes physical plans. */
+  INITIAL,
+  /** Set by the QuerySeriesDataRequest constructor that takes a list of paths. */
+  READ_DATA
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataInStringRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataInStringRequest.java
similarity index 87%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataInStringRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataInStringRequest.java
index 18471a6..0e23dab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataInStringRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataInStringRequest.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
 
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 
-public class QueryMetadataInStringRequest extends BasicQueryRequest implements Serializable {
+public class QueryMetadataInStringRequest extends BasicQueryRequest {
 
   public QueryMetadataInStringRequest(String groupID, int readConsistencyLevel) {
     super(groupID, readConsistencyLevel);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataRequest.java
similarity index 82%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataRequest.java
index 2628fb6..3958775 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataRequest.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
 
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 
-public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
+public class QueryMetadataRequest extends BasicQueryRequest {
 
   public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
     super(groupID, readConsistencyLevel);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryPathsRequest.java
similarity index 84%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryPathsRequest.java
index 2c600f4..49355ea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryPathsRequest.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
 
-import java.io.Serializable;
 import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 
-public class QueryPathsRequest extends BasicQueryRequest implements Serializable {
+public class QueryPathsRequest extends BasicQueryRequest {
 
   private List<String> path;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QuerySeriesTypeRequest.java
similarity index 84%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QuerySeriesTypeRequest.java
index c486576..4f700dc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QuerySeriesTypeRequest.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
 
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 
-public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable {
+public class QuerySeriesTypeRequest extends BasicQueryRequest {
 
   private String path;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryStorageGroupRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryStorageGroupRequest.java
similarity index 88%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryStorageGroupRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryStorageGroupRequest.java
index 037924f..2bcf187 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryStorageGroupRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryStorageGroupRequest.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
 
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 
-public class QueryStorageGroupRequest extends BasicQueryRequest implements Serializable {
+public class QueryStorageGroupRequest extends BasicQueryRequest {
 
   public QueryStorageGroupRequest(String groupID, int readConsistencyLevel) {
     super(groupID, readConsistencyLevel);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryTimeSeriesRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryTimeSeriesRequest.java
similarity index 84%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryTimeSeriesRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryTimeSeriesRequest.java
index 0106f18..0bad4fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryTimeSeriesRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryTimeSeriesRequest.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
 
-import java.io.Serializable;
 import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
 
-public class QueryTimeSeriesRequest extends BasicQueryRequest implements Serializable {
+public class QueryTimeSeriesRequest extends BasicQueryRequest {
 
   private List<String> path;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/DataGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/DataGroupNonQueryResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/DataGroupNonQueryResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/DataGroupNonQueryResponse.java
index 074f452..0950c0d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/DataGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/DataGroupNonQueryResponse.java
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.nonquery;
+
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
 /**
  * Handle response from data group leader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/MetaGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/MetaGroupNonQueryResponse.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/MetaGroupNonQueryResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/MetaGroupNonQueryResponse.java
index f662e35..2ec1165 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/MetaGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/MetaGroupNonQueryResponse.java
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.nonquery;
+
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
 /**
  * Handle response from metadata group leader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
new file mode 100644
index 0000000..9f62617
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response.querydata;
+
+import java.util.Map;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+/**
+ * Response to a series data query. It carries the job id reported by the answering node, the
+ * path type of the query, and two maps keyed by series path: the data type of each series and
+ * a batch of data for each series.
+ *
+ * The setters return {@code this} so that a response can be populated fluently.
+ */
+public class QuerySeriesDataResponse extends BasicResponse {
+
+  private long jobId;
+  private PathType pathType;
+  private Map<String, TSDataType> seriesType;
+  private Map<String, BatchData> seriesBatchData;
+
+  /**
+   * Creates an empty response for the given group.
+   *
+   * @param groupId forwarded to the superclass constructor
+   * @param pathType path type this response refers to
+   */
+  public QuerySeriesDataResponse(String groupId, PathType pathType) {
+    // NOTE(review): the meaning of the (false, null, null) arguments is defined by
+    // BasicResponse, which is not visible here -- confirm against that class.
+    super(groupId, false, null, null);
+    this.pathType = pathType;
+  }
+
+  /**
+   * @param jobId job id to report back to the requester
+   * @return this response, for chaining
+   */
+  public QuerySeriesDataResponse setJobId(long jobId) {
+    this.jobId = jobId;
+    return this;
+  }
+
+  /**
+   * @param seriesType data type of each series, keyed by series path
+   * @return this response, for chaining
+   */
+  public QuerySeriesDataResponse setSeriesType(Map<String, TSDataType> seriesType) {
+    this.seriesType = seriesType;
+    return this;
+  }
+
+  /**
+   * @param seriesBatchData batch of data for each series, keyed by series path
+   * @return this response, for chaining
+   */
+  public QuerySeriesDataResponse setSeriesBatchData(Map<String, BatchData> seriesBatchData) {
+    this.seriesBatchData = seriesBatchData;
+    return this;
+  }
+
+  /**
+   * Misspelled alias of {@link #setSeriesBatchData(Map)}, kept so existing callers still compile.
+   *
+   * @param seriesBatchData batch of data for each series, keyed by series path
+   * @return this response, for chaining
+   * @deprecated use {@link #setSeriesBatchData(Map)}
+   */
+  @Deprecated
+  public QuerySeriesDataResponse seySeriesBatchData(Map<String, BatchData> seriesBatchData) {
+    return setSeriesBatchData(seriesBatchData);
+  }
+
+  public long getJobId() {
+    return jobId;
+  }
+
+  public PathType getPathType() {
+    return pathType;
+  }
+
+  /**
+   * @return the series type map as last set, or {@code null} if it was never set
+   */
+  public Map<String, TSDataType> getSeriesType() {
+    return seriesType;
+  }
+
+  /**
+   * @return the series batch data map as last set, or {@code null} if it was never set
+   */
+  public Map<String, BatchData> getSeriesBatchData() {
+    return seriesBatchData;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataInStringResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataInStringResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataInStringResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataInStringResponse.java
index a3a963a..8967ea1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataInStringResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataInStringResponse.java
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
+
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
 public class QueryMetadataInStringResponse extends BasicResponse {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataResponse.java
index 6c21798..3f0ad51 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataResponse.java
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
 
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.db.metadata.Metadata;
 
 public class QueryMetadataResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryPathsResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryPathsResponse.java
index 29d659a..47f51f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryPathsResponse.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
 public class QueryPathsResponse extends BasicResponse {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QuerySeriesTypeResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QuerySeriesTypeResponse.java
index e86e108..d772365 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QuerySeriesTypeResponse.java
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
 
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class QuerySeriesTypeResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryStorageGroupResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryStorageGroupResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryStorageGroupResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryStorageGroupResponse.java
index 6abff89..4668e6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryStorageGroupResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryStorageGroupResponse.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
 
 import java.util.Set;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
 public class QueryStorageGroupResponse extends BasicResponse {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryTimeSeriesResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryTimeSeriesResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryTimeSeriesResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryTimeSeriesResponse.java
index edeb4c4..2950b1f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryTimeSeriesResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryTimeSeriesResponse.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
 public class QueryTimeSeriesResponse extends BasicResponse {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 79b2acd..d978f7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.query.coordinatornode.executor.ClusterQueryProcessExecutor;
+import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -65,11 +65,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceClusterImpl.class);
 
-  private ClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance();
-  private QueryProcessor processor = new QueryProcessor(new ClusterQueryProcessExecutor());
+  private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor();
   private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
   private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
+  private ClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance();
+  private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor);
+
   public TSServiceClusterImpl() throws IOException {
     super();
   }
@@ -126,7 +128,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       /** find all valid physical plans **/
       for (int i = 0; i < statements.size(); i++) {
         try {
-          PhysicalPlan plan = processor
+          PhysicalPlan plan = queryProcessor
               .parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
           plan.setProposer(username.get());
 
@@ -242,13 +244,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       if (Pattern.matches(ClusterConstant.SET_READ_METADATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
         String[] splits = statement.split("\\s+");
         int level = Integer.parseInt(splits[splits.length - 1]);
-        nonQueryExecutor.setReadMetadataConsistencyLevel(level);
+        queryMetadataExecutor.setReadMetadataConsistencyLevel(level);
         return true;
       } else if (Pattern
           .matches(ClusterConstant.SET_READ_DATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
         String[] splits = statement.split("\\s+");
         int level = Integer.parseInt(splits[splits.length - 1]);
-        nonQueryExecutor.setReadDataConsistencyLevel(level);
+        queryDataExecutor.setReadDataConsistencyLevel(level);
         return true;
       } else {
         return false;
@@ -295,7 +297,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
       ProcessorException, IOException {
     PhysicalPlan physicalPlan = queryStatus.get().get(statement);
-    processor.getExecutor().setFetchSize(fetchSize);
+    queryProcessor.getExecutor().setFetchSize(fetchSize);
 
     long jobId = QueryResourceManager.getInstance().assignJobId();
     QueryContext context = new QueryContext(jobId);
@@ -303,7 +305,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     contextMapLocal.get().put(req.queryId, context);
 
     queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
-    QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+    QueryDataSet queryDataSet = queryProcessor.getExecutor().processQuery((QueryPlan) physicalPlan,
         context);
     queryRet.get().put(statement, queryDataSet);
     return queryDataSet;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 5c3f3cb..d2d5429 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -47,7 +47,7 @@ import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
new file mode 100644
index 0000000..35ff3fb
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils.query;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterRpcBatchDataReader;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
+import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+/**
+ * Static helpers that send a series data query to a remote peer and wrap the answer in
+ * {@link ClusterSeriesReader}s.
+ */
+public class ClusterRpcReaderUtils {
+
+  /**
+   * Count limit to redo a task
+   */
+  private static final int TASK_MAX_RETRY = ClusterDescriptor.getInstance().getConfig()
+      .getQpTaskRedoCount();
+
+  private ClusterRpcReaderUtils() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Sends an initial {@link QuerySeriesDataRequest} for the given query plan to {@code peerId}
+   * and creates one {@link ClusterSeriesReader} per series listed in the response.
+   *
+   * @param groupId group id placed in the request
+   * @param peerId peer the request is sent to; also handed to every created batch reader
+   * @param queryPlan query plan shipped in the request
+   * @param readDataConsistencyLevel consistency level placed in the request
+   * @param pathType path type placed in the request and used for every created batch reader
+   * @return the created readers, keyed by series path
+   * @throws IOException if the request cannot be constructed
+   * @throws RaftConnectionException if the request does not finish within the retry limit
+   */
+  public static Map<String, ClusterSeriesReader> createClusterSeriesReader(String groupId,
+      PeerId peerId, QueryPlan queryPlan, int readDataConsistencyLevel, PathType pathType)
+      throws IOException, RaftConnectionException {
+
+    // The request constructor takes a list of plans; this query has exactly one.
+    List<PhysicalPlan> physicalPlanList = new ArrayList<>();
+    physicalPlanList.add(queryPlan);
+    BasicRequest request = new QuerySeriesDataRequest(groupId, readDataConsistencyLevel,
+        physicalPlanList, pathType);
+    QuerySeriesDataResponse response = (QuerySeriesDataResponse) handleQueryRequest(request, peerId,
+        0);
+
+    // Build one reader per series reported in the response.
+    Map<String, ClusterSeriesReader> allSeriesReader = new HashMap<>();
+    Map<String, TSDataType> seriesType = response.getSeriesType();
+    Map<String, BatchData> seriesBatchData = response.getSeriesBatchData();
+    long jobId = response.getJobId();
+    for (Entry<String, TSDataType> entry : seriesType.entrySet()) {
+      String seriesPath = entry.getKey();
+      TSDataType dataType = entry.getValue();
+      // Use the caller's path type rather than a fixed SELECT_PATH, so the reader agrees with
+      // the path type that was sent in the request.
+      IBatchReader batchDataReader = new ClusterRpcBatchDataReader(peerId, jobId,
+          pathType, seriesBatchData.get(seriesPath));
+      ClusterSeriesReader seriesReader = new ClusterSeriesReader(batchDataReader, seriesPath,
+          dataType);
+      allSeriesReader.put(seriesPath, seriesReader);
+    }
+    return allSeriesReader;
+  }
+
+  /**
+   * Sends {@code request} to {@code peerId} synchronously, retrying until the returned task is in
+   * state {@link TaskState#FINISH} or the retry limit is exceeded.
+   *
+   * @param request request to send
+   * @param peerId peer to send it to
+   * @param taskRetryNum number of retries already consumed; attempts are made while this counter
+   *     is not greater than {@code TASK_MAX_RETRY}
+   * @return the response of the first finished task
+   * @throws RaftConnectionException if no attempt finishes within the retry limit
+   */
+  private static BasicResponse handleQueryRequest(BasicRequest request, PeerId peerId,
+      int taskRetryNum)
+      throws RaftConnectionException {
+    for (int attempt = taskRetryNum; attempt <= TASK_MAX_RETRY; attempt++) {
+      // A client is obtained for every attempt, as in the original recursive form.
+      NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient();
+      QueryTask queryTask = nodeAsClient.syncHandleRequest(request, peerId);
+      if (queryTask.getState() == TaskState.FINISH) {
+        return queryTask.getBasicResponse();
+      }
+    }
+    throw new RaftConnectionException(
+        String.format("Query request retries reach the upper bound %s",
+            TASK_MAX_RETRY));
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryTask.java
similarity index 54%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryTask.java
index 5b75c9d..69f72b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryTask.java
@@ -16,33 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query.coordinatornode.reader;
+package org.apache.iotdb.cluster.utils.query;
 
-import java.io.IOException;
-import org.apache.iotdb.cluster.query.PathType;
-import org.apache.iotdb.db.query.reader.IBatchReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
-public class ClusterRpcBatchDataReader implements IBatchReader {
+/**
+ * Mutable holder pairing a {@link BasicResponse} with the {@link TaskState} of the query that
+ * produced it.
+ */
+public class QueryTask {
+  private BasicResponse basicResponse;
+  private TaskState state;
 
-  private String PeerId;
-  private String jobId;
-  private PathType type;
-  private BatchData batchData;
+  /**
+   * @param basicResponse response to hold
+   * @param state state to hold
+   */
+  public QueryTask(BasicResponse basicResponse,
+      TaskState state) {
+    this.basicResponse = basicResponse;
+    this.state = state;
+  }
 
-  @Override
-  public boolean hasNext() throws IOException {
-    return false;
+  /**
+   * @return the response currently held
+   */
+  public BasicResponse getBasicResponse() {
+    return basicResponse;
   }
 
-  @Override
-  public BatchData nextBatch() throws IOException {
-    return null;
+  /**
+   * @param basicResponse response to hold from now on
+   */
+  public void setBasicResponse(BasicResponse basicResponse) {
+    this.basicResponse = basicResponse;
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * @return the task state currently held
+   */
+  public TaskState getState() {
+    return state;
+  }
 
+  /**
+   * @param state task state to hold from now on
+   */
+  public void setState(TaskState state) {
+    this.state = state;
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
index d35746d..85a692d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 68cde76..44d0a58 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -140,5 +140,4 @@ public interface IQueryProcessExecutor {
 
   void setFetchSize(int fetchSize);
 
-  IEngineQueryRouter getQueryRouter();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 58702d8..9a4566e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -185,14 +185,14 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
       QueryContext context)
       throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
       PathErrorException, IOException {
-    return getQueryRouter().aggregate(paths, aggres, expression, context);
+    return queryRouter.aggregate(paths, aggres, expression, context);
   }
 
   @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
-    return getQueryRouter().fill(fillPaths, queryTime, fillTypes, context);
+    return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
   }
 
   @Override
@@ -200,7 +200,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
       long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
       throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
       PathErrorException, IOException {
-    return getQueryRouter().groupBy(paths, aggres, expression, unit, origin, intervals, context);
+    return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 54c750b..bef0328 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
 
   protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
-  private IEngineQueryRouter queryRouter = new EngineQueryRouter();
+  protected IEngineQueryRouter queryRouter = new EngineQueryRouter();
 
   @Override
   public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
@@ -69,7 +69,7 @@ public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
       return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
           fillQueryPlan.getFillType(), context);
     }
-    return getQueryRouter().query(queryExpression, context);
+    return queryRouter.query(queryExpression, context);
   }
 
   @Override
@@ -86,11 +86,6 @@ public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
   }
 
   @Override
-  public IEngineQueryRouter getQueryRouter() {
-    return queryRouter;
-  }
-
-  @Override
   public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
     try {
       boolean result = true;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index fac4765..aeb789e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -28,8 +29,9 @@ import org.apache.iotdb.tsfile.utils.Binary;
  * <code>BatchData</code> is a self-defined data structure which is optimized for different type of
  * values. This class can be viewed as a collection which is more efficient than ArrayList.
  */
-public class BatchData {
+public class BatchData implements Serializable {
 
+  private static final long serialVersionUID = -4620310601188394839L;
   private int timeCapacity = 1;
   private int valueCapacity = 1;
   private int emptyTimeCapacity = 1;