You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:07 UTC

[incubator-iotdb] 16/19: add query series data processor

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 446cec98fd748105ba4c73728e655d62f2b307b6
Author: lta <li...@163.com>
AuthorDate: Tue Apr 16 19:02:17 2019 +0800

    add query series data processor
---
 .../iotdb/cluster/config/ClusterConstant.java      |   2 +
 .../org/apache/iotdb/cluster/entity/Server.java    |  29 ++++-
 .../cluster/qp/executor/NonQueryExecutor.java      |   5 +-
 .../executor/ClusterExecutorWithTimeGenerator.java |   6 +-
 .../ClusterExecutorWithoutTimeGenerator.java       |   6 +-
 .../executor/ClusterQueryRouter.java               |   6 +-
 .../manager/ClusterRpcQueryManager.java            |   8 +-
 ...ager.java => ClusterRpcSingleQueryManager.java} |   4 +-
 ...ger.java => IClusterRpcSingleQueryManager.java} |   4 +-
 .../manager/ClusterLocalQueryManager.java          |  74 +++++++++++
 .../manager/ClusterLocalSingleQueryManager.java    | 142 +++++++++++++++++++++
 .../querydata/QuerySeriesDataSyncProcessor.java    |  62 +++++++++
 .../request/querydata/QuerySeriesDataRequest.java  |  36 +++++-
 .../cluster/rpc/raft/request/querydata/Stage.java  |   2 +-
 .../querydata/QuerySeriesDataResponse.java         |  25 ++--
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  41 +++++-
 .../cluster/utils/query/ClusterRpcReaderUtils.java |  14 +-
 .../dataset/EngineDataSetWithoutTimeGenerator.java |   4 +
 18 files changed, 422 insertions(+), 48 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 5aca9b0..b78ce56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -37,4 +37,6 @@ public class ClusterConstant {
    */
   public static final int CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT = 1000;
 
+  public static final int BATCH_READ_SIZE = 1000;
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 1f0e4e3..acd57f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querydata.QuerySeriesDataSyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataInStringAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
@@ -95,13 +96,9 @@ public class Server {
     RpcServer rpcServer = new RpcServer(serverId.getPort());
     RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
 
-    rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor());
-    rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
-    rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
-    rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
-    rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
-    rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
-    rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
+    registerNonQueryProcessor(rpcServer);
+    registerQueryMetadataProcessor(rpcServer);
+    registerQueryDataProcessor(rpcServer);
 
     metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
     metadataHolder.init();
@@ -127,6 +124,24 @@ public class Server {
 
   }
 
+  private void registerNonQueryProcessor(RpcServer rpcServer) {
+    rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor());
+    rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
+  }
+
+  private void registerQueryMetadataProcessor(RpcServer rpcServer) {
+    rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
+    rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
+    rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+  }
+
+  private void registerQueryDataProcessor(RpcServer rpcServer) {
+    rpcServer.registerUserProcessor(new QuerySeriesDataSyncProcessor());
+  }
+
   public void stop() throws ProcessorException {
     QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
     iotdb.deactivate();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index b6247c8..1420370 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -100,10 +100,13 @@ public class NonQueryExecutor extends AbstractQPExecutor {
    * @param batchResult batch result
    */
   public void processBatch(PhysicalPlan[] physicalPlans, BatchResult batchResult)
-      throws InterruptedException {
+      throws InterruptedException, ProcessorException {
 
     Status nullReadTaskStatus = Status.OK();
     RaftUtils.handleNullReadToMetaGroup(nullReadTaskStatus);
+    if(!nullReadTaskStatus.isOk()){
+      throw new ProcessorException("Null read while processing batch failed");
+    }
     nullReaderEnable = false;
 
     /** 1. Classify physical plans by group id **/
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
index 68a14c8..6c0354a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterRpcSingleQueryManager;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -39,11 +39,11 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 public class ClusterExecutorWithTimeGenerator {
 
   private QueryExpression queryExpression;
-  private IClusterSingleQueryManager queryManager;
+  private IClusterRpcSingleQueryManager queryManager;
   private int readDataConsistencyLevel;
 
   ClusterExecutorWithTimeGenerator(QueryExpression queryExpression,
-      IClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
+      IClusterRpcSingleQueryManager queryManager, int readDataConsistencyLevel) {
     this.queryExpression = queryExpression;
     this.queryManager = queryManager;
     this.readDataConsistencyLevel = readDataConsistencyLevel;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
index 65b7d8f..e52e604 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -45,11 +45,11 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 public class ClusterExecutorWithoutTimeGenerator {
   private QueryExpression queryExpression;
-  private ClusterSingleQueryManager queryManager;
+  private ClusterRpcSingleQueryManager queryManager;
   private int readDataConsistencyLevel;
 
   public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression,
-      ClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
+      ClusterRpcSingleQueryManager queryManager, int readDataConsistencyLevel) {
     this.queryExpression = queryExpression;
     this.queryManager = queryManager;
     this.readDataConsistencyLevel = readDataConsistencyLevel;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index da5a395..6f07e58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -23,8 +23,8 @@ import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager.QueryType;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -49,7 +49,7 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
   public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
       throws FileNodeManagerException, PathErrorException {
 
-    ClusterSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+    ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
         .getSingleQuery(context.getJobId());
     try {
       if (queryExpression.hasQueryFilter()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
index f33fe05..bdebd1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
@@ -27,21 +27,21 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 public class ClusterRpcQueryManager{
 
   /**
-   * Key is group id, value is manager of a client query.
+   * Key is job id, value is manager of a client query.
    */
-  ConcurrentHashMap<Long, ClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
+  ConcurrentHashMap<Long, ClusterRpcSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
 
   /**
    * Add a query
    */
   public void addSingleQuery(long jobId, QueryPlan physicalPlan){
-    singleQueryManagerMap.put(jobId, new ClusterSingleQueryManager(jobId, physicalPlan));
+    singleQueryManagerMap.put(jobId, new ClusterRpcSingleQueryManager(jobId, physicalPlan));
   }
 
   /**
    * Get query manager by group id
    */
-  public ClusterSingleQueryManager getSingleQuery(long jobId) {
+  public ClusterRpcSingleQueryManager getSingleQuery(long jobId) {
     return singleQueryManagerMap.get(jobId);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcSingleQueryManager.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcSingleQueryManager.java
index d350a7c..ae982a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcSingleQueryManager.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
 
-public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
+public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager {
 
   /**
    * Query job id assigned by QueryResourceManager of coordinator node.
@@ -68,7 +68,7 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
 
   private Map<String, ClusterSeriesReader> filterPathReaders = new HashMap<>();
 
-  public ClusterSingleQueryManager(long jobId,
+  public ClusterRpcSingleQueryManager(long jobId,
       QueryPlan queryPlan) {
     this.jobId = jobId;
     this.queryPlan = queryPlan;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterRpcSingleQueryManager.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterRpcSingleQueryManager.java
index bc70e65..01cdb8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterRpcSingleQueryManager.java
@@ -21,14 +21,14 @@ package org.apache.iotdb.cluster.query.coordinatornode.manager;
 import com.alipay.sofa.jraft.entity.PeerId;
 import java.io.IOException;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager.QueryType;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 /**
  * Manage a single query.
  */
-public interface IClusterSingleQueryManager {
+public interface IClusterRpcSingleQueryManager {
 
   /**
    * Divide physical plan into several sub physical plans according to timeseries full path.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalQueryManager.java
new file mode 100644
index 0000000..f23e70e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalQueryManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.querynode.manager;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+
+public class ClusterLocalQueryManager {
+
+  /**
+   * Key is job id, value is manager of a client query.
+   */
+  ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
+
+  private ClusterLocalQueryManager() {
+  }
+
+  public void createQueryDataSet(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+      throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException {
+    long jobId = QueryResourceManager.getInstance().assignJobId();
+    response.setJobId(jobId);
+    ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
+    localQueryManager.init(request, response);
+    singleQueryManagerMap.put(jobId, localQueryManager);
+  }
+
+  public void readBatchData(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+      throws IOException {
+    long jobId = request.getJobId();
+    singleQueryManagerMap.get(jobId).readBatchData(request, response);
+  }
+
+  public void close(long jobId) throws FileNodeManagerException {
+    if(singleQueryManagerMap.containsKey(jobId)){
+      singleQueryManagerMap.remove(jobId).close();
+    }
+  }
+
+  public static final ClusterLocalQueryManager getInstance() {
+    return ClusterLocalQueryManager.ClusterLocalQueryManagerHolder.INSTANCE;
+  }
+
+  private static class ClusterLocalQueryManagerHolder {
+
+    private static final ClusterLocalQueryManager INSTANCE = new ClusterLocalQueryManager();
+
+    private ClusterLocalQueryManagerHolder() {
+
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalSingleQueryManager.java
new file mode 100644
index 0000000..cf9c9f5
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalSingleQueryManager.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.querynode.manager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+
+public class ClusterLocalSingleQueryManager {
+
+  private long jobId;
+
+  /**
+   * Key is series full path, value is reader of series
+   */
+  private Map<String, IPointReader> seriesReaders = new HashMap<>();
+
+  /**
+   * Key is series full path, value is data type of series
+   */
+  private Map<String, TSDataType> dataTypeMap = new HashMap<>();
+
+  private QueryProcessExecutor queryProcessExecutor = new OverflowQPExecutor();
+
+  public ClusterLocalSingleQueryManager(long jobId) {
+    this.jobId = jobId;
+  }
+
+  /**
+   * Init create series reader.
+   */
+  public void init(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+      throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException {
+    List<byte[]> planBytes = request.getPhysicalPlanBytes();
+    for (byte[] planByte : planBytes) {
+      QueryPlan plan = (QueryPlan) PhysicalPlanLogTransfer.logToOperator(planByte);
+      if (plan instanceof GroupByPlan) {
+        throw new UnsupportedOperationException();
+      } else if (plan instanceof AggregationPlan) {
+        throw new UnsupportedOperationException();
+      } else {
+        QueryContext context = new QueryContext(jobId);
+        if (plan.getExpression() == null
+            || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+          handleDataSetWithoutTimeGenerator(plan, context, request, response);
+        } else {
+          throw new UnsupportedOperationException();
+        }
+
+      }
+    }
+  }
+
+  private void handleDataSetWithoutTimeGenerator(QueryPlan plan, QueryContext context,
+      QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+      throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException {
+    EngineDataSetWithoutTimeGenerator queryDataSet = (EngineDataSetWithoutTimeGenerator) queryProcessExecutor
+        .processQuery(plan, context);
+    List<Path> paths = plan.getPaths();
+    List<IPointReader> readers = queryDataSet.getReaders();
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    for (int i = 0; i < paths.size(); i++) {
+      String fullPath = paths.get(i).getFullPath();
+      IPointReader reader = readers.get(i);
+      seriesReaders.put(fullPath, reader);
+      dataTypeMap.put(fullPath, dataTypes.get(i));
+    }
+    response.setSeriesType(dataTypes);
+    readBatchData(request, response);
+  }
+
+  /**
+   * Read batch data
+   */
+  public void readBatchData(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+      throws IOException {
+    List<String> paths = request.getPaths();
+    List<BatchData> batchDataList = new ArrayList<>();
+    for (String fullPath : paths) {
+      BatchData batchData = new BatchData(dataTypeMap.get(fullPath));
+      IPointReader reader = seriesReaders.get(fullPath);
+      for (int i = 0; i < ClusterConstant.BATCH_READ_SIZE; i++) {
+        if (reader.hasNext()) {
+          TimeValuePair pair = reader.next();
+          batchData.putTime(pair.getTimestamp());
+          batchData.putAnObject(pair.getValue().getValue());
+        } else {
+          break;
+        }
+      }
+      batchDataList.add(batchData);
+    }
+    response.setSeriesBatchData(batchDataList);
+  }
+
+  /**
+   * Release query resource
+   */
+  public void close() throws FileNodeManagerException {
+    QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/QuerySeriesDataSyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/QuerySeriesDataSyncProcessor.java
new file mode 100644
index 0000000..d2064ba
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/QuerySeriesDataSyncProcessor.java
@@ -0,0 +1,62 @@
+package org.apache.iotdb.cluster.rpc.raft.processor.querydata;
+
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.query.querynode.manager.ClusterLocalQueryManager;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicSyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.Stage;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.ProcessorException;
+
+public class QuerySeriesDataSyncProcessor extends
+    BasicSyncUserProcessor<QuerySeriesDataRequest> {
+
+  @Override
+  public Object handleRequest(BizContext bizContext, QuerySeriesDataRequest request)
+      throws Exception {
+    Stage stage = request.getStage();
+    String groupId = request.getGroupID();
+    PathType pathType = request.getPathType();
+    QuerySeriesDataResponse response = new QuerySeriesDataResponse(groupId, pathType);
+    switch (stage) {
+      case INITIAL:
+        handleNullRead(request.getReadConsistencyLevel(), groupId);
+        ClusterLocalQueryManager.getInstance().createQueryDataSet(request, response);
+        break;
+      case READ_DATA:
+        ClusterLocalQueryManager.getInstance().readBatchData(request, response);
+        break;
+      case CLOSE:
+        ClusterLocalQueryManager.getInstance().close(request.getJobId());
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+    return response;
+  }
+
+  /**
+   * It's necessary to do null read while creating query data set with a strong consistency level.
+   *
+   * @param readConsistencyLevel read concistency level
+   * @param groupId group id
+   */
+  private void handleNullRead(int readConsistencyLevel, String groupId) throws ProcessorException {
+    if (readConsistencyLevel == ClusterConstant.STRONG_CONSISTENCY_LEVEL) {
+      Status nullReadTaskStatus = Status.OK();
+      RaftUtils.handleNullReadToDataGroup(nullReadTaskStatus, groupId);
+      if (!nullReadTaskStatus.isOk()) {
+        throw new ProcessorException("Null read to data group failed");
+      }
+    }
+  }
+
+  @Override
+  public String interest() {
+    return QuerySeriesDataSyncProcessor.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
index fd27997..4974c46 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 public class QuerySeriesDataRequest extends BasicQueryRequest {
 
   private Stage stage;
+  private long jobId;
   private PathType pathType;
   private List<String> paths;
 
@@ -39,11 +40,44 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
     this.pathType = pathType;
   }
 
-  public QuerySeriesDataRequest(String groupID, List<String> paths, PathType pathType)
+  public QuerySeriesDataRequest(String groupID, long jobId, List<String> paths, PathType pathType)
       throws IOException {
     super(groupID);
     this.paths = paths;
     stage = Stage.READ_DATA;
+    this.jobId = jobId;
     this.pathType = pathType;
   }
+
+  public Stage getStage() {
+    return stage;
+  }
+
+  public void setStage(Stage stage) {
+    this.stage = stage;
+  }
+
+  public PathType getPathType() {
+    return pathType;
+  }
+
+  public void setPathType(PathType pathType) {
+    this.pathType = pathType;
+  }
+
+  public List<String> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<String> paths) {
+    this.paths = paths;
+  }
+
+  public long getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(long jobId) {
+    this.jobId = jobId;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
index 53d2e2a..0687927 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
@@ -19,5 +19,5 @@
 package org.apache.iotdb.cluster.rpc.raft.request.querydata;
 
 public enum Stage {
-  INITIAL, READ_DATA
+  INITIAL, READ_DATA, CLOSE
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
index 9f62617..b77a3f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.cluster.rpc.raft.response.querydata;
 
-import java.util.Map;
+import java.util.List;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -28,27 +28,30 @@ public class QuerySeriesDataResponse extends BasicResponse {
 
   private long jobId;
   private PathType pathType;
-  private Map<String, TSDataType> seriesType;
-  private Map<String, BatchData> seriesBatchData;
+  private List<TSDataType> seriesType;
+  private List<BatchData> seriesBatchData;
 
   public QuerySeriesDataResponse(String groupId, PathType pathType) {
     super(groupId, false, null, null);
     this.pathType = pathType;
   }
 
-  public QuerySeriesDataResponse setJobId(long jobId) {
+  public void setJobId(long jobId) {
     this.jobId = jobId;
-    return this;
   }
 
-  public QuerySeriesDataResponse setSeriesType(Map<String, TSDataType> seriesType) {
+  public void setPathType(PathType pathType) {
+    this.pathType = pathType;
+  }
+
+  public void setSeriesType(
+      List<TSDataType> seriesType) {
     this.seriesType = seriesType;
-    return this;
   }
 
-  public QuerySeriesDataResponse seySeriesBatchData(Map<String, BatchData> seriesBatchData) {
+  public void setSeriesBatchData(
+      List<BatchData> seriesBatchData) {
     this.seriesBatchData = seriesBatchData;
-    return this;
   }
 
   public long getJobId() {
@@ -59,11 +62,11 @@ public class QuerySeriesDataResponse extends BasicResponse {
     return pathType;
   }
 
-  public Map<String, TSDataType> getSeriesType() {
+  public List<TSDataType> getSeriesType() {
     return seriesType;
   }
 
-  public Map<String, BatchData> getSeriesBatchData() {
+  public List<BatchData> getSeriesBatchData() {
     return seriesBatchData;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index d2d5429..f007027 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
@@ -273,18 +274,16 @@ public class RaftUtils {
 
   /**
    * Handle null-read process in metadata group if the request is to set path.
-   *
-   * @param status status to return result if this node is leader of the data group
    */
   public static void handleNullReadToMetaGroup(Status status) {
     SingleQPTask nullReadTask = new SingleQPTask(false, null);
     handleNullReadToMetaGroup(status, server, nullReadTask);
   }
 
-  public static void handleNullReadToMetaGroup(Status status, Server server,
+  private static void handleNullReadToMetaGroup(Status status, Server server,
       SingleQPTask nullReadTask) {
     try {
-      LOGGER.debug("Handle null-read in meta group for adding path request.");
+      LOGGER.debug("Handle null-read in meta group for metadata request.");
       final byte[] reqContext = RaftUtils.createRaftRequestContext();
       MetadataRaftHolder metadataRaftHolder = (MetadataRaftHolder) server.getMetadataHolder();
       ((RaftService) metadataRaftHolder.getService()).getNode()
@@ -307,6 +306,40 @@ public class RaftUtils {
     }
   }
 
+  /**
+   * Handle null-read process in data group while reading process
+   */
+  public static void handleNullReadToDataGroup(Status status, String groupId) {
+    SingleQPTask nullReadTask = new SingleQPTask(false, null);
+    handleNullReadToDataGroup(status, server, nullReadTask, groupId);
+  }
+
+  private static void handleNullReadToDataGroup(Status status, Server server,
+      SingleQPTask nullReadTask, String groupId) {
+    try {
+      LOGGER.debug("Handle null-read in data group for reading.");
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionRaftHolder = (DataPartitionRaftHolder) server.getDataPartitionHolder(groupId);
+      ((RaftService) dataPartitionRaftHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              BasicResponse response = DataGroupNonQueryResponse
+                  .createEmptyResponse(groupId);
+              if (!status.isOk()) {
+                status.setCode(-1);
+                status.setErrorMsg(status.getErrorMsg());
+              }
+              nullReadTask.run(response);
+            }
+          });
+      nullReadTask.await();
+    } catch (InterruptedException e) {
+      status.setCode(-1);
+      status.setErrorMsg(e.getMessage());
+    }
+  }
+
   public static Status createErrorStatus(String errorMsg){
     Status status = new Status();
     status.setErrorMsg(errorMsg);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
index 35ff3fb..510112c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.reader.IBatchReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
 
 public class ClusterRpcReaderUtils {
 
@@ -65,14 +66,15 @@ public class ClusterRpcReaderUtils {
 
     /** create cluster series reader **/
     Map<String, ClusterSeriesReader> allSeriesReader = new HashMap<>();
-    Map<String, TSDataType> seriesType = response.getSeriesType();
-    Map<String, BatchData> seriesBatchData = response.getSeriesBatchData();
+    List<Path> paths = queryPlan.getPaths();
+    List<TSDataType> seriesType = response.getSeriesType();
+    List<BatchData> seriesBatchData = response.getSeriesBatchData();
     long jobId = response.getJobId();
-    for (Entry<String, TSDataType> entry : seriesType.entrySet()) {
-      String seriesPath = entry.getKey();
-      TSDataType dataType = entry.getValue();
+    for (int i =0 ; i < paths.size(); i++) {
+      String seriesPath = paths.get(i).getFullPath();
+      TSDataType dataType = seriesType.get(i);
       IBatchReader batchDataReader = new ClusterRpcBatchDataReader(peerId, jobId,
-          PathType.SELECT_PATH, seriesBatchData.get(seriesPath));
+          pathType, seriesBatchData.get(i));
       ClusterSeriesReader seriesReader = new ClusterSeriesReader(batchDataReader, seriesPath,
           dataType);
       allSeriesReader.put(seriesPath, seriesReader);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
index 73fc71f..bc9bb08 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
@@ -154,4 +154,8 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet {
     timeSet.remove(t);
     return t;
   }
+
+  public List<IPointReader> getReaders() {
+    return readers;
+  }
 }