You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/15 08:37:37 UTC

[incubator-iotdb] branch master updated: Refactor iotdb code for cluster (#172)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e7886ca  Refactor iotdb code for cluster (#172)
e7886ca is described below

commit e7886caba49d303b69706b42d1c549ee27f52687
Author: Tianan Li <li...@163.com>
AuthorDate: Wed May 15 16:37:32 2019 +0800

    Refactor iotdb code for cluster (#172)
    
    * refactor iotdb code for cluster
    
    * delete useless code
    
    * fix issues according to pr reviews
    
    * fix issues according to pr reviews
    
    * delete useless code in MGraph
    
    * delete useless map in Metadata
    
    * delete useless map in Metadata
    
    * update
    
    * update
    
    * fix some issues
    
    * update combineMetadatas() in Metadata
    
    * update combineMetadatas() in Metadata
    
    * remove closeClusterService
---
 .../db/concurrent/IoTDBThreadPoolFactory.java      |   8 +-
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  24 ++-
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   6 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   7 +-
 .../apache/iotdb/db/engine/pool/FlushManager.java  |   6 +-
 .../apache/iotdb/db/engine/pool/MergeManager.java  |   8 +-
 .../java/org/apache/iotdb/db/metadata/MGraph.java  |  12 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  13 --
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  32 ----
 .../org/apache/iotdb/db/metadata/Metadata.java     | 106 ++++++++---
 .../org/apache/iotdb/db/qp/QueryProcessor.java     |  11 +-
 .../db/qp/executor/IQueryProcessExecutor.java      | 147 +++++++++++++++
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |   3 +-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |  97 +---------
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 +-
 .../db/qp/logical/crud/BasicFunctionOperator.java  |   3 +-
 .../iotdb/db/qp/logical/crud/FilterOperator.java   |   5 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   4 +-
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |   1 +
 .../iotdb/db/qp/physical/crud/DeletePlan.java      |   1 +
 .../iotdb/db/qp/physical/crud/FillQueryPlan.java   |   1 +
 .../iotdb/db/qp/physical/crud/GroupByPlan.java     |   1 +
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |   1 +
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   5 +-
 .../iotdb/db/qp/physical/crud/UpdatePlan.java      |   1 +
 .../iotdb/db/qp/physical/sys/AuthorPlan.java       |   1 +
 .../iotdb/db/qp/physical/sys/LoadDataPlan.java     |   1 +
 .../iotdb/db/qp/physical/sys/MetadataPlan.java     |   1 +
 .../iotdb/db/qp/physical/sys/PropertyPlan.java     |   1 +
 .../{writelog => qp/physical}/transfer/Codec.java  |   3 +-
 .../physical}/transfer/CodecInstances.java         |   5 +-
 .../physical}/transfer/PhysicalPlanCodec.java      |  14 +-
 .../transfer/PhysicalPlanLogTransfer.java          |   2 +-
 .../physical}/transfer/SystemLogOperator.java      |   2 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   6 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   5 +-
 .../db/query/aggregation/AggreResultData.java      |   2 +-
 .../db/query/aggregation/impl/CountAggrFunc.java   |   2 +-
 .../db/query/aggregation/impl/FirstAggrFunc.java   |   2 +-
 .../db/query/aggregation/impl/LastAggrFunc.java    |   2 +-
 .../db/query/aggregation/impl/MaxTimeAggrFunc.java |   2 +-
 .../query/aggregation/impl/MaxValueAggrFunc.java   |   2 +-
 .../db/query/aggregation/impl/MeanAggrFunc.java    |   2 +-
 .../db/query/aggregation/impl/MinTimeAggrFunc.java |   2 +-
 .../query/aggregation/impl/MinValueAggrFunc.java   |   2 +-
 .../db/query/control/QueryResourceManager.java     |  27 ++-
 .../query/dataset/AggreResultDataPointReader.java  |   2 +-
 .../dataset/EngineDataSetWithTimeGenerator.java    |  35 +---
 .../dataset/EngineDataSetWithoutTimeGenerator.java |   4 +
 .../groupby/GroupByEngineDataSet.java              |  11 +-
 .../groupby/GroupByWithOnlyTimeFilterDataSet.java} |   8 +-
 .../groupby/GroupByWithValueFilterDataSet.java}    |   8 +-
 .../AbstractExecutorWithoutTimeGenerator.java      |  84 +++++++++
 .../db/query/executor/AggregateEngineExecutor.java |   2 +-
 .../EngineExecutorWithoutTimeGenerator.java        | 105 +----------
 .../iotdb/db/query/executor/EngineQueryRouter.java |  44 ++---
 .../db/query/executor/IEngineQueryRouter.java      |  78 ++++++++
 .../{aggregation => factory}/AggreFuncFactory.java |   3 +-
 ...nstructor.java => AbstractNodeConstructor.java} |  61 +++---
 .../query/timegenerator/EngineNodeConstructor.java |  62 +------
 .../query/timegenerator/EngineTimeGenerator.java   |   1 -
 .../org/apache/iotdb/db/service/JDBCService.java   |  10 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 204 +++++++++++++--------
 .../iotdb/db/sync/receiver/SyncServiceImpl.java    |   2 +-
 .../iotdb/db/sync/receiver/SyncServiceManager.java |   5 +-
 .../apache/iotdb/db/writelog/io/RAFLogReader.java  |   2 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    |   2 +-
 .../db/concurrent/IoTDBThreadPoolFactoryTest.java  |   2 +-
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  |   2 +-
 .../iotdb/db/metadata/MManagerBasicTest.java       |  29 +--
 .../org/apache/iotdb/db/metadata/MTreeTest.java    |  28 +--
 .../org/apache/iotdb/db/metadata/MetadataTest.java |  93 ++++++++++
 .../org/apache/iotdb/db/qp/QueryProcessorTest.java |  88 ++++++++-
 .../transfer/PhysicalPlanLogTransferTest.java      |   2 +-
 .../org/apache/iotdb/db/qp/plan/QPUpdateTest.java  |   2 +-
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java |   3 +-
 .../EngineDataSetWithTimeGeneratorTest.java        |   2 +-
 .../query/executor/GroupByEngineDataSetTest.java   |  14 +-
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   2 +-
 .../apache/iotdb/db/writelog/PerformanceTest.java  |   3 +-
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |   2 +-
 .../iotdb/db/writelog/io/LogWriterReaderTest.java  |   3 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |   4 +-
 .../org/apache/iotdb/tsfile/read/common/Field.java |  22 +++
 .../org/apache/iotdb/tsfile/read/common/Path.java  |   4 +-
 .../tsfile/read/expression/ExpressionType.java     |  30 ++-
 .../tsfile/read/expression/IBinaryExpression.java  |   4 +
 .../iotdb/tsfile/read/expression/IExpression.java  |   4 +-
 .../read/expression/impl/BinaryExpression.java     |  38 +++-
 .../read/expression/impl/GlobalTimeExpression.java |  10 +-
 .../expression/impl/SingleSeriesExpression.java    |  10 +-
 .../tsfile/read/filter/basic/BinaryFilter.java     |   3 +
 .../iotdb/tsfile/read/filter/basic/Filter.java     |   2 +
 .../tsfile/read/filter/basic/UnaryFilter.java      |   3 +
 .../tsfile/read/filter/operator/AndFilter.java     |   7 +-
 .../iotdb/tsfile/read/filter/operator/Eq.java      |   6 +
 .../iotdb/tsfile/read/filter/operator/Gt.java      |   6 +
 .../iotdb/tsfile/read/filter/operator/GtEq.java    |   6 +
 .../iotdb/tsfile/read/filter/operator/Lt.java      |   6 +
 .../iotdb/tsfile/read/filter/operator/LtEq.java    |   6 +
 .../iotdb/tsfile/read/filter/operator/NotEq.java   |   6 +
 .../tsfile/read/filter/operator/NotFilter.java     |   5 +
 .../tsfile/read/filter/operator/OrFilter.java      |   5 +
 .../query/dataset/DataSetWithTimeGenerator.java    |  32 ----
 .../tsfile/read/query/dataset/QueryDataSet.java    |  36 ++++
 .../java/org/apache/iotdb/tsfile/utils/Pair.java   |   5 +-
 108 files changed, 1168 insertions(+), 695 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index f191607..7399903 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -110,18 +110,18 @@ public class IoTDBThreadPoolFactory {
   }
 
   /**
-   * function for creating JDBC client thread pool.
+   * function for creating thrift rpc client thread pool.
    */
-  public static ExecutorService createJDBCClientThreadPool(Args args, String poolName) {
+  public static ExecutorService createThriftRpcClientThreadPool(Args args, String poolName) {
     SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
     return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal,
         args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName));
   }
 
   /**
-   * function for creating JDBC client thread pool.
+   * function for creating thrift rpc client thread pool.
    */
