You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:02 UTC

[incubator-iotdb] 11/19: add query manager

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit fda20ccccd25d7bad0d42fbf1a03a2033eb1ed15
Author: lta <li...@163.com>
AuthorDate: Mon Apr 15 22:42:45 2019 +0800

    add query manager
---
 .../cluster/qp/executor/AbstractQPExecutor.java    |   2 +-
 .../cluster/qp/executor/QueryMetadataExecutor.java |   2 +-
 ...cutor.java => ClusterQueryProcessExecutor.java} | 102 +++++++++------
 .../manager/ClusterRpcQueryManager.java            |  47 +++----
 .../manager/ClusterSingleQueryManager.java         | 106 +++++++++++++++
 ...anager.java => IClusterSingleQueryManager.java} |  42 +++---
 .../cluster/service/TSServiceClusterImpl.java      |  50 ++++---
 .../org/apache/iotdb/db/qp/QueryProcessor.java     |  13 +-
 .../db/qp/executor/IQueryProcessExecutor.java      | 144 +++++++++++++++++++++
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |  11 +-
 ...cessExecutor.java => QueryProcessExecutor.java} | 103 ++-------------
 .../db/qp/logical/crud/BasicFunctionOperator.java  |   5 +-
 .../iotdb/db/qp/logical/crud/FilterOperator.java   |   7 +-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   4 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   8 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   7 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  20 ++-
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java |   7 +-
 .../EngineDataSetWithTimeGeneratorTest.java        |   4 +-
 19 files changed, 457 insertions(+), 227 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 654d13c..6463ed3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -69,7 +69,7 @@ public abstract class AbstractQPExecutor {
   /**
    * The task in progress.
    */
-  protected ThreadLocal<QPTask> currentTask;
+  protected ThreadLocal<QPTask> currentTask = new ThreadLocal<>();
 
   /**
    * Count limit to redo a single task
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index f1098f8..0702548 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -227,7 +227,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
 
   public TSDataType processSeriesTypeQuery(String path)
       throws InterruptedException, ProcessorException, PathErrorException {
-    TSDataType dataType = null;
+    TSDataType dataType;
     List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
     if (storageGroupList.size() != 1) {
       throw new PathErrorException("path " + path + " is not valid.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
similarity index 71%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
index e01e72d..7b51a4f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
@@ -26,8 +26,10 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -36,61 +38,31 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-public class QueryProcessorExecutor extends AbstractQueryProcessExecutor {
+public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
 
-  private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
-
-  public QueryProcessorExecutor() {
-    super(new ClusterQueryRouter());
-  }
+  private IEngineQueryRouter queryRouter = new ClusterQueryRouter();
 
-  @Override
-  public boolean judgePathExists(Path fullPath) {
-    return false;
-  }
+  private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
   @Override
   public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
-    return null;
+    return queryRouter.aggregate(paths, aggres, expression, context);
   }
 
   @Override
   public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
       long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
-    return null;
+    return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
   }
 
   @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
-    return null;
-  }
-
-  @Override
-  public boolean update(Path path, long startTime, long endTime, String value)
-      throws ProcessorException {
-    throw new ProcessorException("Cluster QueryProcessorExecutor doesn't support update method.");
-  }
-
-  @Override
-  protected boolean delete(Path path, long deleteTime) throws ProcessorException {
-    throw new ProcessorException("Cluster QueryProcessorExecutor doesn't support delete method.");
-  }
-
-  @Override
-  public int insert(Path path, long insertTime, String value) throws ProcessorException {
-    throw new ProcessorException("Cluster QueryProcessorExecutor doesn't support insert method.");
-  }
-
-  @Override
-  public int multiInsert(String deviceId, long insertTime, List<String> measurementList,
-      List<String> insertValues) throws ProcessorException {
-    throw new ProcessorException(
-        "Cluster QueryProcessorExecutor doesn't support multiInsert method.");
+    return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
   }
 
   @Override
@@ -117,4 +89,60 @@ public class QueryProcessorExecutor extends AbstractQueryProcessExecutor {
       throw new PathErrorException(e.getMessage());
     }
   }
+
+  @Override
+  public boolean judgePathExists(Path fullPath) {
+    try {
+      List<List<String>> results = queryMetadataExecutor.processTimeSeriesQuery(fullPath.toString());
+      return !results.isEmpty();
+    } catch (InterruptedException | PathErrorException | ProcessorException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public int getFetchSize() {
+    return fetchSize.get();
+  }
+
+  @Override
+  public void setFetchSize(int fetchSize) {
+    this.fetchSize.set(fetchSize);
+  }
+
+  public IEngineQueryRouter getQueryRouter() {
+    return queryRouter;
+  }
+
+  @Override
+  public boolean update(Path path, long startTime, long endTime, String value)
+      throws ProcessorException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean delete(Path path, long deleteTime) throws ProcessorException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int insert(Path path, long insertTime, String value) throws ProcessorException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int multiInsert(String deviceId, long insertTime, List<String> measurementList,
+      List<String> insertValues) throws ProcessorException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
+    return false;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
index 3df1ef6..3e9a68c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
@@ -18,37 +18,40 @@
  */
 package org.apache.iotdb.cluster.query.coordinatornode.manager;
 
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 
-public class ClusterRpcQueryManager implements IClusterQueryManager{
-
-  Map<Long, Map<String, PhysicalPlan>> selectPathPlan = new HashMap<>();
-  Map<Long, Map<String, PhysicalPlan>> filterPathPlan = new HashMap<>();
-
-  private ClusterRpcQueryManager(){
-  }
+/**
+ * Manages all queries in the cluster.
+ */
+public class ClusterRpcQueryManager{
 
-  @Override
-  public void registerQuery(Long jobId, PhysicalPlan plan) {
+  /**
+   * Key is job id, value is the manager of a single client query.
+   */
+  ConcurrentHashMap<Long, IClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
 
+  /**
+   * Add a query
+   */
+  public void addSingleQuery(long jobId, QueryPlan physicalPlan){
+    singleQueryManagerMap.put(jobId, new ClusterSingleQueryManager(jobId, physicalPlan));
   }
 
-  @Override
-  public PhysicalPlan getSelectPathPhysicalPlan(Long jobId, String fullPath) {
-    return selectPathPlan.get(jobId).get(fullPath);
+  /**
+   * Get query manager by job id
+   */
+  public IClusterSingleQueryManager getSingleQuery(long jobId) {
+    return singleQueryManagerMap.get(jobId);
   }
 
-  @Override
-  public PhysicalPlan getFilterPathPhysicalPlan(Long jobId, String fullPath) {
-    return filterPathPlan.get(jobId).get(fullPath);
+  public void releaseQueryResource(long jobId){
+    if(singleQueryManagerMap.containsKey(jobId)){
+     singleQueryManagerMap.remove(jobId).releaseQueryResource();
+    }
   }
 
-  @Override
-  public void remove(Long jobId) {
-    selectPathPlan.remove(jobId);
-    filterPathPlan.remove(jobId);
+  private ClusterRpcQueryManager(){
   }
 
   public static final ClusterRpcQueryManager getInstance() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
new file mode 100644
index 0000000..156b9a5
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.coordinatornode.manager;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+
+public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
+
+  /**
+   * Query job id assigned by QueryResourceManager.
+   */
+  private long jobId;
+
+  /**
+   * Origin query plan parsed by QueryProcessor
+   */
+  private QueryPlan originPhysicalPlan;
+
+  /**
+   * Represent selected reader nodes, key is group id and value is selected peer id
+   */
+  private Map<String, PeerId> readerNodes = new HashMap<>();
+
+  /**
+   * Physical plans of select paths which are divided from originPhysicalPlan
+   */
+  private Map<String, QueryPlan> selectPathPlans = new HashMap<>();
+
+  /**
+   * Physical plans of filter paths which are divided from originPhysicalPlan
+   */
+  private Map<String, QueryPlan> filterPathPlans = new HashMap<>();
+
+  public ClusterSingleQueryManager(long jobId,
+      QueryPlan originPhysicalPlan) {
+    this.jobId = jobId;
+    this.originPhysicalPlan = originPhysicalPlan;
+  }
+
+  @Override
+  public void dividePhysicalPlan() {
+//    List<Path>
+//    MManager.getInstance().getFileNameByPath()
+  }
+
+  @Override
+  public PhysicalPlan getSelectPathPhysicalPlan(String fullPath) {
+    return selectPathPlans.get(fullPath);
+  }
+
+  @Override
+  public PhysicalPlan getFilterPathPhysicalPlan(String fullPath) {
+    return filterPathPlans.get(fullPath);
+  }
+
+  @Override
+  public void setDataGroupReaderNode(String groupId, PeerId readerNode) {
+    readerNodes.put(groupId, readerNode);
+  }
+
+  @Override
+  public PeerId getDataGroupReaderNode(String groupId) {
+    return readerNodes.get(groupId);
+  }
+
+  @Override
+  public void releaseQueryResource() {
+
+  }
+
+  public long getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(long jobId) {
+    this.jobId = jobId;
+  }
+
+  public PhysicalPlan getOriginPhysicalPlan() {
+    return originPhysicalPlan;
+  }
+
+  public void setOriginPhysicalPlan(QueryPlan originPhysicalPlan) {
+    this.originPhysicalPlan = originPhysicalPlan;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterQueryManager.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
index a7a1418..c5fffbc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
@@ -18,38 +18,50 @@
  */
 package org.apache.iotdb.cluster.query.coordinatornode.manager;
 
+import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
-public interface IClusterQueryManager {
+/**
+ * Manage a single query.
+ */
+public interface IClusterSingleQueryManager {
 
   /**
-   * Register a query, divide physical plan into several sub physical plans according to timeseries
-   * full path.
-   *
-   * @param jobId Query job id assigned by QueryResourceManager.
-   * @param plan Physical plan parsed by QueryProcessor
+   * Divide physical plan into several sub physical plans according to timeseries full path.
    */
-  void registerQuery(Long jobId, PhysicalPlan plan);
+  void dividePhysicalPlan();
 
   /**
    * Get physical plan of select path
    *
-   * @param jobId Query job id assigned by QueryResourceManager.
-   * @param path Timeseries full path in select paths
+   * @param fullPath Timeseries full path in select paths
    */
-  PhysicalPlan getSelectPathPhysicalPlan(Long jobId, String path);
+  PhysicalPlan getSelectPathPhysicalPlan(String fullPath);
 
   /**
    * Get physical plan of filter path
    *
-   * @param jobId Query job id assigned by QueryResourceManager.
-   * @param path Timeseries full path in filter
+   * @param fullPath Timeseries full path in filter
    */
-  PhysicalPlan getFilterPathPhysicalPlan(Long jobId, String path);
+  PhysicalPlan getFilterPathPhysicalPlan(String fullPath);
 
+  /**
+   * Set reader node of a data group
+   *
+   * @param groupId data group id
+   * @param readerNode peer id
+   */
+  void setDataGroupReaderNode(String groupId, PeerId readerNode);
+
+  /**
+   * Get reader node of a data group by group id
+   * @param groupId data group id
+   * @return peer id of reader node
+   */
+  PeerId getDataGroupReaderNode(String groupId);
 
   /**
-   * Remove resource of a job id
+   * Release query resource
    */
-  void remove(Long jobId);
+  void releaseQueryResource();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index de5aef0..a11fae4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.query.coordinatornode.executor.QueryProcessorExecutor;
+import org.apache.iotdb.cluster.query.coordinatornode.executor.ClusterQueryProcessExecutor;
 import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -41,7 +41,9 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.Metadata;
 import org.apache.iotdb.db.qp.QueryProcessor;
+import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.service.TSServiceImpl;
@@ -65,7 +67,8 @@ public class TSServiceClusterImpl extends TSServiceImpl {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceClusterImpl.class);
 
-  private QueryProcessor processor = new QueryProcessor(new QueryProcessorExecutor());
+  private ClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance();
+  private QueryProcessor processor = new QueryProcessor(new ClusterQueryProcessExecutor());
   private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
   private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
@@ -268,17 +271,18 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   }
 
   /**
-   * //TODO
+   * It's unnecessary to do this check. It has been checked in transforming query physical plan.
    */
   @Override
   public void checkFileLevelSet(List<Path> paths) throws PathErrorException {
-    MManager.getInstance().checkFileLevel(paths);
   }
 
   @Override
   public void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
     long jobId = QueryResourceManager.getInstance().assignJobId();
+    queryStatus.get().put(statement, physicalPlan);
     queryJobIdMap.get().put(statement, jobId);
+    queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
     // refresh current queryRet for statement
     if (queryRet.get().containsKey(statement)) {
       queryRet.get().remove(statement);
@@ -295,12 +299,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       // end query for all the query tokens created by current thread
       for (QueryContext context : contextMap.values()) {
         QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
-        ClusterRpcQueryManager.getInstance().remove(context.getJobId());
+        queryManager.releaseQueryResource(context.getJobId());
       }
+      contextMapLocal.set(new HashMap<>());
     } else {
       long jobId = contextMap.remove(req.queryId).getJobId();
       QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
-      ClusterRpcQueryManager.getInstance().remove(jobId);
+      queryManager.releaseQueryResource(jobId);
     }
   }
 
@@ -313,6 +318,10 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     if (this.queryJobIdMap.get() != null) {
       this.queryJobIdMap.get().clear();
     }
+
+    if (this.queryStatus.get() != null) {
+      this.queryStatus.get().clear();
+    }
   }
 
 
@@ -320,21 +329,20 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   public QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
       throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
       ProcessorException, IOException {
-//    PhysicalPlan physicalPlan = queryJobIdMap.get().get(statement);
-//    processor.getExecutor().setFetchSize(fetchSize);
-//
-//    QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
-//    Map<Long, QueryContext> contextMap = contextMapLocal.get();
-//    if (contextMap == null) {
-//      contextMap = new HashMap<>();
-//      contextMapLocal.set(contextMap);
-//    }
-//    contextMap.put(req.queryId, context);
-//
-//    QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
-//        context);
-//    queryRet.get().put(statement, queryDataSet);
-//    return queryDataSet;
+    PhysicalPlan physicalPlan = queryStatus.get().get(statement);
+    processor.getExecutor().setFetchSize(fetchSize);
+    long jobId = queryJobIdMap.get().get(statement);
+
+    QueryContext context = new QueryContext(jobId);
+    initContextMap();
+    contextMapLocal.get().put(req.queryId, context);
+
+    queryManager.getSingleQuery(jobId).dividePhysicalPlan();
+
+    QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+        context);
+    queryRet.get().put(statement, queryDataSet);
+    return queryDataSet;
   }
   /**
    * Close cluster service
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index 62f4d80..f63514d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -27,7 +27,8 @@ import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.RootOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -49,13 +50,13 @@ import org.apache.iotdb.db.sql.parse.ParseUtils;
  */
 public class QueryProcessor {
 
-  private AbstractQueryProcessExecutor executor;
+  private IQueryProcessExecutor executor;
 
-  public QueryProcessor(AbstractQueryProcessExecutor executor) {
+  public QueryProcessor(IQueryProcessExecutor executor) {
     this.executor = executor;
   }
 
-  public AbstractQueryProcessExecutor getExecutor() {
+  public IQueryProcessExecutor getExecutor() {
     return executor;
   }
 
@@ -122,7 +123,7 @@ public class QueryProcessor {
    * @throws LogicalOptimizeException
    *             exception in logical optimizing
    */
-  private Operator logicalOptimize(Operator operator, AbstractQueryProcessExecutor executor)
+  private Operator logicalOptimize(Operator operator, IQueryProcessExecutor executor)
       throws LogicalOperatorException {
     switch (operator.getType()) {
       case AUTHOR:
@@ -156,7 +157,7 @@ public class QueryProcessor {
    * @throws LogicalOptimizeException
    *             exception in SFW optimizing
    */
-  private SFWOperator optimizeSFWOperator(SFWOperator root, AbstractQueryProcessExecutor executor)
+  private SFWOperator optimizeSFWOperator(SFWOperator root, IQueryProcessExecutor executor)
       throws LogicalOperatorException {
     ConcatPathOptimizer concatPathOptimizer = new ConcatPathOptimizer(executor);
     root = (SFWOperator) concatPathOptimizer.transform(root);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
new file mode 100644
index 0000000..68cde76
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface IQueryProcessExecutor {
+
+  boolean processNonQuery(PhysicalPlan plan) throws ProcessorException;
+
+  /**
+   * process query plan of qp layer, construct queryDataSet.
+   *
+   * @param queryPlan QueryPlan
+   * @return QueryDataSet
+   */
+  QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+      throws IOException, FileNodeManagerException, PathErrorException,
+      QueryFilterOptimizationException, ProcessorException;
+
+  /**
+   * process aggregate plan of qp layer, construct queryDataSet.
+   *
+   */
+  QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
+      QueryContext context)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+  /**
+   * process group by plan of qp layer, construct queryDataSet.
+   */
+  QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
+      long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+  /**
+   * process fill plan of qp layer, construct queryDataSet.
+   */
+  QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
+      QueryContext context)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
+
+  /**
+   * Execute the update command and return whether the operation is successful.
+   *
+   * @param path : update series seriesPath
+   * @param startTime start time in update command
+   * @param endTime end time in update command
+   * @param value - in type of string
+   * @return - whether the operator is successful.
+   */
+  boolean update(Path path, long startTime, long endTime, String value)
+      throws ProcessorException;
+
+  /**
+   * Execute the delete command and return whether the operation is successful.
+   *
+   * @param paths : delete series paths
+   * @param deleteTime end time in delete command
+   * @return - whether the operator is successful.
+   */
+  boolean delete(List<Path> paths, long deleteTime) throws ProcessorException;
+
+  /**
+   * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
+   *
+   * @param path : delete series seriesPath
+   * @param deleteTime end time in delete command
+   * @return - whether the operator is successful.
+   */
+  boolean delete(Path path, long deleteTime) throws ProcessorException;
+
+  /**
+   * insert a single value. Only used in test
+   *
+   * @param path seriesPath to be inserted
+   * @param insertTime - it's time point but not a range
+   * @param value value to be inserted
+   * @return - Operate Type.
+   */
+  int insert(Path path, long insertTime, String value) throws ProcessorException;
+
+  /**
+   * Execute the insert command and return whether the operation is successful.
+   *
+   * @param deviceId deviceId to be inserted
+   * @param insertTime - it's time point but not a range
+   * @param measurementList measurements to be inserted
+   * @param insertValues values to be inserted
+   * @return - Operate Type.
+   */
+  int multiInsert(String deviceId, long insertTime, List<String> measurementList,
+      List<String> insertValues) throws ProcessorException;
+
+  boolean judgePathExists(Path fullPath);
+
+  /**
+   * Get data type of series
+   */
+  TSDataType getSeriesType(Path path) throws PathErrorException;
+
+  /**
+   * Get all paths of a full path
+   */
+  List<String> getAllPaths(String originPath) throws PathErrorException;
+
+  int getFetchSize();
+
+  void setFetchSize(int fetchSize);
+
+  IEngineQueryRouter getQueryRouter();
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 505bbcc..58702d8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -70,7 +70,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OverflowQPExecutor extends AbstractQueryProcessExecutor {
+public class OverflowQPExecutor extends QueryProcessExecutor {
 
   private static final Logger LOG = LoggerFactory.getLogger(OverflowQPExecutor.class);
 
@@ -78,7 +78,6 @@ public class OverflowQPExecutor extends AbstractQueryProcessExecutor {
   private MManager mManager = MManager.getInstance();
 
   public OverflowQPExecutor() {
-    super(new EngineQueryRouter());
     fileNodeManager = FileNodeManager.getInstance();
   }
 
@@ -186,14 +185,14 @@ public class OverflowQPExecutor extends AbstractQueryProcessExecutor {
       QueryContext context)
       throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
       PathErrorException, IOException {
-    return queryRouter.aggregate(paths, aggres, expression, context);
+    return getQueryRouter().aggregate(paths, aggres, expression, context);
   }
 
   @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
       QueryContext context)
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
-    return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
+    return getQueryRouter().fill(fillPaths, queryTime, fillTypes, context);
   }
 
   @Override
@@ -201,7 +200,7 @@ public class OverflowQPExecutor extends AbstractQueryProcessExecutor {
       long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
       throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
       PathErrorException, IOException {
-    return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
+    return getQueryRouter().groupBy(paths, aggres, expression, unit, origin, intervals, context);
   }
 
   @Override
@@ -228,7 +227,7 @@ public class OverflowQPExecutor extends AbstractQueryProcessExecutor {
   }
 
   @Override
-  protected boolean delete(Path path, long timestamp) throws ProcessorException {
+  public boolean delete(Path path, long timestamp) throws ProcessorException {
     String deviceId = path.getDevice();
     String measurementId = path.getMeasurement();
     try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
similarity index 53%
rename from iotdb/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 2e0317e..54c750b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/AbstractQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -34,30 +33,19 @@ import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
-import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
 
-public abstract class AbstractQueryProcessExecutor {
+public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
 
   protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
-  protected IEngineQueryRouter queryRouter;
+  private IEngineQueryRouter queryRouter = new EngineQueryRouter();
 
-  public AbstractQueryProcessExecutor(IEngineQueryRouter queryRouter) {
-    this.queryRouter = queryRouter;
-  }
-
-  /**
-   * process query plan of qp layer, construct queryDataSet.
-   * @param queryPlan QueryPlan
-   * @return QueryDataSet
-   */
+  @Override
   public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
       throws IOException, FileNodeManagerException, PathErrorException,
       QueryFilterOptimizationException, ProcessorException {
@@ -81,17 +69,10 @@ public abstract class AbstractQueryProcessExecutor {
       return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
           fillQueryPlan.getFillType(), context);
     }
-    return queryRouter.query(queryExpression, context);
-  }
-
-  public abstract TSDataType getSeriesType(Path fullPath) throws PathErrorException;
-
-  public abstract boolean judgePathExists(Path fullPath);
-
-  public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
-    throw new UnsupportedOperationException();
+    return getQueryRouter().query(queryExpression, context);
   }
 
+  @Override
   public int getFetchSize() {
     if (fetchSize.get() == null) {
       return 100;
@@ -99,42 +80,17 @@ public abstract class AbstractQueryProcessExecutor {
     return fetchSize.get();
   }
 
+  @Override
   public void setFetchSize(int fetchSize) {
     this.fetchSize.set(fetchSize);
   }
 
-  public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres,
-      IExpression expression, QueryContext context) throws ProcessorException, IOException,
-      PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
-
-  public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres,
-      IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
-      QueryContext context) throws ProcessorException, IOException, PathErrorException,
-      FileNodeManagerException, QueryFilterOptimizationException;
-
-  public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType,
-      IFill> fillTypes, QueryContext context)
-      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
-
-  /**
-   * executeWithGlobalTimeFilter update command and return whether the operator is successful.
-   *
-   * @param path : update series seriesPath
-   * @param startTime start time in update command
-   * @param endTime end time in update command
-   * @param value - in type of string
-   * @return - whether the operator is successful.
-   */
-  public abstract boolean update(Path path, long startTime, long endTime, String value)
-      throws ProcessorException;
+  @Override
+  public IEngineQueryRouter getQueryRouter() {
+    return queryRouter;
+  }
 
-  /**
-   * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
-   *
-   * @param paths : delete series paths
-   * @param deleteTime end time in delete command
-   * @return - whether the operator is successful.
-   */
+  @Override
   public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
     try {
       boolean result = true;
@@ -162,39 +118,4 @@ public abstract class AbstractQueryProcessExecutor {
       throw new ProcessorException(e.getMessage());
     }
   }
-
-  /**
-   * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
-   *
-   * @param path : delete series seriesPath
-   * @param deleteTime end time in delete command
-   * @return - whether the operator is successful.
-   */
-  protected abstract boolean delete(Path path, long deleteTime) throws ProcessorException;
-
-  /**
-   * insert a single value. Only used in test
-   *
-   * @param path seriesPath to be inserted
-   * @param insertTime - it's time point but not a range
-   * @param value value to be inserted
-   * @return - Operate Type.
-   */
-  public abstract int insert(Path path, long insertTime, String value) throws ProcessorException;
-
-  /**
-   * executeWithGlobalTimeFilter insert command and return whether the operator is successful.
-   *
-   * @param deviceId deviceId to be inserted
-   * @param insertTime - it's time point but not a range
-   * @param measurementList measurements to be inserted
-   * @param insertValues values to be inserted
-   * @return - Operate Type.
-   */
-  public abstract int multiInsert(String deviceId, long insertTime, List<String> measurementList,
-      List<String> insertValues) throws ProcessorException;
-
-  public abstract List<String> getAllPaths(String originPath)
-      throws PathErrorException, ProcessorException, InterruptedException;
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
index a909db6..1610554 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.db.qp.logical.crud;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -97,7 +98,7 @@ public class BasicFunctionOperator extends FunctionOperator {
 
   @Override
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
-      AbstractQueryProcessExecutor executor)
+      IQueryProcessExecutor executor)
       throws LogicalOperatorException, PathErrorException {
     TSDataType type = executor.getSeriesType(path);
     if (type == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
index 3956c57..1f38654 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
@@ -26,7 +26,8 @@ import java.util.List;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -111,7 +112,7 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
    *
    * @return QueryFilter in TsFile
    */
-  public IExpression transformToExpression(AbstractQueryProcessExecutor executor)
+  public IExpression transformToExpression(IQueryProcessExecutor executor)
       throws QueryProcessorException {
     if (isSingle) {
       Pair<IUnaryExpression, String> ret = transformToSingleQueryFilter(executor);
@@ -149,7 +150,7 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
    * @throws QueryProcessorException exception in filter transforming
    */
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
-      AbstractQueryProcessExecutor executor)
+      IQueryProcessExecutor executor)
       throws QueryProcessorException {
     if (childOperators.isEmpty()) {
       throw new LogicalOperatorException(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 5a747fa..1a4377c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.qp.physical.crud;
 
 import java.util.List;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -43,7 +43,7 @@ public class QueryPlan extends PhysicalPlan {
   /**
    * Check if all paths exist.
    */
-  public void checkPaths(AbstractQueryProcessExecutor executor) throws QueryProcessorException {
+  public void checkPaths(IQueryProcessExecutor executor) throws QueryProcessorException {
     for (Path path : paths) {
       if (!executor.judgePathExists(path)) {
         throw new QueryProcessorException("Path doesn't exist: " + path);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index c0cb8bb..ff13a2c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.DeleteOperator;
@@ -57,9 +57,9 @@ import org.slf4j.LoggerFactory;
 public class PhysicalGenerator {
 
   private static final Logger logger = LoggerFactory.getLogger(PhysicalGenerator.class);
-  private AbstractQueryProcessExecutor executor;
+  private IQueryProcessExecutor executor;
 
-  public PhysicalGenerator(AbstractQueryProcessExecutor executor) {
+  public PhysicalGenerator(IQueryProcessExecutor executor) {
     this.executor = executor;
   }
 
@@ -278,7 +278,7 @@ public class PhysicalGenerator {
   }
 
   // private SingleQueryPlan constructSelectPlan(FilterOperator filterOperator, List<Path> paths,
-  // AbstractQueryProcessExecutor conf) throws QueryProcessorException {
+  // QueryProcessExecutor conf) throws QueryProcessorException {
   // FilterOperator timeFilter = null;
   // FilterOperator freqFilter = null;
   // FilterOperator valueFilter = null;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index cdc1ca3..358d20f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -26,7 +26,8 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -46,9 +47,9 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
   private static final Logger LOG = LoggerFactory.getLogger(ConcatPathOptimizer.class);
   private static final String WARNING_NO_SUFFIX_PATHS = "given SFWOperator doesn't have suffix paths, cannot concat seriesPath";
 
-  private AbstractQueryProcessExecutor executor;
+  private IQueryProcessExecutor executor;
 
-  public ConcatPathOptimizer(AbstractQueryProcessExecutor executor) {
+  public ConcatPathOptimizer(IQueryProcessExecutor executor) {
     this.executor = executor;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index bbe67a0..186c9d6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -103,7 +103,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // Record the username for every rpc connection. Username.get() is null if
   // login is failed.
   protected ThreadLocal<String> username = new ThreadLocal<>();
-  private ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
+  protected ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
   protected ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
   protected ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -208,6 +208,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       for (QueryContext context : contextMap.values()) {
         QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
       }
+      contextMapLocal.set(new HashMap<>());
     } else {
       QueryResourceManager.getInstance()
           .endQueryForGivenJob(contextMap.remove(req.queryId).getJobId());
@@ -698,12 +699,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     processor.getExecutor().setFetchSize(fetchSize);
 
     QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
-    Map<Long, QueryContext> contextMap = contextMapLocal.get();
-    if (contextMap == null) {
-      contextMap = new HashMap<>();
-      contextMapLocal.set(contextMap);
-    }
-    contextMap.put(req.queryId, context);
+
+    initContextMap();
+    contextMapLocal.get().put(req.queryId, context);
 
     QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
         context);
@@ -711,6 +709,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return queryDataSet;
   }
 
+  public void initContextMap(){ // make sure contextMapLocal holds a map before callers put into it
+    Map<Long, QueryContext> contextMap = contextMapLocal.get();
+    if (contextMap == null) { // nothing stored yet: create an empty map and store it
+      contextMap = new HashMap<>();
+      contextMapLocal.set(contextMap);
+    }
+  }
+
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
       throws TException {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index a8ace20..fa7301d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Implement a simple executor with a memory demo reading processor for test.
  */
-public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
+public class MemIntQpExecutor extends QueryProcessExecutor {
 
   private static Logger LOG = LoggerFactory.getLogger(MemIntQpExecutor.class);
 
@@ -62,7 +62,6 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
   private Map<String, List<String>> fakeAllPaths;
 
   public MemIntQpExecutor() {
-    super(new EngineQueryRouter());
     this.fetchSize.set(5);
   }
 
@@ -163,7 +162,7 @@ public class MemIntQpExecutor extends AbstractQueryProcessExecutor {
   }
 
   @Override
-  protected boolean delete(Path path, long deleteTime) {
+  public boolean delete(Path path, long deleteTime) {
     if (!demoMemDataBase.containsKey(path.toString())) {
       return true;
     }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
index 04b31e0..b53e4d8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.iotdb.db.qp.QueryProcessor;
-import org.apache.iotdb.db.qp.executor.AbstractQueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -35,7 +35,7 @@ import org.junit.Test;
 
 public class EngineDataSetWithTimeGeneratorTest {
 
-  private AbstractQueryProcessExecutor queryExecutor = new OverflowQPExecutor();
+  private QueryProcessExecutor queryExecutor = new OverflowQPExecutor();
   private QueryProcessor processor = new QueryProcessor(queryExecutor);
   private String[] sqls = {
       "SET STORAGE GROUP TO root.vehicle",