-  public static ExecutorService createJDBCClientThreadPool(Args args, String poolName,
+  public static ExecutorService createThriftRpcClientThreadPool(Args args, String poolName,
       Thread.UncaughtExceptionHandler handler) {
     SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
     return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 6053da6..a1c3e44 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -41,7 +41,7 @@ public enum ThreadName {
 
   private String name;
 
-  private ThreadName(String name) {
+  ThreadName(String name) {
     this.name = name;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9790f37..4bf5ac8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.service.TSServiceImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -269,13 +270,18 @@ public class IoTDBConfig {
    * data, choose "true". 2. It's more likely not to update historical data or you don't know
    * exactly, choose "false".
    */
-  private boolean update_historical_data_possibility = false;
+  private boolean updateHistoricalDataPossibility = false;
   private String ipWhiteList = "0.0.0.0/0";
   /**
    * Examining period of cache file reader : 100 seconds.
    */
   private long cacheFileReaderClearPeriod = 100000;
 
+  /**
+   * Replace implementation class of JDBC service
+   */
+  private String rpcImplClassName = TSServiceImpl.class.getName();
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -792,12 +798,12 @@ public class IoTDBConfig {
     this.languageVersion = languageVersion;
   }
 
-  public boolean isUpdate_historical_data_possibility() {
-    return update_historical_data_possibility;
+  public boolean isUpdateHistoricalDataPossibility() {
+    return updateHistoricalDataPossibility;
   }
 
-  public void setUpdate_historical_data_possibility(boolean update_historical_data_possibility) {
-    this.update_historical_data_possibility = update_historical_data_possibility;
+  public void setUpdateHistoricalDataPossibility(boolean updateHistoricalDataPossibility) {
+    this.updateHistoricalDataPossibility = updateHistoricalDataPossibility;
   }
 
   public String getIpWhiteList() {
@@ -815,4 +821,12 @@ public class IoTDBConfig {
   public void setCacheFileReaderClearPeriod(long cacheFileReaderClearPeriod) {
     this.cacheFileReaderClearPeriod = cacheFileReaderClearPeriod;
   }
+
+  public String getRpcImplClassName() {
+    return rpcImplClassName;
+  }
+
+  public void setRpcImplClassName(String rpcImplClassName) {
+    this.rpcImplClassName = rpcImplClassName;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 5135e79..48e927f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -20,7 +20,8 @@ package org.apache.iotdb.db.conf;
 
 public class IoTDBConstant {
 
-  private IoTDBConstant() {}
+  private IoTDBConstant() {
+  }
 
   public static final String ENV_FILE_NAME = "iotdb-env";
   public static final String IOTDB_CONF = "IOTDB_CONF";
@@ -55,4 +56,7 @@ public class IoTDBConstant {
   public static final String MAX_TIME = "max_time";
   public static final String MIN_TIME = "min_time";
   public static final int MIN_SUPPORTED_JDK_VERSION = 8;
+
+  // for cluster, set read consistency level
+  public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7ab5215..38892fd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -51,6 +51,7 @@ public class IoTDBDescriptor {
    */
   private void loadProps() {
     InputStream inputStream;
+
     String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
     if (url == null) {
       url = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
@@ -112,10 +113,10 @@ public class IoTDBDescriptor {
       conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
 
       conf.setRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
-              Integer.toString(conf.getRpcPort()))));
+          Integer.toString(conf.getRpcPort()))));
 
       conf.setEnableWal(Boolean.parseBoolean(properties.getProperty("enable_wal",
-              Boolean.toString(conf.isEnableWal()))));
+          Boolean.toString(conf.isEnableWal()))));
 
       conf.setFlushWalThreshold(Integer
           .parseInt(properties.getProperty("flush_wal_threshold",
@@ -196,7 +197,7 @@ public class IoTDBDescriptor {
       conf.setSyncServerPort(Integer
           .parseInt(properties.getProperty("sync_server_port",
                   Integer.toString(conf.getSyncServerPort())).trim()));
-      conf.setUpdate_historical_data_possibility(Boolean.parseBoolean(
+      conf.setUpdateHistoricalDataPossibility(Boolean.parseBoolean(
           properties.getProperty("update_historical_data_possibility",
                   Boolean.toString(conf.isSyncEnable()))));
       conf.setIpWhiteList(properties.getProperty("IP_white_list", conf.getIpWhiteList()));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
index 7c996aa..86feaf0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
@@ -94,16 +94,16 @@ public class FlushManager {
    *
    * @param block
    *            if set to true, this method will wait for timeOut milliseconds.
-   * @param timeOut
+   * @param timeout
    *            block time out in milliseconds.
    * @throws ProcessorException
    *             if timeOut is reached or being interrupted while waiting to exit.
    */
-  public void close(boolean block, long timeOut) throws ProcessorException {
+  public void close(boolean block, long timeout) throws ProcessorException {
     pool.shutdown();
     if (block) {
       try {
-        if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
           throw new ProcessorException("Flush thread pool doesn't exit after "
               + EXIT_WAIT_TIME + " ms");
         }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
index 75190f2..44874bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java
@@ -62,16 +62,16 @@ public class MergeManager {
    *
    * @param block if set block to true, this method will wait for timeOut milliseconds to close the
    * merge pool. false, return directly.
-   * @param timeOut block time out in milliseconds.
+   * @param timeout block time out in milliseconds.
    * @throws ProcessorException if timeOut reach or interrupted while waiting to exit.
    */
-  public void forceClose(boolean block, long timeOut) throws ProcessorException {
+  public void forceClose(boolean block, long timeout) throws ProcessorException {
     pool.shutdownNow();
     if (block) {
       try {
-        if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) {
+        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
           throw new ProcessorException(
-              "Merge thread pool doesn't exit after " + timeOut + " ms");
+              "Merge thread pool doesn't exit after " + timeout + " ms");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
index cc6e1cf..70e9b17 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
@@ -169,16 +169,6 @@ public class MGraph implements Serializable {
   }
 
   /**
-   * Check whether the storage group of the input path exists or not
-   *
-   * @param path Format: root.node.(node)*
-   * @apiNote :for cluster
-   */
-  public boolean checkStorageExistOfPath(String path) {
-    return mtree.checkStorageExistOfPath(path);
-  }
-
-  /**
    * Get all paths for given seriesPath regular expression if given seriesPath belongs to MTree, or
    * get all linked seriesPath for given seriesPath if given seriesPath belongs to PTree Notice:
    * Regular expression in this method is formed by the amalgamation of seriesPath and the character
@@ -250,7 +240,7 @@ public class MGraph implements Serializable {
   public Metadata getMetadata() throws PathErrorException {
     Map<String, List<MeasurementSchema>> seriesMap = getSchemaForAllType();
     Map<String, List<String>> deviceIdMap = getDeviceForAllType();
-    return new Metadata(seriesMap, deviceIdMap);
+    return new Metadata(deviceIdMap);
   }
 
   public HashSet<String> getAllStorageGroup() throws PathErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index ea4ec6f..c45a3f0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -357,19 +357,6 @@ public class MManager {
   }
 
   /**
-   * function for checking if the storage group of given path exists in mTree or not.
-   * @apiNote :for cluster
-   */
-  public boolean checkStorageExistOfPath(String path) {
-    lock.readLock().lock();
-    try {
-      return mgraph.checkStorageExistOfPath(path);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
    * function for adding a pTree.
    */
   public void addAPTree(String ptreeRootName) throws IOException, MetadataArgsErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index f5bf6a2..27d17e8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -248,38 +248,6 @@ public class MTree implements Serializable {
   }
 
   /**
-   * Check whether the storage group of the path exists or not
-   * @param path input path
-   * @return If it's storage group exists, return true. Else return false
-   * @apiNote :for cluster
-   */
-  public boolean checkStorageExistOfPath(String path) {
-    String[] nodeNames = path.split(DOUB_SEPARATOR);
-    MNode cur = root;
-    if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
-      return false;
-    }
-    int i = 1;
-    while (i < nodeNames.length - 1) {
-      MNode temp = cur.getChild(nodeNames[i]);
-      if (temp == null) {
-        return false;
-      }
-      if(temp.isStorageLevel()){
-        return true;
-      }
-      cur = cur.getChild(nodeNames[i]);
-      i++;
-    }
-    MNode temp = cur.getChild(nodeNames[i]);
-    if(temp != null && temp.isStorageLevel()) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
    * Check whether set file seriesPath for this node or not. If not, throw an exception
    */
   private void checkStorageGroup(MNode node) throws PathErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 40fd0e4..70aa9e4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,58 +18,108 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
 
 /**
  * This class stores all the metadata info for every deviceId and every timeseries.
  */
-public class Metadata {
+public class Metadata implements Serializable {
 
-  private Map<String, List<MeasurementSchema>> seriesMap;
   private Map<String, List<String>> deviceIdMap;
 
-  public Metadata(Map<String, List<MeasurementSchema>> seriesMap,
-      Map<String, List<String>> deviceIdMap) {
-    this.seriesMap = seriesMap;
+  public Metadata(Map<String, List<String>> deviceIdMap) {
     this.deviceIdMap = deviceIdMap;
   }
 
-  /**
-   * function for getting series for one type.
-   */
-  public List<MeasurementSchema> getSeriesForOneType(String type) throws PathErrorException {
-    if (this.seriesMap.containsKey(type)) {
-      return seriesMap.get(type);
-    } else {
-      throw new PathErrorException("Input deviceIdType is not exist. " + type);
-    }
+  public Map<String, List<String>> getDeviceMap() {
+    return deviceIdMap;
   }
 
   /**
-   * function for getting devices for one type.
+   * combine multiple metadatas
    */
-  public List<String> getDevicesForOneType(String type) throws PathErrorException {
-    if (this.seriesMap.containsKey(type)) {
-      return deviceIdMap.get(type);
-    } else {
-      throw new PathErrorException("Input deviceIdType is not exist. " + type);
+  public static Metadata combineMetadatas(Metadata[] metadatas) {
+    Map<String, List<String>> deviceIdMap = new HashMap<>();
+
+    if (metadatas == null || metadatas.length == 0) {
+      return new Metadata(deviceIdMap);
     }
+
+    for (int i = 0; i < metadatas.length; i++) {
+      Map<String, List<String>> subDeviceIdMap = metadatas[i].deviceIdMap;
+      for (Entry<String, List<String>> entry : subDeviceIdMap.entrySet()) {
+        List<String> list = deviceIdMap.getOrDefault(entry.getKey(), new ArrayList<>());
+        list.addAll(entry.getValue());
+
+        if (!deviceIdMap.containsKey(entry.getKey())) {
+          deviceIdMap.put(entry.getKey(), list);
+        }
+      }
+      metadatas[i] = null;
+    }
+
+    return new Metadata(deviceIdMap);
   }
 
-  public Map<String, List<MeasurementSchema>> getSeriesMap() {
-    return seriesMap;
+  @Override
+  public String toString() {
+    return deviceIdMap.toString();
   }
 
-  public Map<String, List<String>> getDeviceMap() {
-    return deviceIdMap;
+  @Override
+  public boolean equals(Object obj) {
+    if(this == obj){
+      return true;
+    }
+    if(obj == null){
+      return false;
+    }
+    if(this.getClass() != obj.getClass()){
+      return false;
+    }
+
+    Metadata metadata = (Metadata) obj;
+    return deviceIdMapEquals(deviceIdMap, metadata.deviceIdMap);
   }
 
   @Override
-  public String toString() {
-    return seriesMap.toString() + "\n" + deviceIdMap.toString();
+  public int hashCode() {
+    return Objects.hash(deviceIdMap);
   }
 
+  /**
+   * only used to check if deviceIdMap is equal to another deviceIdMap
+   */
+  private boolean deviceIdMapEquals(Map<String, List<String>> map1, Map<String, List<String>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+
+    for (Entry<String, List<String>> entry : map1.entrySet()) {
+      List list1 = entry.getValue();
+      List list2 = map2.get(entry.getKey());
+
+      if (!listEquals(list1, list2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean listEquals(List list1, List list2) {
+    Set set1 = new HashSet();
+    set1.addAll(list1);
+    Set set2 = new HashSet();
+    set2.addAll(list2);
+
+    return set1.equals(set2);
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
index c64c71a..f63514d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/QueryProcessor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.RootOperator;
@@ -49,13 +50,13 @@ import org.apache.iotdb.db.sql.parse.ParseUtils;
  */
 public class QueryProcessor {
 
-  private QueryProcessExecutor executor;
+  private IQueryProcessExecutor executor;
 
-  public QueryProcessor(QueryProcessExecutor executor) {
+  public QueryProcessor(IQueryProcessExecutor executor) {
     this.executor = executor;
   }
 
-  public QueryProcessExecutor getExecutor() {
+  public IQueryProcessExecutor getExecutor() {
     return executor;
   }
 
@@ -122,7 +123,7 @@ public class QueryProcessor {
    * @throws LogicalOptimizeException
    *             exception in logical optimizing
    */
-  private Operator logicalOptimize(Operator operator, QueryProcessExecutor executor)
+  private Operator logicalOptimize(Operator operator, IQueryProcessExecutor executor)
       throws LogicalOperatorException {
     switch (operator.getType()) {
       case AUTHOR:
@@ -156,7 +157,7 @@ public class QueryProcessor {
    * @throws LogicalOptimizeException
    *             exception in SFW optimizing
    */
-  private SFWOperator optimizeSFWOperator(SFWOperator root, QueryProcessExecutor executor)
+  private SFWOperator optimizeSFWOperator(SFWOperator root, IQueryProcessExecutor executor)
       throws LogicalOperatorException {
     ConcatPathOptimizer concatPathOptimizer = new ConcatPathOptimizer(executor);
     root = (SFWOperator) concatPathOptimizer.transform(root);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
new file mode 100644
index 0000000..920aeef
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface IQueryProcessExecutor {
+
+  /**
+   * Process Non-Query Physical plan, including insert/update/delete operation of
+   * data/metadata/Privilege
+   *
+   * @param plan Physical Non-Query Plan
+   */
+  boolean processNonQuery(PhysicalPlan plan) throws ProcessorException;
+
+  /**
+   * process query plan of qp layer, construct queryDataSet.
+   *
+   * @param queryPlan QueryPlan
+   * @return QueryDataSet
+   */
+  QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+      throws IOException, FileNodeManagerException, PathErrorException,
+      QueryFilterOptimizationException, ProcessorException;
+
+  /**
+   * process aggregate plan of qp layer, construct queryDataSet.
+   */
+  QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
+      QueryContext context)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+  /**
+   * process group by plan of qp layer, construct queryDataSet.
+   */
+  QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
+      long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
+
+  /**
+   * process fill plan of qp layer, construct queryDataSet.
+   */
+  QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
+      QueryContext context)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
+
+  /**
+   * execute update command and return whether the operator is successful.
+   *
+   * @param path : update series seriesPath
+   * @param startTime start time in update command
+   * @param endTime end time in update command
+   * @param value - in type of string
+   * @return - whether the operator is successful.
+   */
+  boolean update(Path path, long startTime, long endTime, String value)
+      throws ProcessorException;
+
+  /**
+   * execute delete command and return whether the operator is successful.
+   *
+   * @param paths : delete series paths
+   * @param deleteTime end time in delete command
+   * @return - whether the operator is successful.
+   */
+  boolean delete(List<Path> paths, long deleteTime) throws ProcessorException;
+
+  /**
+   * execute delete command and return whether the operator is successful.
+   *
+   * @param path : delete series seriesPath
+   * @param deleteTime end time in delete command
+   * @return - whether the operator is successful.
+   */
+  boolean delete(Path path, long deleteTime) throws ProcessorException;
+
+  /**
+   * insert a single value. Only used in test
+   *
+   * @param path seriesPath to be inserted
+   * @param insertTime - it's time point but not a range
+   * @param value value to be inserted
+   * @return - Operate Type.
+   */
+  int insert(Path path, long insertTime, String value) throws ProcessorException;
+
+  /**
+   * execute insert command and return whether the operator is successful.
+   *
+   * @param deviceId deviceId to be inserted
+   * @param insertTime - it's time point but not a range
+   * @param measurementList measurements to be inserted
+   * @param insertValues values to be inserted
+   * @return - Operate Type.
+   */
+  int multiInsert(String deviceId, long insertTime, String[] measurementList,
+      String[] insertValues) throws ProcessorException;
+
+  boolean judgePathExists(Path fullPath);
+
+  /**
+   * Get data type of series
+   */
+  TSDataType getSeriesType(Path path) throws PathErrorException;
+
+  /**
+   * Get all paths of a full path
+   */
+  List<String> getAllPaths(String originPath) throws PathErrorException;
+
+  int getFetchSize();
+
+  void setFetchSize(int fetchSize);
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index b80c393..9ca337e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
 import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
 import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.utils.AuthUtils;
 import org.apache.iotdb.db.utils.LoadDataUtils;
@@ -226,7 +227,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
   }
 
   @Override
-  protected boolean delete(Path path, long timestamp) throws ProcessorException {
+  public boolean delete(Path path, long timestamp) throws ProcessorException {
     String deviceId = path.getDevice();
     String measurementId = path.getMeasurement();
     try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 376997e..99476f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -35,28 +34,18 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
 
-public abstract class QueryProcessExecutor {
+public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
 
   protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
-  protected EngineQueryRouter queryRouter = new EngineQueryRouter();
+  protected IEngineQueryRouter queryRouter = new EngineQueryRouter();
 
-  public QueryProcessExecutor() {
-  }
-
-  /**
-   * process query plan of qp layer, construct queryDataSet.
-   * @param queryPlan QueryPlan
-   * @return QueryDataSet
-   */
+  @Override
   public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
       throws IOException, FileNodeManagerException, PathErrorException,
       QueryFilterOptimizationException, ProcessorException {
@@ -72,7 +61,7 @@ public abstract class QueryProcessExecutor {
 
     if (queryPlan instanceof AggregationPlan) {
       return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
-          ((AggregationPlan) queryPlan).getExpression(), context);
+          queryPlan.getExpression(), context);
     }
 
     if (queryPlan instanceof FillQueryPlan) {
@@ -83,14 +72,7 @@ public abstract class QueryProcessExecutor {
     return queryRouter.query(queryExpression, context);
   }
 
-  public abstract TSDataType getSeriesType(Path fullPath) throws PathErrorException;
-
-  public abstract boolean judgePathExists(Path fullPath);
-
-  public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
-    throw new UnsupportedOperationException();
-  }
-
+  @Override
   public int getFetchSize() {
     if (fetchSize.get() == null) {
       return 100;
@@ -98,42 +80,12 @@ public abstract class QueryProcessExecutor {
     return fetchSize.get();
   }
 
+  @Override
   public void setFetchSize(int fetchSize) {
     this.fetchSize.set(fetchSize);
   }
 
-  public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres,
-      IExpression expression, QueryContext context) throws ProcessorException, IOException,
-      PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
-
-  public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres,
-      IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
-      QueryContext context) throws ProcessorException, IOException, PathErrorException,
-      FileNodeManagerException, QueryFilterOptimizationException;
-
-  public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType,
-      IFill> fillTypes, QueryContext context)
-      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
-
-  /**
-   * executeWithGlobalTimeFilter update command and return whether the operator is successful.
-   *
-   * @param path : update series seriesPath
-   * @param startTime start time in update command
-   * @param endTime end time in update command
-   * @param value - in type of string
-   * @return - whether the operator is successful.
-   */
-  public abstract boolean update(Path path, long startTime, long endTime, String value)
-      throws ProcessorException;
-
-  /**
-   * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
-   *
-   * @param paths : delete series paths
-   * @param deleteTime end time in delete command
-   * @return - whether the operator is successful.
-   */
+  @Override
   public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
     try {
       boolean result = true;
@@ -162,37 +114,4 @@ public abstract class QueryProcessExecutor {
     }
   }
 
-  /**
-   * executeWithGlobalTimeFilter delete command and return whether the operator is successful.
-   *
-   * @param path : delete series seriesPath
-   * @param deleteTime end time in delete command
-   * @return - whether the operator is successful.
-   */
-  protected abstract boolean delete(Path path, long deleteTime) throws ProcessorException;
-
-  /**
-   * insert a single value. Only used in test
-   *
-   * @param path seriesPath to be inserted
-   * @param insertTime - it's time point but not a range
-   * @param value value to be inserted
-   * @return - Operate Type.
-   */
-  public abstract int insert(Path path, long insertTime, String value) throws ProcessorException;
-
-  /**
-   * executeWithGlobalTimeFilter insert command and return whether the operator is successful.
-   *
-   * @param deviceId deviceId to be inserted
-   * @param insertTime - it's time point but not a range
-   * @param measurementList measurements to be inserted
-   * @param insertValues values to be inserted
-   * @return - Operate Type.
-   */
-  public abstract int multiInsert(String deviceId, long insertTime, String[] measurementList,
-      String[] insertValues) throws ProcessorException;
-
-  public abstract List<String> getAllPaths(String originPath) throws PathErrorException;
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index c7a9eb7..498b429 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -72,6 +72,6 @@ public abstract class Operator {
     SET_STORAGE_GROUP, CREATE_TIMESERIES, DELETE_TIMESERIES, CREATE_USER, DELETE_USER, MODIFY_PASSWORD,
     GRANT_USER_PRIVILEGE, REVOKE_USER_PRIVILEGE, GRANT_USER_ROLE, REVOKE_USER_ROLE, CREATE_ROLE,
     DELETE_ROLE, GRANT_ROLE_PRIVILEGE, REVOKE_ROLE_PRIVILEGE, LIST_USER, LIST_ROLE,
-    LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS;
+    LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
index 1472e54..1610554 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/BasicFunctionOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.logical.crud;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -97,7 +98,7 @@ public class BasicFunctionOperator extends FunctionOperator {
 
   @Override
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
-      QueryProcessExecutor executor)
+      IQueryProcessExecutor executor)
       throws LogicalOperatorException, PathErrorException {
     TSDataType type = executor.getSeriesType(path);
     if (type == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
index a6c032b..1f38654 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/FilterOperator.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -111,7 +112,7 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
    *
    * @return QueryFilter in TsFile
    */
-  public IExpression transformToExpression(QueryProcessExecutor executor)
+  public IExpression transformToExpression(IQueryProcessExecutor executor)
       throws QueryProcessorException {
     if (isSingle) {
       Pair<IUnaryExpression, String> ret = transformToSingleQueryFilter(executor);
@@ -149,7 +150,7 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
    * @throws QueryProcessorException exception in filter transforming
    */
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
-      QueryProcessExecutor executor)
+      IQueryProcessExecutor executor)
       throws QueryProcessorException {
     if (childOperators.isEmpty()) {
       throw new LogicalOperatorException(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 77dd6e4..d2f7bb9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -25,8 +26,9 @@ import org.apache.iotdb.tsfile.read.common.Path;
 /**
  * This class is a abstract class for all type of PhysicalPlan.
  */
-public abstract class PhysicalPlan {
+public abstract class PhysicalPlan implements Serializable {
 
+  private static final long serialVersionUID = -6274856391535568352L;
   private boolean isQuery;
   private Operator.OperatorType operatorType;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 43a5e28..6e5f430 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
 
 public class AggregationPlan extends QueryPlan {
 
+  private static final long serialVersionUID = -2049810573809076643L;
   private List<String> aggregations = new ArrayList<>();
 
   public AggregationPlan() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index b89717b..ae183bf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class DeletePlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = -6532570247476907037L;
   private long deleteTime;
   private List<Path> paths = new ArrayList<>();
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index 7f0399f..4fd666d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class FillQueryPlan extends QueryPlan {
 
+  private static final long serialVersionUID = -2091710518816582444L;
   private long queryTime;
   private Map<TSDataType, IFill> fillType;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
index f4087eb..d016c36 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 public class GroupByPlan extends AggregationPlan {
 
+  private static final long serialVersionUID = 8769258112457178898L;
   private long unit;
   private long origin;
   private List<Pair<Long, Long>> intervals; // show intervals
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3e0c13a..cdb519c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class InsertPlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = 6102845312368561515L;
   private String deviceId;
   private String[] measurements;
   private String[] values;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 4e60c5b..6bf7c0e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.qp.physical.crud;
 
 import java.util.List;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 
 public class QueryPlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = -5865840981549195660L;
   private List<Path> paths = null;
   private IExpression expression = null;
 
@@ -43,7 +44,7 @@ public class QueryPlan extends PhysicalPlan {
   /**
    * Check if all paths exist.
    */
-  public void checkPaths(QueryProcessExecutor executor) throws QueryProcessorException {
+  public void checkPaths(IQueryProcessExecutor executor) throws QueryProcessorException {
     for (Path path : paths) {
       if (!executor.judgePathExists(path)) {
         throw new QueryProcessorException("Path doesn't exist: " + path);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
index 5da9c2a..60768d6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.StringContainer;
 
 public class UpdatePlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = 8952248212926920033L;
   private List<Pair<Long, Long>> intervals = new ArrayList<>();
   private String value;
   private Path path;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
index b4b108a..3ec812c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class AuthorPlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = 6501894026593590182L;
   private final AuthorOperator.AuthorType authorType;
   private String userName;
   private String roleName;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
index 7e2eb7a..f9b4553 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class LoadDataPlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = -6631296704227106470L;
   private final String inputFilePath;
   private final String measureType;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index 93a98a3..e3b71f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class MetadataPlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = -3717406842093744475L;
   private final MetadataOperator.NamespaceType namespaceType;
   private Path path;
   private TSDataType dataType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
index 2f2c591..94ec623 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
  */
 public class PropertyPlan extends PhysicalPlan {
 
+  private static final long serialVersionUID = -1462399624512066104L;
   private final PropertyOperator.PropertyType propertyType;
   private Path propertyPath;
   private Path metadataPath;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/Codec.java
similarity index 91%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/Codec.java
index 76c0598..51fc06b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/Codec.java
@@ -16,10 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
 
 import java.io.IOException;
-import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 interface Codec<T extends PhysicalPlan> {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
similarity index 99%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
index 31b8527..4fcaac6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/CodecInstances.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -81,8 +81,9 @@ public class CodecInstances {
    */
   static String readString(ByteBuffer buffer) {
     int valueLen = buffer.getInt();
-    if(valueLen == NULL_VALUE_LEN)
+    if (valueLen == NULL_VALUE_LEN) {
       return null;
+    }
     return ReadWriteIOUtils.readStringWithoutLength(buffer, valueLen);
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanCodec.java
similarity index 78%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanCodec.java
index b2189ba..11ba358 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanCodec.java
@@ -16,22 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 public enum PhysicalPlanCodec {
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransfer.java
similarity index 98%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransfer.java
index 838ddb7..6d11b11 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransfer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
 
 import java.io.IOException;
 import java.nio.BufferOverflowException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/SystemLogOperator.java
similarity index 96%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
rename to iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/SystemLogOperator.java
index 19b4d29..3993980 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/transfer/SystemLogOperator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
 
 /**
  * To avoid conflict with org.apache.iotdb.tsfiledb.qp.constant.SQLConstant.Operator.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index d33e044..ff13a2c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.DeleteOperator;
@@ -57,9 +57,9 @@ import org.slf4j.LoggerFactory;
 public class PhysicalGenerator {
 
   private static final Logger logger = LoggerFactory.getLogger(PhysicalGenerator.class);
-  private QueryProcessExecutor executor;
+  private IQueryProcessExecutor executor;
 
-  public PhysicalGenerator(QueryProcessExecutor executor) {
+  public PhysicalGenerator(IQueryProcessExecutor executor) {
     this.executor = executor;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 5626923..9736f7c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
@@ -46,9 +47,9 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
   private static final Logger LOG = LoggerFactory.getLogger(ConcatPathOptimizer.class);
   private static final String WARNING_NO_SUFFIX_PATHS = "given SFWOperator doesn't have suffix paths, cannot concat seriesPath";
 
-  private QueryProcessExecutor executor;
+  private IQueryProcessExecutor executor;
 
-  public ConcatPathOptimizer(QueryProcessExecutor executor) {
+  public ConcatPathOptimizer(IQueryProcessExecutor executor) {
     this.executor = executor;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
index 0efae01..41cef8c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
@@ -44,7 +44,7 @@ public class AggreResultData {
     this.isSetValue = false;
   }
 
-  public void reSet() {
+  public void reset() {
     isSetValue = false;
     isSetTime = false;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 020aaf1..99ef06c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -40,7 +40,7 @@ public class CountAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
     resultData.setTimestamp(0);
     resultData.setLongRet(0);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index 3e8bb05..0688303 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -37,7 +37,7 @@ public class FirstAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index 8c12728..b8ea98b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -37,7 +37,7 @@ public class LastAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index 73b8f1c..ba3460b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -37,7 +37,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index 8e2d2a7..40bea9c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -36,7 +36,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index da6cc95..5373018 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -42,7 +42,7 @@ public class MeanAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
     sum = 0.0;
     cnt = 0;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index f083812..278fde2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -36,7 +36,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index ad64fb3..137f8fe 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -36,7 +36,7 @@ public class MinValueAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    resultData.reSet();
+    resultData.reset();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index e4ef67e..c5f029e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -142,6 +142,23 @@ public class QueryResourceManager {
     }
   }
 
+  /**
+   * Begin query and set query tokens of all filter paths in expression. This method is used in
+   * filter calculation.
+   * @param remoteDeviceIdSet device id set which can not handle locally
+   * Note : the method is for cluster
+   */
+  public void beginQueryOfGivenExpression(long jobId, IExpression expression,
+      Set<String> remoteDeviceIdSet) throws FileNodeManagerException {
+    Set<String> deviceIdSet = new HashSet<>();
+    getUniquePaths(expression, deviceIdSet);
+    deviceIdSet.removeAll(remoteDeviceIdSet);
+    for (String deviceId : deviceIdSet) {
+      putQueryTokenForCurrentRequestThread(jobId, deviceId,
+          FileNodeManager.getInstance().beginQuery(deviceId));
+    }
+  }
+
   public QueryDataSource getQueryDataSource(Path selectedPath,
       QueryContext context)
       throws FileNodeManagerException {
@@ -165,12 +182,12 @@ public class QueryResourceManager {
       // no resource need to be released.
       return;
     }
-      for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) {
-        for (int token : entry.getValue()) {
-          FileNodeManager.getInstance().endQuery(entry.getKey(), token);
-        }
+    for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) {
+      for (int token : entry.getValue()) {
+        FileNodeManager.getInstance().endQuery(entry.getKey(), token);
       }
-      queryTokensMap.remove(jobId);
+    }
+    queryTokensMap.remove(jobId);
     // remove usage of opened file paths of current thread
     filePathsManager.removeUsedFilesForGivenJob(jobId);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
index c2edab0..6545bfa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
@@ -36,7 +36,7 @@ public class AggreResultDataPointReader implements IPointReader {
   @Override
   public TimeValuePair next() {
     TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(aggreResultData);
-    aggreResultData.reSet();
+    aggreResultData.reset();
     return timeValuePair;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java
index 6e76e66..04b46f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java
@@ -98,36 +98,11 @@ public class EngineDataSetWithTimeGenerator extends QueryDataSet {
     return hasCachedRowRecord;
   }
 
-  private Field getField(Object value, TSDataType dataType) {
-    Field field = new Field(dataType);
-
-    if (value == null) {
-      field.setNull();
-      return field;
-    }
+  public EngineTimeGenerator getTimeGenerator() {
+    return timeGenerator;
+  }
 
-    switch (dataType) {
-      case DOUBLE:
-        field.setDoubleV((double) value);
-        break;
-      case FLOAT:
-        field.setFloatV((float) value);
-        break;
-      case INT64:
-        field.setLongV((long) value);
-        break;
-      case INT32:
-        field.setIntV((int) value);
-        break;
-      case BOOLEAN:
-        field.setBoolV((boolean) value);
-        break;
-      case TEXT:
-        field.setBinaryV((Binary) value);
-        break;
-      default:
-        throw new UnSupportedDataTypeException("UnSupported: " + dataType);
-    }
-    return field;
+  public List<EngineReaderByTimeStamp> getReaders() {
+    return readers;
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
index 73fc71f..bc9bb08 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
@@ -154,4 +154,8 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet {
     timeSet.remove(t);
     return t;
   }
+
+  public List<IPointReader> getReaders() {
+    return readers;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByEngineDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
similarity index 96%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByEngineDataSet.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 2ccecf5..9cf4dc3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByEngineDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.executor.groupby;
+package org.apache.iotdb.db.query.dataset.groupby;
 
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
 import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -90,15 +90,12 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
       return true;
     }
 
-    // end
-    if (usedIndex >= mergedIntervals.size()) {
-      return false;
-    }
-
     // skip the intervals in coverage of last time-partition
     while (usedIndex < mergedIntervals.size() && mergedIntervals.get(usedIndex).right < endTime) {
       usedIndex++;
     }
+
+    // end
     if (usedIndex >= mergedIntervals.size()) {
       return false;
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
similarity index 97%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
index e82b567..e2688ba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.executor.groupby;
+package org.apache.iotdb.db.query.dataset.groupby;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSet {
+public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet {
 
   protected List<IPointReader> unSequenceReaderList;
   protected List<IAggregateReader> sequenceReaderList;
@@ -56,7 +56,7 @@ public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSe
   /**
    * constructor.
    */
-  public GroupByWithOnlyTimeFilterDataSetDataSet(long jobId, List<Path> paths, long unit,
+  public GroupByWithOnlyTimeFilterDataSet(long jobId, List<Path> paths, long unit,
       long origin, List<Pair<Long, Long>> mergedIntervals) {
     super(jobId, paths, unit, origin, mergedIntervals);
     this.unSequenceReaderList = new ArrayList<>();
@@ -103,7 +103,7 @@ public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSe
   public RowRecord next() throws IOException {
     if (!hasCachedTimeInterval) {
       throw new IOException("need to call hasNext() before calling next() "
-          + "in GroupByWithOnlyTimeFilterDataSetDataSet.");
+          + "in GroupByWithOnlyTimeFilterDataSet.");
     }
     hasCachedTimeInterval = false;
     RowRecord record = new RowRecord(startTime);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
similarity index 94%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 7a6b3d2..f7ffa29 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.executor.groupby;
+package org.apache.iotdb.db.query.dataset.groupby;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,7 +38,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet {
+public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
 
   private List<EngineReaderByTimeStamp> allDataReaderList;
   private TimeGenerator timestampGenerator;
@@ -59,7 +59,7 @@ public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet {
   /**
    * constructor.
    */
-  public GroupByWithValueFilterDataSetDataSet(long jobId, List<Path> paths, long unit, long origin,
+  public GroupByWithValueFilterDataSet(long jobId, List<Path> paths, long unit, long origin,
       List<Pair<Long, Long>> mergedIntervals) {
     super(jobId, paths, unit, origin, mergedIntervals);
     this.allDataReaderList = new ArrayList<>();
@@ -85,7 +85,7 @@ public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet {
   public RowRecord next() throws IOException {
     if (!hasCachedTimeInterval) {
       throw new IOException("need to call hasNext() before calling next()"
-          + " in GroupByWithOnlyTimeFilterDataSetDataSet.");
+          + " in GroupByWithOnlyTimeFilterDataSet.");
     }
     hasCachedTimeInterval = false;
     for (AggregateFunction function : functions) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGenerator.java
new file mode 100644
index 0000000..f5f07c8
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractExecutorWithoutTimeGenerator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.AllDataReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * Query executor with global time filter.
+ */
+public abstract class AbstractExecutorWithoutTimeGenerator {
+
+  /**
+   * Create reader of a series
+   *
+   * @param context query context
+   * @param path series path
+   * @param dataTypes list of data type
+   * @param timeFilter time filter
+   * @return reader of the series
+   */
+  public static IPointReader createSeriesReader(QueryContext context, Path path,
+      List<TSDataType> dataTypes, Filter timeFilter)
+      throws FileNodeManagerException {
+    QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
+        context);
+    // add data type
+    try {
+      dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+    } catch (PathErrorException e) {
+      throw new FileNodeManagerException(e);
+    }
+
+    // sequence reader for one sealed tsfile
+    SequenceDataReader tsFilesReader;
+    try {
+      tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+          timeFilter, context);
+    } catch (IOException e) {
+      throw new FileNodeManagerException(e);
+    }
+
+    // unseq reader for all chunk groups in unSeqFile
+    PriorityMergeReader unSeqMergeReader;
+    try {
+      unSeqMergeReader = SeriesReaderFactory.getInstance()
+          .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+    } catch (IOException e) {
+      throw new FileNodeManagerException(e);
+    }
+    // merge sequence data with unsequence data.
+    return new AllDataReader(tsFilesReader, unSeqMergeReader);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index ba76c2f..508a787 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.factory.AggreFuncFactory;
 import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 07073cd..24a9e62 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -16,24 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.query.executor;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
-import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
@@ -42,9 +34,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 /**
- * IoTDB query executor with global time filter.
+ * IoTDB query executor of Stand-alone version with global time filter.
  */
-public class EngineExecutorWithoutTimeGenerator {
+public class EngineExecutorWithoutTimeGenerator extends AbstractExecutorWithoutTimeGenerator {
 
   private QueryExpression queryExpression;
 
@@ -53,66 +45,15 @@ public class EngineExecutorWithoutTimeGenerator {
   }
 
   /**
-   * with global time filter.
+   * without filter or with global time filter.
    */
-  public QueryDataSet executeWithGlobalTimeFilter(QueryContext context)
+  public QueryDataSet execute(QueryContext context)
       throws FileNodeManagerException {
 
-    Filter timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
-
-    List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-
-    QueryResourceManager.getInstance()
-        .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
-    for (Path path : queryExpression.getSelectedSeries()) {
-
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
-          context);
-
-      // add data type
-      try {
-        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // sequence reader for one sealed tsfile
-      SequenceDataReader tsFilesReader;
-      try {
-        tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            timeFilter, context);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // unseq reader for all chunk groups in unSeqFile
-      PriorityMergeReader unSeqMergeReader;
-      try {
-        unSeqMergeReader = SeriesReaderFactory.getInstance()
-            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // merge sequence data with unsequence data.
-      readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
-    }
-
-    try {
-      return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
-          readersOfSelectedSeries);
-    } catch (IOException e) {
-      throw new FileNodeManagerException(e);
+    Filter timeFilter = null;
+    if (queryExpression.hasQueryFilter()) {
+      timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
     }
-  }
-
-  /**
-   * without filter.
-   */
-  public QueryDataSet executeWithoutFilter(QueryContext context)
-      throws FileNodeManagerException {
 
     List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
@@ -122,36 +63,8 @@ public class EngineExecutorWithoutTimeGenerator {
 
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
-          context);
-
-      // add data type
-      try {
-        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // sequence insert data
-      SequenceDataReader tsFilesReader;
-      try {
-        tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            null, context);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // unseq insert data
-      PriorityMergeReader unSeqMergeReader;
-      try {
-        unSeqMergeReader = SeriesReaderFactory.getInstance()
-            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // merge sequence data with unsequence data.
-      readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+      IPointReader reader = createSeriesReader(context, path, dataTypes, timeFilter);
+      readersOfSelectedSeries.add(reader);
     }
 
     try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 96ad5ad..03c600d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet;
-import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -47,13 +47,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
  * Query entrance class of IoTDB query process. All query clause will be transformed to physical
  * plan, physical plan will be executed by EngineQueryRouter.
  */
-public class EngineQueryRouter {
+public class EngineQueryRouter implements IEngineQueryRouter{
 
-
-
-  /**
-   * execute physical plan.
-   */
+  @Override
   public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
       throws FileNodeManagerException {
 
@@ -66,7 +62,7 @@ public class EngineQueryRouter {
         if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
           EngineExecutorWithoutTimeGenerator engineExecutor =
               new EngineExecutorWithoutTimeGenerator(queryExpression);
-          return engineExecutor.executeWithGlobalTimeFilter(context);
+          return engineExecutor.execute(context);
         } else {
           EngineExecutorWithTimeGenerator engineExecutor = new EngineExecutorWithTimeGenerator(
               queryExpression);
@@ -79,13 +75,11 @@ public class EngineQueryRouter {
     } else {
       EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
           queryExpression);
-      return engineExecutor.executeWithoutFilter(context);
+      return engineExecutor.execute(context);
     }
   }
 
-  /**
-   * execute aggregation query.
-   */
+  @Override
   public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, QueryContext context) throws QueryFilterOptimizationException,
       FileNodeManagerException, IOException, PathErrorException, ProcessorException {
@@ -107,17 +101,7 @@ public class EngineQueryRouter {
     }
   }
 
-  /**
-   * execute groupBy query.
-   *
-   * @param selectedSeries select path list
-   * @param aggres aggregation name list
-   * @param expression filter expression
-   * @param unit time granularity for interval partitioning, unit is ms.
-   * @param origin the datum time point for interval division is divided into a time interval for
-   * each TimeUnit time from this point forward and backward.
-   * @param intervals time intervals, closed interval.
-   */
+  @Override
   public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
       QueryContext context)
@@ -164,12 +148,12 @@ public class EngineQueryRouter {
     IExpression optimizedExpression = ExpressionOptimizer.getInstance()
         .optimize(expression, selectedSeries);
     if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-      GroupByWithOnlyTimeFilterDataSetDataSet groupByEngine = new GroupByWithOnlyTimeFilterDataSetDataSet(
+      GroupByWithOnlyTimeFilterDataSet groupByEngine = new GroupByWithOnlyTimeFilterDataSet(
           nextJobId, selectedSeries, unit, origin, mergedIntervalList);
       groupByEngine.initGroupBy(context, aggres, optimizedExpression);
       return groupByEngine;
     } else {
-      GroupByWithValueFilterDataSetDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(
+      GroupByWithValueFilterDataSet groupByEngine = new GroupByWithValueFilterDataSet(
           nextJobId,
           selectedSeries, unit, origin, mergedIntervalList);
       groupByEngine.initGroupBy(context, aggres, optimizedExpression);
@@ -177,13 +161,7 @@ public class EngineQueryRouter {
     }
   }
 
-  /**
-   * execute fill query.
-   *
-   * @param fillPaths select path list
-   * @param queryTime timestamp
-   * @param fillType type IFill map
-   */
+  @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
       QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
new file mode 100644
index 0000000..01c1aed
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface IEngineQueryRouter {
+
+  /**
+   * Execute physical plan.
+   */
+  QueryDataSet query(QueryExpression queryExpression, QueryContext context)
+      throws FileNodeManagerException, PathErrorException;
+
+  /**
+   * Execute aggregation query.
+   */
+  QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
+      IExpression expression, QueryContext context)
+      throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException;
+
+  /**
+   * Execute groupBy query.
+   *
+   * @param selectedSeries select path list
+   * @param aggres aggregation name list
+   * @param expression filter expression
+   * @param unit time granularity for interval partitioning, unit is ms.
+   * @param origin the datum time point for interval division is divided into a time interval for
+   * each TimeUnit time from this point forward and backward.
+   * @param intervals time intervals, closed interval.
+   */
+  QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
+      IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
+      QueryContext context)
+      throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException,
+      PathErrorException, IOException;
+  /**
+   * Execute fill query.
+   *
+   * @param fillPaths select path list
+   * @param queryTime timestamp
+   * @param fillType type IFill map
+   */
+  QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
+      QueryContext context) throws FileNodeManagerException, PathErrorException, IOException;
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/AggreFuncFactory.java
similarity index 96%
rename from iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
rename to iotdb/src/main/java/org/apache/iotdb/db/query/factory/AggreFuncFactory.java
index c2d1883..495d853 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/AggreFuncFactory.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.aggregation;
+package org.apache.iotdb.db.query.factory;
 
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.impl.CountAggrFunc;
 import org.apache.iotdb.db.query.aggregation.impl.FirstAggrFunc;
 import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
similarity index 70%
copy from iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
copy to iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
index 4ffe62b..5d83314 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
@@ -16,12 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.query.timegenerator;
 
 import static org.apache.iotdb.tsfile.read.expression.ExpressionType.AND;
 import static org.apache.iotdb.tsfile.read.expression.ExpressionType.OR;
-import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
 
 import java.io.IOException;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -42,48 +40,48 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
 import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
 import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
 
-public class EngineNodeConstructor {
-
-
-  public EngineNodeConstructor() {
-  }
+/**
+ * Construct node in expression tree while reading process.
+ */
+public abstract class AbstractNodeConstructor {
 
   /**
-   * construct expression node.
+   * Construct expression node.
    *
    * @param expression expression
    * @return Node object
    * @throws IOException IOException
    * @throws FileNodeManagerException FileNodeManagerException
    */
-  public Node construct(IExpression expression, QueryContext context)
+  public abstract Node construct(IExpression expression, QueryContext context)
+      throws FileNodeManagerException;
+
+  /**
+   * Construct not series type node.
+   *
+   * @param expression expression
+   * @return Node object
+   * @throws FileNodeManagerException FileNodeManagerException
+   */
+  protected Node constructNotSeriesNode(IExpression expression, QueryContext context)
       throws FileNodeManagerException {
-    if (expression.getType() == SERIES) {
-      try {
-        return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression,
-            context));
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
+    Node leftChild;
+    Node rightChild;
+    if (expression.getType() == OR) {
+      leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+      rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
+      return new OrNode(leftChild, rightChild);
+    } else if (expression.getType() == AND) {
+      leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+      rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
+      return new AndNode(leftChild, rightChild);
     } else {
-      Node leftChild;
-      Node rightChild;
-      if (expression.getType() == OR) {
-        leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
-        rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
-        return new OrNode(leftChild, rightChild);
-      } else if (expression.getType() == AND) {
-        leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
-        rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
-        return new AndNode(leftChild, rightChild);
-      } else {
-        throw new UnSupportedDataTypeException(
-            "Unsupported QueryFilterType when construct OperatorNode: " + expression.getType());
-      }
+      throw new UnSupportedDataTypeException(
+          "Unsupported QueryFilterType when construct OperatorNode: " + expression.getType());
     }
   }
 
-  private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
+  protected IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
       QueryContext context)
       throws IOException, FileNodeManagerException {
 
@@ -108,5 +106,4 @@ public class EngineNodeConstructor {
       return new AllDataReader(tsFilesReader, unSeqMergeReader);
     }
   }
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index 4ffe62b..45e6ac6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -19,43 +19,29 @@
 
 package org.apache.iotdb.db.query.timegenerator;
 
-import static org.apache.iotdb.tsfile.read.expression.ExpressionType.AND;
-import static org.apache.iotdb.tsfile.read.expression.ExpressionType.OR;
 import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
 
 import java.io.IOException;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
-import org.apache.iotdb.db.query.reader.IReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
 import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
-import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
-
-public class EngineNodeConstructor {
 
+public class EngineNodeConstructor extends AbstractNodeConstructor {
 
   public EngineNodeConstructor() {
   }
 
   /**
-   * construct expression node.
+   * Construct expression node.
    *
    * @param expression expression
    * @return Node object
    * @throws IOException IOException
    * @throws FileNodeManagerException FileNodeManagerException
    */
+  @Override
   public Node construct(IExpression expression, QueryContext context)
       throws FileNodeManagerException {
     if (expression.getType() == SERIES) {
@@ -66,47 +52,7 @@ public class EngineNodeConstructor {
         throw new FileNodeManagerException(e);
       }
     } else {
-      Node leftChild;
-      Node rightChild;
-      if (expression.getType() == OR) {
-        leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
-        rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
-        return new OrNode(leftChild, rightChild);
-      } else if (expression.getType() == AND) {
-        leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
-        rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
-        return new AndNode(leftChild, rightChild);
-      } else {
-        throw new UnSupportedDataTypeException(
-            "Unsupported QueryFilterType when construct OperatorNode: " + expression.getType());
-      }
-    }
-  }
-
-  private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
-      QueryContext context)
-      throws IOException, FileNodeManagerException {
-
-    QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(
-        singleSeriesExpression.getSeriesPath(), context);
-
-    Filter filter = singleSeriesExpression.getFilter();
-
-    // reader for all sequence data
-    SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-        filter, context);
-
-    // reader for all unSequence data
-    PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
-        .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), filter);
-
-    if (!tsFilesReader.hasNext()) {
-      //only have unsequence data.
-      return unSeqMergeReader;
-    } else {
-      //merge sequence data with unsequence data.
-      return new AllDataReader(tsFilesReader, unSeqMergeReader);
+      return constructNotSeriesNode(expression, context);
     }
   }
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
index 350ea6f..13d7dbc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
@@ -61,7 +61,6 @@ public class EngineTimeGenerator implements TimeGenerator {
 
   @Override
   public Object getValue(Path path, long time) {
-    // TODO implement the optimization
     return null;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index d9ccecf..1326e23 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -129,7 +129,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
       jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName());
       jdbcServiceThread.start();
       startLatch.await();
-    } catch (IOException | InterruptedException e) {
+    } catch (IOException | InterruptedException | ClassNotFoundException | IllegalAccessException | InstantiationException e) {
       String errorMessage = String
           .format("Failed to start %s because of %s", this.getID().getName(),
               e.getMessage());
@@ -187,9 +187,11 @@ public class JDBCService implements JDBCServiceMBean, IService {
     private CountDownLatch threadStartLatch;
     private CountDownLatch threadStopLatch;
 
-    public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) throws IOException {
+    public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+        throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException {
       protocolFactory = new TBinaryProtocol.Factory();
-      impl = new TSServiceImpl();
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
       processor = new TSIService.Processor<>(impl);
       this.threadStartLatch = threadStartLatch;
       this.threadStopLatch = threadStopLatch;
@@ -202,7 +204,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
         serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
             config.getRpcPort()));
         poolArgs = new TThreadPoolServer.Args(serverTransport);
-        poolArgs.executorService = IoTDBThreadPoolFactory.createJDBCClientThreadPool(poolArgs,
+        poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
             ThreadName.JDBC_CLIENT.getName());
         poolArgs.processor(processor);
         poolArgs.protocolFactory(protocolFactory);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f355445..b02f1c2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -41,7 +42,6 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.qp.IllegalASTFormatException;
 import org.apache.iotdb.db.exception.qp.QueryProcessorException;
-import org.apache.iotdb.db.metadata.MGraph;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.Metadata;
 import org.apache.iotdb.db.qp.QueryProcessor;
@@ -81,6 +81,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.iotdb.service.rpc.thrift.TS_Status;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.thrift.TException;
@@ -95,21 +96,21 @@ import org.slf4j.LoggerFactory;
 public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
-  private static final String INFO_NOT_LOGIN = "{}: Not login.";
-  private static final String ERROR_NOT_LOGIN = "Not login";
+  protected static final String INFO_NOT_LOGIN = "{}: Not login.";
+  protected static final String ERROR_NOT_LOGIN = "Not login";
 
-  private QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
+  protected QueryProcessor processor;
   // Record the username for every rpc connection. Username.get() is null if
   // login is failed.
-  private ThreadLocal<String> username = new ThreadLocal<>();
-  private ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
-  private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
-  private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
+  protected ThreadLocal<String> username = new ThreadLocal<>();
+  protected ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
+  protected ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
+  protected ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
+  protected ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
 
   public TSServiceImpl() throws IOException {
-    // do nothing because there is no need
+    processor = new QueryProcessor(new OverflowQPExecutor());
   }
 
   @Override
@@ -118,7 +119,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         req.getUsername());
 
     boolean status;
-    IAuthorizer authorizer = null;
+    IAuthorizer authorizer;
     try {
       authorizer = LocalFileAuthorizer.getInstance();
     } catch (AuthException e) {
@@ -191,22 +192,23 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       releaseQueryResource(req);
 
       clearAllStatusForCurrentRequest();
-    } catch (FileNodeManagerException e) {
+    } catch (Exception e) {
       LOGGER.error("Error in closeOperation : {}", e.getMessage());
     }
     return new TSCloseOperationResp(new TS_Status(TS_StatusCode.SUCCESS_STATUS));
   }
 
-  private void releaseQueryResource(TSCloseOperationReq req) throws FileNodeManagerException {
+  protected void releaseQueryResource(TSCloseOperationReq req) throws Exception {
     Map<Long, QueryContext> contextMap = contextMapLocal.get();
     if (contextMap == null) {
       return;
     }
-    if(req == null || req.queryId == -1) {
+    if (req == null || req.queryId == -1) {
       // end query for all the query tokens created by current thread
       for (QueryContext context : contextMap.values()) {
         QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
       }
+      contextMapLocal.set(new HashMap<>());
     } else {
       QueryResourceManager.getInstance()
           .endQueryForGivenJob(contextMap.remove(req.queryId).getJobId());
@@ -241,10 +243,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       case "SHOW_TIMESERIES":
         String path = req.getColumnPath();
         try {
-          List<List<String>> showTimeseriesList = MManager.getInstance()
-              .getShowTimeseriesPath(path);
+          List<List<String>> showTimeseriesList = getTimeSeriesForPath(path);
           resp.setShowTimeseriesList(showTimeseriesList);
-        } catch (PathErrorException e) {
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           status = getErrorStatus(
               String.format("Failed to fetch timeseries %s's metadata because: %s",
                   req.getColumnPath(), e));
@@ -264,9 +265,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "SHOW_STORAGE_GROUP":
         try {
-          Set<String> storageGroups = MManager.getInstance().getAllStorageGroup();
+          Set<String> storageGroups = getAllStorageGroups();
           resp.setShowStorageGroups(storageGroups);
-        } catch (PathErrorException e) {
+        } catch (PathErrorException | InterruptedException e) {
           status = getErrorStatus(
               String.format("Failed to fetch storage groups' metadata because: %s", e));
           resp.setStatus(status);
@@ -282,9 +283,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         status = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
         break;
       case "METADATA_IN_JSON":
-        String metadataInJson = null;
+        String metadataInJson;
         try {
-          metadataInJson = MManager.getInstance().getMetadataInString();
+          metadataInJson = getMetadataInString();
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
+          status = getErrorStatus(
+              String.format("Failed to fetch all metadata in json because: %s", e));
+          resp.setStatus(status);
+          return resp;
         } catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
           LOGGER.error("Failed to fetch all metadata in json", outOfMemoryError);
           status = getErrorStatus(
@@ -299,14 +305,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         Metadata metadata;
         try {
           String column = req.getColumnPath();
-          metadata = MManager.getInstance().getMetadata();
+          metadata = getMetadata();
           Map<String, List<String>> deviceMap = metadata.getDeviceMap();
           if (deviceMap == null || !deviceMap.containsKey(column)) {
             resp.setColumnsList(new ArrayList<>());
           } else {
             resp.setColumnsList(deviceMap.get(column));
           }
-        } catch (PathErrorException e) {
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           LOGGER.error("cannot get delta object map", e);
           status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e));
           resp.setStatus(status);
@@ -321,8 +327,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "COLUMN":
         try {
-          resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString());
-        } catch (PathErrorException e) {
+          resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           // TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status)
           // status = new TS_Status(TS_StatusCode.ERROR_STATUS);
           // status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s",
@@ -334,8 +340,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "ALL_COLUMNS":
         try {
-          resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
-        } catch (PathErrorException e) {
+          resp.setColumnsList(getPaths(req.getColumnPath()));
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           status = getErrorStatus(String
               .format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e));
           resp.setStatus(status);
@@ -359,8 +365,37 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return resp;
   }
 
+  protected Set<String> getAllStorageGroups() throws PathErrorException, InterruptedException {
+    return MManager.getInstance().getAllStorageGroup();
+  }
+
+  protected List<List<String>> getTimeSeriesForPath(String path)
+      throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getShowTimeseriesPath(path);
+  }
+
+  protected String getMetadataInString()
+      throws InterruptedException, PathErrorException, ProcessorException {
+    return MManager.getInstance().getMetadataInString();
+  }
+
+  protected Metadata getMetadata()
+      throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getMetadata();
+  }
+
+  protected TSDataType getSeriesType(String path)
+      throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getSeriesType(path);
+  }
+
+  protected List<String> getPaths(String path)
+      throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getPaths(path);
+  }
+
   /**
-   * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
+   * Judge whether the statement is ADMIN COMMAND and if true, execute it.
    *
    * @param statement command
    * @return true if the statement is ADMIN COMMAND
@@ -427,7 +462,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           }
         } catch (Exception e) {
           String errMessage = String.format(
-              "Fail to generate physcial plan and executeWithGlobalTimeFilter for statement "
+              "Fail to generate physcial plan and execute for statement "
                   + "%s beacuse %s",
               statement, e.getMessage());
           result.add(Statement.EXECUTE_FAILED);
@@ -465,6 +500,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
       }
 
+      try {
+        if (execSetConsistencyLevel(statement)) {
+          return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+              "Execute set consistency level successfully");
+        }
+      } catch (Exception e) {
+        return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
+      }
+
       PhysicalPlan physicalPlan;
       try {
         physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
@@ -483,11 +527,28 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return executeUpdateStatement(physicalPlan);
       }
     } catch (Exception e) {
-      LOGGER.info("meet error: {}  while executing statement: {}", e.getMessage(), req.getStatement());
+      LOGGER.info("meet error: {}  while executing statement: {}", e.getMessage(),
+          req.getStatement());
       return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
     }
   }
 
+  /**
+   * Set consistency level
+   */
+  public boolean execSetConsistencyLevel(String statement) throws Exception {
+    if (statement == null) {
+      return false;
+    }
+    statement = statement.toLowerCase().trim();
+    if (Pattern.matches(IoTDBConstant.SET_READ_CONSISTENCY_LEVEL_PATTERN, statement)) {
+      throw new Exception(
+          "IoTDB Stand-alone version does not support setting read-write consistency level");
+    } else {
+      return false;
+    }
+  }
+
   @Override
   public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
 
@@ -510,8 +571,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
 
       // check file level set
+
       try {
-        MManager.getInstance().checkFileLevel(paths);
+        checkFileLevelSet(paths);
       } catch (PathErrorException e) {
         LOGGER.error("meet error while checking file level.", e);
         return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
@@ -592,6 +654,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  protected void checkFileLevelSet(List<Path> paths) throws PathErrorException {
+    MManager.getInstance().checkFileLevel(paths);
+  }
+
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) throws TException {
     try {
@@ -627,19 +693,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
-  private QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
+  protected QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
       throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
       ProcessorException, IOException {
     PhysicalPlan physicalPlan = queryStatus.get().get(statement);
     processor.getExecutor().setFetchSize(fetchSize);
 
     QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
-    Map<Long, QueryContext> contextMap = contextMapLocal.get();
-    if (contextMap == null) {
-      contextMap = new HashMap<>();
-      contextMapLocal.set(contextMap);
-    }
-    contextMap.put(req.queryId, context);
+
+    initContextMap();
+    contextMapLocal.get().put(req.queryId, context);
 
     QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
         context);
@@ -647,6 +710,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return queryDataSet;
   }
 
+  protected void initContextMap() {
+    Map<Long, QueryContext> contextMap = contextMapLocal.get();
+    if (contextMap == null) {
+      contextMap = new HashMap<>();
+      contextMapLocal.set(contextMap);
+    }
+  }
+
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
       throws TException {
@@ -683,7 +754,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     // Do we need to add extra information of executive condition
     boolean execRet;
     try {
-      execRet = processor.getExecutor().processNonQuery(plan);
+      execRet = executeNonQuery(plan);
     } catch (ProcessorException e) {
       LOGGER.debug("meet error while processing non-query. {}", e.getMessage());
       return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
@@ -700,6 +771,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return resp;
   }
 
+  protected boolean executeNonQuery(PhysicalPlan plan) throws ProcessorException {
+    return processor.getExecutor().processNonQuery(plan);
+  }
+
   private TSExecuteStatementResp executeUpdateStatement(String statement)
       throws ProcessorException {
 
@@ -717,40 +792,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           "Statement is a query statement.");
     }
 
-    // if operation belongs to add/delete/update
-    List<Path> paths = physicalPlan.getPaths();
-
-    try {
-      if (!checkAuthorization(paths, physicalPlan)) {
-        return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
-            "No permissions for this operation " + physicalPlan.getOperatorType());
-      }
-    } catch (AuthException e) {
-      LOGGER.error("meet error while checking authorization.", e);
-      return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
-          "Uninitialized authorizer : " + e.getMessage());
-    }
-
-    // TODO
-    // In current version, we only return OK/ERROR
-    // Do we need to add extra information of executive condition
-    boolean execRet;
-    try {
-      execRet = processor.getExecutor().processNonQuery(physicalPlan);
-    } catch (ProcessorException e) {
-      LOGGER.error("meet error while processing non-query.", e);
-      return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
-    }
-    TS_StatusCode statusCode = execRet ? TS_StatusCode.SUCCESS_STATUS : TS_StatusCode.ERROR_STATUS;
-    String msg = execRet ? "Execute successfully" : "Execute statement error.";
-    TSExecuteStatementResp resp = getTSExecuteStatementResp(statusCode, msg);
-    TSHandleIdentifier operationId = new TSHandleIdentifier(
-        ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap(("PASS".getBytes())));
-    TSOperationHandle operationHandle;
-    operationHandle = new TSOperationHandle(operationId, false);
-    resp.setOperationHandle(operationHandle);
-    return resp;
+    return executeUpdateStatement(physicalPlan);
   }
 
   private void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
@@ -766,11 +808,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    *
    * @return true: If logined; false: If not logined
    */
-  private boolean checkLogin() {
+  protected boolean checkLogin() {
     return username.get() != null;
   }
 
-  private boolean checkAuthorization(List<Path> paths, PhysicalPlan plan) throws AuthException {
+  protected boolean checkAuthorization(List<Path> paths, PhysicalPlan plan) throws AuthException {
     String targetUser = null;
     if (plan instanceof AuthorPlan) {
       targetUser = ((AuthorPlan) plan).getUserName();
@@ -791,7 +833,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return resp;
   }
 
-  private TSExecuteBatchStatementResp getTSBathExecuteStatementResp(TS_StatusCode code, String msg,
+  protected TSExecuteBatchStatementResp getTSBathExecuteStatementResp(TS_StatusCode code,
+      String msg,
       List<Integer> result) {
     TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
     TS_Status tsStatus = new TS_Status(code);
@@ -809,15 +852,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return resp;
   }
 
-  public void handleClientExit() throws TException {
+  protected void handleClientExit() throws TException {
     closeOperation(null);
     closeSession(null);
   }
 
   @Override
   public TSGetTimeZoneResp getTimeZone() throws TException {
-    TS_Status tsStatus = null;
-    TSGetTimeZoneResp resp = null;
+    TS_Status tsStatus;
+    TSGetTimeZoneResp resp;
     try {
       tsStatus = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
       resp = new TSGetTimeZoneResp(tsStatus, zoneIds.get().toString());
@@ -832,7 +875,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   @Override
   public TSSetTimeZoneResp setTimeZone(TSSetTimeZoneReq req) throws TException {
-    TS_Status tsStatus = null;
+    TS_Status tsStatus;
     try {
       String timeZoneID = req.getTimeZone();
       zoneIds.set(ZoneId.of(timeZoneID));
@@ -855,3 +898,4 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return properties;
   }
 }
+
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index 400065e..ad2722d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -477,7 +477,7 @@ public class SyncServiceImpl implements SyncService.Iface {
         try {
           if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) {
             // it is a file with overflow data
-            if (config.isUpdate_historical_data_possibility()) {
+            if (config.isUpdateHistoricalDataPossibility()) {
               loadOldData(path);
             } else {
               List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
index c0c1c00..47e8f10 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.receiver;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -149,8 +150,10 @@ public class SyncServiceManager implements IService {
         protocolFactory = new TBinaryProtocol.Factory();
         processor = new SyncService.Processor<>(new SyncServiceImpl());
         poolArgs = new TThreadPoolServer.Args(serverTransport);
-        poolArgs.processor(processor);
+        poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
+            ThreadName.SYNC_CLIENT.getName());
         poolArgs.protocolFactory(protocolFactory);
+        poolArgs.processor(processor);
         poolServer = new TThreadPoolServer(poolArgs);
         poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
         poolServer.serve();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java
index 1834569..13c5737 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java
@@ -25,7 +25,7 @@ import java.io.RandomAccessFile;
 import java.util.NoSuchElementException;
 import java.util.zip.CRC32;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 2c1fd72..d8e99ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.writelog.io.ILogWriter;
 import org.apache.iotdb.db.writelog.io.LogWriter;
 import org.apache.iotdb.db.writelog.recover.ExclusiveLogRecoverPerformer;
 import org.apache.iotdb.db.writelog.recover.RecoverPerformer;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java
index 40c8179..93b5db4 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java
@@ -173,7 +173,7 @@ public class IoTDBThreadPoolFactoryTest {
     int threadCount = 50;
     latch = new CountDownLatch(threadCount);
     ExecutorService exec = IoTDBThreadPoolFactory
-        .createJDBCClientThreadPool(args, POOL_NAME, handler);
+        .createThriftRpcClientThreadPool(args, POOL_NAME, handler);
     for (int i = 0; i < threadCount; i++) {
       Runnable task = new TestThread(reason);
       exec.execute(task);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 6b8d413..aa2ef7c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -138,7 +138,7 @@ public class IoTDBSeriesReaderIT {
         statement.execute(sql);
       }
 
-      // statement.executeWithGlobalTimeFilter("flush");
+      // statement.execute("flush");
 
       // insert large amount of data time range : 13700 ~ 24000
       for (int time = 13700; time < 24000; time++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 36dba77..4fbf8bf 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.metadata;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -315,27 +317,28 @@ public class MManagerBasicTest {
     }
   }
 
+  @Test
   public void testCheckStorageExistOfPath() {
     MManager manager = MManager.getInstance();
 
     try {
-      assertEquals(false, manager.checkStorageExistOfPath("root"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle.device"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle.device.sensor"));
+      assertTrue(manager.getAllPathGroupByFileName("root").keySet().isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle").isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
 
       manager.setStorageLevelToMTree("root.vehicle");
-      assertEquals(true, manager.checkStorageExistOfPath("root.vehicle"));
-      assertEquals(true, manager.checkStorageExistOfPath("root.vehicle.device"));
-      assertEquals(true, manager.checkStorageExistOfPath("root.vehicle.device.sensor"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device"));
+      assertFalse(manager.getAllFileNamesByPath("root.vehicle").isEmpty());
+      assertFalse(manager.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+      assertFalse(manager.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle1").isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
 
       manager.setStorageLevelToMTree("root.vehicle1.device");
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device1"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device2"));
-      assertEquals(false, manager.checkStorageExistOfPath("root.vehicle1.device3"));
-      assertEquals(true, manager.checkStorageExistOfPath("root.vehicle1.device"));
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device1").isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device2").isEmpty());
+      assertTrue(manager.getAllFileNamesByPath("root.vehicle1.device3").isEmpty());
+      assertFalse(manager.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
     } catch (PathErrorException | IOException e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
index 4f838e1..af1175a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.metadata;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -286,27 +287,28 @@ public class MTreeTest {
     }
   }
 
+  @Test
   public void testCheckStorageExistOfPath() {
     // set storage group first
     MTree root = new MTree("root");
     try {
-      assertEquals(false, root.checkStorageExistOfPath("root"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle.device"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle.device.sensor"));
+      assertTrue(root.getAllFileNamesByPath("root").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
 
       root.setStorageGroup("root.vehicle");
-      assertEquals(true, root.checkStorageExistOfPath("root.vehicle"));
-      assertEquals(true, root.checkStorageExistOfPath("root.vehicle.device"));
-      assertEquals(true, root.checkStorageExistOfPath("root.vehicle.device.sensor"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle1"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device"));
+      assertFalse(root.getAllFileNamesByPath("root.vehicle").isEmpty());
+      assertFalse(root.getAllFileNamesByPath("root.vehicle.device").isEmpty());
+      assertFalse(root.getAllFileNamesByPath("root.vehicle.device.sensor").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle1").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
 
       root.setStorageGroup("root.vehicle1.device");
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device1"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device2"));
-      assertEquals(false, root.checkStorageExistOfPath("root.vehicle1.device3"));
-      assertEquals(true, root.checkStorageExistOfPath("root.vehicle1.device"));
+      assertTrue(root.getAllFileNamesByPath("root.vehicle1.device1").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle1.device2").isEmpty());
+      assertTrue(root.getAllFileNamesByPath("root.vehicle1.device3").isEmpty());
+      assertFalse(root.getAllFileNamesByPath("root.vehicle1.device").isEmpty());
     } catch (PathErrorException e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
new file mode 100644
index 0000000..9e1adc7
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MetadataTest {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testCombineMetadatas() {
+    MManager manager = MManager.getInstance();
+
+    try {
+      manager.setStorageLevelToMTree("root.t.d1");
+      manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+      manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+      manager.setStorageLevelToMTree("root.t.d2");
+      manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+      Metadata metadata1 = manager.getMetadata();
+
+      manager.clear();
+
+      manager.setStorageLevelToMTree("root.t.d3");
+      manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+      manager.setStorageLevelToMTree("root.t1.d1");
+      manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+      Metadata metadata2 = manager.getMetadata();
+
+      manager.clear();
+
+      manager.setStorageLevelToMTree("root.t.d1");
+      manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+      manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+      manager.setStorageLevelToMTree("root.t.d2");
+      manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+      manager.setStorageLevelToMTree("root.t.d3");
+      manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+      manager.setStorageLevelToMTree("root.t1.d1");
+      manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+      Metadata metadata = manager.getMetadata();
+
+      Metadata combineMetadata = Metadata.combineMetadatas(new Metadata[]{metadata1, metadata2});
+      assertTrue(metadata.equals(combineMetadata));
+    } catch (PathErrorException | IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java
index 4145936..e9daf05 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/QueryProcessorTest.java
@@ -20,22 +20,76 @@ package org.apache.iotdb.db.qp;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.iotdb.db.exception.ArgsErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.qp.QueryProcessorException;
+import java.util.Collections;
+import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class QueryProcessorTest {
 
-  private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
+  private CompressionType compressionType = CompressionType.valueOf(TSFileConfig.compressor);
+  private MManager mManager = MManager.getInstance();
+  private QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    mManager.setStorageLevelToMTree("root.vehicle");
+    mManager.setStorageLevelToMTree("root.vehicle1");
+    mManager.addPathToMTree("root.vehicle.device1.sensor1", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle.device1.sensor2", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle.device1.sensor3", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle.device2.sensor1", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle.device2.sensor2", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle.device2.sensor3", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle1.device1.sensor1", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle1.device1.sensor2", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle1.device1.sensor3", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle1.device2.sensor1", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle1.device2.sensor2", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+    mManager.addPathToMTree("root.vehicle1.device2.sensor3", TSDataType.valueOf("INT32"),
+        TSEncoding.valueOf("RLE"), compressionType, Collections
+            .emptyMap());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
 
   @Test
-  public void parseSQLToPhysicalPlan()
-      throws ArgsErrorException, ProcessorException, QueryProcessorException {
+  public void parseSQLToPhysicalPlan() throws Exception {
     String createSGStatement = "set storage group to root.vehicle";
     PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(createSGStatement);
     assertEquals(OperatorType.SET_STORAGE_GROUP, plan1.getOperatorType());
@@ -56,5 +110,25 @@ public class QueryProcessorTest {
     PhysicalPlan plan5 = processor.parseSQLToPhysicalPlan(propertyStatement);
     assertEquals(OperatorType.PROPERTY, plan5.getOperatorType());
 
+    String deleteStatement = "DELETE FROM root.device0.sensor0,root.device0.sensor1 WHERE time <= 5000";
+    PhysicalPlan plan6 = processor.parseSQLToPhysicalPlan(deleteStatement);
+    assertEquals(OperatorType.DELETE, plan6.getOperatorType());
+
+    String queryStatement = "select * from root.vehicle where root.vehicle.device1.sensor1 > 50";
+    PhysicalPlan plan7 = processor.parseSQLToPhysicalPlan(queryStatement);
+    assertEquals(OperatorType.QUERY, plan7.getOperatorType());
+
+    String aggregationStatement = "select sum(*) from root.vehicle where root.vehicle.device1.sensor1 > 50";
+    PhysicalPlan plan8 = processor.parseSQLToPhysicalPlan(aggregationStatement);
+    assertEquals(OperatorType.AGGREGATION, plan8.getOperatorType());
+
+    String groupbyStatement = "select sum(*) from root.vehicle where root.vehicle.device1.sensor1 > 50 group by (20ms, [100,1100])";
+    PhysicalPlan plan9 = processor.parseSQLToPhysicalPlan(groupbyStatement);
+    assertEquals(OperatorType.GROUPBY, plan9.getOperatorType());
+
+    String fillStatement = "select sensor1 from root.vehicle.device1 where time = 50 Fill(int32[linear, 5m, 5m], boolean[previous, 5m])";
+    PhysicalPlan plan10 = processor.parseSQLToPhysicalPlan(fillStatement);
+    assertEquals(OperatorType.FILL, plan10.getOperatorType());
+
   }
 }
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransferTest.java
similarity index 99%
rename from iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
rename to iotdb/src/test/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransferTest.java
index 6a23f0d..c747fb6 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/physical/transfer/PhysicalPlanLogTransferTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.transfer;
+package org.apache.iotdb.db.qp.physical.transfer;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
index 7caaa59..7baf34c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
@@ -188,7 +188,7 @@ public class QPUpdateTest {
     String sqlStr = "insert into root.qp_update_test.device_1 (timestamp, sensor_1, sensor_2) values (13, 50, 40)";
     PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr);
 
-    // executeWithGlobalTimeFilter insert
+    // execute insert
     boolean upRet = processor.getExecutor().processNonQuery(plan1);
     assertTrue(upRet);
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index de521c0..71df644 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -161,7 +162,7 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
   }
 
   @Override
-  protected boolean delete(Path path, long deleteTime) {
+  public boolean delete(Path path, long deleteTime) {
     if (!demoMemDataBase.containsKey(path.toString())) {
       return true;
     }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
index 6dec434..b53e4d8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGeneratorTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.iotdb.db.qp.QueryProcessor;
-import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
index b4f808d..066f05f 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineDataSetTest.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.query.executor;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.db.query.executor.groupby.GroupByEngineDataSet;
-import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
+import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,7 +39,7 @@ public class GroupByEngineDataSetTest {
 
     long[] startTimeArray = {805, 810, 830};
     long[] endTimeArray = {810, 830, 850};
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
     int cnt = 0;
     while (groupByEngine.hasNext()){
       Pair pair = groupByEngine.nextTimePartition();
@@ -64,7 +64,7 @@ public class GroupByEngineDataSetTest {
 
     long[] startTimeArray = {805, 810, 830, 850, 1200, 1210};
     long[] endTimeArray = {810, 830, 850, 870, 1210, 1230};
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
     int cnt = 0;
     while (groupByEngine.hasNext()){
       Pair pair = groupByEngine.nextTimePartition();
@@ -89,7 +89,7 @@ public class GroupByEngineDataSetTest {
 
     long[] startTimeArray = {805, 820, 850, 860, 1200, 1220};
     long[] endTimeArray = {820, 840, 860, 880, 1220, 1240};
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
     int cnt = 0;
     while (groupByEngine.hasNext()){
       Pair pair = groupByEngine.nextTimePartition();
@@ -114,7 +114,7 @@ public class GroupByEngineDataSetTest {
 
     long[] startTimeArray = {805, 1200};
     long[] endTimeArray = {900, 1300};
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
     int cnt = 0;
     while (groupByEngine.hasNext()){
       Pair pair = groupByEngine.nextTimePartition();
@@ -139,7 +139,7 @@ public class GroupByEngineDataSetTest {
 
     long[] startTimeArray = {50, 585, 590};
     long[] endTimeArray = {110, 590, 670};
-    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSetDataSet(jobId, null, unit, startTimePoint, pairList);
+    GroupByEngineDataSet groupByEngine = new GroupByWithValueFilterDataSet(jobId, null, unit, startTimePoint, pairList);
     int cnt = 0;
     while (groupByEngine.hasNext()){
       Pair pair = groupByEngine.nextTimePartition();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 485a33c..be34d32 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -34,7 +34,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.exception.SysCheckException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.writelog.io.LogWriter;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.junit.Test;
 
 public class WalCheckerTest {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 3f55893..31f12fd 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -155,7 +155,7 @@ public class EnvironmentUtils {
     config.setEnableMemMonitor(false);
     // disable the system monitor
     config.setEnableStatMonitor(false);
-    IAuthorizer authorizer = null;
+    IAuthorizer authorizer;
     try {
       authorizer = LocalFileAuthorizer.getInstance();
     } catch (AuthException e) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index eb0a117..17777e7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.MetadataArgsErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.RecoverException;
-import org.apache.iotdb.db.exception.WALOverSizedException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -35,7 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index d125ebc..8b1c241 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.After;
 import org.junit.Assert;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 20d15fd..dc40b84 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -25,12 +25,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.iotdb.db.exception.WALOverSizedException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index fac4765..aeb789e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -28,8 +29,9 @@ import org.apache.iotdb.tsfile.utils.Binary;
  * <code>BatchData</code> is a self-defined data structure which is optimized for different type of
  * values. This class can be viewed as a collection which is more efficient than ArrayList.
  */
-public class BatchData {
+public class BatchData implements Serializable {
 
+  private static final long serialVersionUID = -4620310601188394839L;
   private int timeCapacity = 1;
   private int valueCapacity = 1;
   private int emptyTimeCapacity = 1;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java
index 421e296..2bb4a4d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Field.java
@@ -132,4 +132,26 @@ public class Field {
   public boolean isNull() {
     return this.isNull;
   }
+
+  public Object getObjectValue(TSDataType dataType) {
+    if (isNull) {
+      return null;
+    }
+    switch (dataType) {
+      case DOUBLE:
+        return getDoubleV();
+      case FLOAT:
+        return getFloatV();
+      case INT64:
+        return getLongV();
+      case INT32:
+        return getIntV();
+      case BOOLEAN:
+        return getBoolV();
+      case TEXT:
+        return getBinaryV();
+      default:
+        throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 283cbe3..54d9f73 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common;
 
+import java.io.Serializable;
 import org.apache.iotdb.tsfile.common.constant.SystemConstant;
 import org.apache.iotdb.tsfile.utils.StringContainer;
 import org.slf4j.Logger;
@@ -31,8 +32,9 @@ import java.io.IOException;
  *
  * @author Kangrong
  */
-public class Path {
+public class Path implements Serializable {
 
+  private static final long serialVersionUID = 3405277066329298200L;
   private String measurement = null;
   private String device = null;
   private String fullPath;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java
index 4bffddb..31fa6d3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/ExpressionType.java
@@ -19,5 +19,33 @@
 package org.apache.iotdb.tsfile.read.expression;
 
 public enum ExpressionType {
-  AND, OR, SERIES, GLOBAL_TIME
+
+  /**
+   * Represent the relationship between the left expression and the right expression is AND
+   */
+  AND,
+
+  /**
+   * Represent the relationship between the left expression and the right expression is OR
+   */
+  OR,
+
+  /**
+   * Represents that the expression is a leaf node in the expression tree and the type is value
+   * filtering
+   */
+  SERIES,
+
+  /**
+   * Represents that the expression is a leaf node in the expression tree and the type is time
+   * filtering
+   */
+  GLOBAL_TIME,
+
+  /**
+   * This type is used in the pruning process of expression tree in the distributed reading process.
+   * When pruning a expression tree for a data group, leaf nodes belonging to other data groups will
+   * be set to that type.
+   */
+  TRUE
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java
index b97310a..ad9330a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IBinaryExpression.java
@@ -27,4 +27,8 @@ public interface IBinaryExpression extends IExpression {
 
   IExpression getRight();
 
+  void setLeft(IExpression leftExpression);
+
+  void setRight(IExpression rightExpression);
+
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
index a05b842..a039311 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.tsfile.read.expression;
 
-public interface IExpression {
+public interface IExpression{
 
   ExpressionType getType();
+
+  IExpression clone();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
index 60e68e0..4a6d4eb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
@@ -18,11 +18,14 @@
  */
 package org.apache.iotdb.tsfile.read.expression.impl;
 
+import java.io.Serializable;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
-public abstract class BinaryExpression implements IBinaryExpression {
+public abstract class BinaryExpression implements IBinaryExpression, Serializable {
+
+  private static final long serialVersionUID = -711801318534904452L;
 
   public static AndExpression and(IExpression left, IExpression right) {
     return new AndExpression(left, right);
@@ -32,6 +35,9 @@ public abstract class BinaryExpression implements IBinaryExpression {
     return new OrExpression(left, right);
   }
 
+  @Override
+  public abstract IExpression clone();
+
   protected static class AndExpression extends BinaryExpression {
 
     public IExpression left;
@@ -53,11 +59,26 @@ public abstract class BinaryExpression implements IBinaryExpression {
     }
 
     @Override
+    public void setLeft(IExpression leftExpression) {
+      this.left = leftExpression;
+    }
+
+    @Override
+    public void setRight(IExpression rightExpression) {
+      this.right = rightExpression;
+    }
+
+    @Override
     public ExpressionType getType() {
       return ExpressionType.AND;
     }
 
     @Override
+    public IExpression clone() {
+      return new AndExpression(left.clone(), right.clone());
+    }
+
+    @Override
     public String toString() {
       return "[" + left + " && " + right + "]";
     }
@@ -84,11 +105,26 @@ public abstract class BinaryExpression implements IBinaryExpression {
     }
 
     @Override
+    public void setLeft(IExpression leftExpression) {
+      this.left = leftExpression;
+    }
+
+    @Override
+    public void setRight(IExpression rightExpression) {
+      this.right = rightExpression;
+    }
+
+    @Override
     public ExpressionType getType() {
       return ExpressionType.OR;
     }
 
     @Override
+    public IExpression clone() {
+      return new OrExpression(left.clone(), right.clone());
+    }
+
+    @Override
     public String toString() {
       return "[" + left + " || " + right + "]";
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
index d69a65a..685b948 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
@@ -18,12 +18,15 @@
  */
 package org.apache.iotdb.tsfile.read.expression.impl;
 
+import java.io.Serializable;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-public class GlobalTimeExpression implements IUnaryExpression {
+public class GlobalTimeExpression implements IUnaryExpression, Serializable {
 
+  private static final long serialVersionUID = 1146132942359113670L;
   private Filter filter;
 
   public GlobalTimeExpression(Filter filter) {
@@ -46,6 +49,11 @@ public class GlobalTimeExpression implements IUnaryExpression {
   }
 
   @Override
+  public IExpression clone() {
+    return new GlobalTimeExpression(filter.clone());
+  }
+
+  @Override
   public String toString() {
     return "[" + this.filter.toString() + "]";
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
index b2c2b24..2c888e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.tsfile.read.expression.impl;
 
+import java.io.Serializable;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-public class SingleSeriesExpression implements IUnaryExpression {
+public class SingleSeriesExpression implements IUnaryExpression, Serializable {
 
+  private static final long serialVersionUID = 7131207370394865228L;
   private Path seriesPath;
   private Filter filter;
 
@@ -39,6 +42,11 @@ public class SingleSeriesExpression implements IUnaryExpression {
   }
 
   @Override
+  public IExpression clone() {
+    return new SingleSeriesExpression(seriesPath.clone(), filter.clone());
+  }
+
+  @Override
   public Filter getFilter() {
     return filter;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
index aeba875..fc23983 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
@@ -47,4 +47,7 @@ public abstract class BinaryFilter implements Filter, Serializable {
   public String toString() {
     return "( " + left + "," + right + " )";
   }
+
+  @Override
+  public abstract Filter clone();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
index cabaf4f..03e2a84 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
@@ -61,4 +61,6 @@ public interface Filter {
    * @param endTime end time of a partition
    */
   boolean containStartEndTime(long startTime, long endTime);
+
+  Filter clone();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
index ff66ef5..e2d46d4 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
@@ -49,4 +49,7 @@ public abstract class UnaryFilter<T extends Comparable<T>> implements Filter, Se
 
   @Override
   public abstract String toString();
+
+  @Override
+  public abstract Filter clone();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java
index fdeed48..19ccc6e 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/AndFilter.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  */
 public class AndFilter extends BinaryFilter {
 
-  private static final long serialVersionUID = 6705254093824897938L;
+  private static final long serialVersionUID = -8212850098906044102L;
 
   public AndFilter(Filter left, Filter right) {
     super(left, right);
@@ -59,4 +59,9 @@ public class AndFilter extends BinaryFilter {
   public String toString() {
     return "(" + left + " && " + right + ")";
   }
+
+  @Override
+  public Filter clone() {
+    return new AndFilter(left.clone(), right.clone());
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java
index 90ac31a..6c3b587 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Eq.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.read.filter.operator;
 
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 
@@ -79,6 +80,11 @@ public class Eq<T extends Comparable<T>> extends UnaryFilter<T> {
   }
 
   @Override
+  public Filter clone() {
+    return new Eq(value, filterType);
+  }
+
+  @Override
   public String toString() {
     return getFilterType() + " == " + value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java
index 21fa6e4..0b12713 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Gt.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.read.filter.operator;
 
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 
@@ -78,6 +79,11 @@ public class Gt<T extends Comparable<T>> extends UnaryFilter<T> {
   }
 
   @Override
+  public Filter clone() {
+    return new Gt(value, filterType);
+  }
+
+  @Override
   public String toString() {
     return getFilterType() + " > " + value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java
index 7c8ba53..5d1cb6d 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/GtEq.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.read.filter.operator;
 
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 
@@ -78,6 +79,11 @@ public class GtEq<T extends Comparable<T>> extends UnaryFilter<T> {
   }
 
   @Override
+  public Filter clone() {
+    return new GtEq(value, filterType);
+  }
+
+  @Override
   public String toString() {
     return getFilterType() + " >= " + value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java
index eb0cdf7..3385241 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/Lt.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.read.filter.operator;
 
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 
@@ -78,6 +79,11 @@ public class Lt<T extends Comparable<T>> extends UnaryFilter<T> {
   }
 
   @Override
+  public Filter clone() {
+    return new Lt(value, filterType);
+  }
+
+  @Override
   public String toString() {
     return getFilterType() + " < " + value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java
index 7c6800f..b1cef19 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/LtEq.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.read.filter.operator;
 
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 
@@ -78,6 +79,11 @@ public class LtEq<T extends Comparable<T>> extends UnaryFilter<T> {
   }
 
   @Override
+  public Filter clone() {
+    return new LtEq(value, filterType);
+  }
+
+  @Override
   public String toString() {
     return getFilterType() + " <= " + value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java
index ad1c42f..0b92639 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotEq.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.read.filter.operator;
 
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 
@@ -79,6 +80,11 @@ public class NotEq<T extends Comparable<T>> extends UnaryFilter<T> {
   }
 
   @Override
+  public Filter clone() {
+    return new NotEq(value, filterType);
+  }
+
+  @Override
   public String toString() {
     return getFilterType() + " != " + value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
index 473b3b0..9ebbbeb 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
@@ -58,6 +58,11 @@ public class NotFilter implements Filter, Serializable {
     return !that.satisfyStartEndTime(startTime, endTime);
   }
 
+  @Override
+  public Filter clone() {
+    return new NotFilter(that.clone());
+  }
+
   public Filter getFilter() {
     return this.that;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java
index 856cc45..dbe538d 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/OrFilter.java
@@ -40,6 +40,11 @@ public class OrFilter extends BinaryFilter implements Serializable {
   }
 
   @Override
+  public Filter clone() {
+    return new OrFilter(left.clone(), right.clone());
+  }
+
+  @Override
   public boolean satisfy(DigestForFilter digest) {
     return left.satisfy(digest) || right.satisfy(digest);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
index 557cedc..8659c40 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/DataSetWithTimeGenerator.java
@@ -84,36 +84,4 @@ public class DataSetWithTimeGenerator extends QueryDataSet {
 
     return rowRecord;
   }
-
-  private Field getField(Object value, TSDataType dataType) {
-    Field field = new Field(dataType);
-
-    if (value == null) {
-      field.setNull();
-      return field;
-    }
-    switch (dataType) {
-      case DOUBLE:
-        field.setDoubleV((double) value);
-        break;
-      case FLOAT:
-        field.setFloatV((float) value);
-        break;
-      case INT64:
-        field.setLongV((long) value);
-        break;
-      case INT32:
-        field.setIntV((int) value);
-        break;
-      case BOOLEAN:
-        field.setBoolV((boolean) value);
-        break;
-      case TEXT:
-        field.setBinaryV((Binary) value);
-        break;
-      default:
-        throw new UnSupportedDataTypeException("UnSupported" + String.valueOf(dataType));
-    }
-    return field;
-  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index c46da70..7f59031 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -20,9 +20,12 @@ package org.apache.iotdb.tsfile.read.query.dataset;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class QueryDataSet {
 
@@ -59,4 +62,37 @@ public abstract class QueryDataSet {
   public void setDataTypes(List<TSDataType> dataTypes) {
     this.dataTypes = dataTypes;
   }
+
+  protected Field getField(Object value, TSDataType dataType) {
+    Field field = new Field(dataType);
+
+    if (value == null) {
+      field.setNull();
+      return field;
+    }
+
+    switch (dataType) {
+      case DOUBLE:
+        field.setDoubleV((double) value);
+        break;
+      case FLOAT:
+        field.setFloatV((float) value);
+        break;
+      case INT64:
+        field.setLongV((long) value);
+        break;
+      case INT32:
+        field.setIntV((int) value);
+        break;
+      case BOOLEAN:
+        field.setBoolV((boolean) value);
+        break;
+      case TEXT:
+        field.setBinaryV((Binary) value);
+        break;
+      default:
+        throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+    }
+    return field;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java
index ef88272..60995db 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Pair.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.tsfile.utils;
 
+import java.io.Serializable;
+
 /**
  * Pair is a template class to represent a couple of values. It also override the Object basic
  * methods like hasnCode, equals and toString.
@@ -26,8 +28,9 @@ package org.apache.iotdb.tsfile.utils;
  * @param <R> R type
  * @author kangrong
  */
-public class Pair<L, R> {
+public class Pair<L, R> implements Serializable {
 
+  private static final long serialVersionUID = -1398609631703707002L;
   public L left;
   public R right;