Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/01/16 09:34:38 UTC

[iotdb] branch research/outlier updated: Add Distance-based Outlier Detection Function (#8885)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch research/outlier
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/outlier by this push:
     new eb39238007 Add Distance-based Outlier Detection Function (#8885)
eb39238007 is described below

commit eb392380070318e2db19cafaee619d50fd0e5cd6
Author: iotdb-lsmod <12...@users.noreply.github.com>
AuthorDate: Mon Jan 16 17:34:29 2023 +0800

    Add Distance-based Outlier Detection Function (#8885)
---
 .../resources/conf/iotdb-engine.properties         |   20 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   40 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   59 +
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    |    2 +-
 .../trigger/sink/local/LocalIoTDBHandler.java      |    4 +-
 .../iotdb/db/metadata/path/MeasurementPath.java    |   17 +-
 .../apache/iotdb/db/metadata/path/PartialPath.java |    9 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |    4 +-
 .../qp/logical/crud/AggregationQueryOperator.java  |    4 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |    2 +-
 .../iotdb/db/qp/logical/crud/SelectComponent.java  |   14 +
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |    9 +
 .../db/query/aggregation/AggregateResult.java      |   13 +
 .../db/query/aggregation/AggregationType.java      |    8 +-
 .../db/query/aggregation/impl/DoddsAggrResult.java |  233 +++
 .../db/query/executor/AggregationExecutor.java     |  839 +++++++-
 .../db/query/factory/AggregateResultFactory.java   |    6 +
 .../db/query/filter/executor/IPlanExecutor.java    |  134 ++
 .../db/query/filter/executor/PlanExecutor.java     | 2176 ++++++++++++++++++++
 .../db/service/basic/BasicServiceProvider.java     |    4 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |    2 +-
 .../idtable/IDTableResourceControlTest.java        |    2 +-
 .../db/metadata/idtable/IDTableRestartTest.java    |    2 +-
 .../db/metadata/idtable/InsertWithIDTableTest.java |    2 +-
 .../db/metadata/idtable/LastQueryWithIDTable.java  |    2 +-
 .../QueryAlignedTimeseriesWithIDTableTest.java     |    2 +-
 .../db/metadata/idtable/QueryWithIDTableTest.java  |    2 +-
 .../iotdb/db/qp/physical/InsertRowPlanTest.java    |    2 +-
 .../db/qp/physical/InsertTabletMultiPlanTest.java  |    2 +-
 .../iotdb/db/qp/physical/InsertTabletPlanTest.java |    2 +-
 .../db/query/aggregation/AggregateResultTest.java  |   34 +
 .../query/aggregation/DescAggregateResultTest.java |  195 +-
 .../dataset/EngineDataSetWithValueFilterTest.java  |    4 +-
 .../iotdb/db/query/dataset/ListDataSetTest.java    |    4 +-
 .../iotdb/db/query/dataset/SingleDataSetTest.java  |    4 +-
 .../query/dataset/UDTFAlignByTimeDataSetTest.java  |    4 +-
 .../dataset/groupby/GroupByFillDataSetTest.java    |    4 +-
 .../dataset/groupby/GroupByLevelDataSetTest.java   |    4 +-
 .../dataset/groupby/GroupByTimeDataSetTest.java    |    4 +-
 .../valuefilter/RawQueryWithValueFilterTest.java   |    4 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      |    4 +-
 41 files changed, 3651 insertions(+), 231 deletions(-)
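
For context: this commit registers a new built-in aggregation named "dodds" (distance-based outlier detection) and threads its per-query attributes (k, r, w, s, plus the optional d, g, f and b; see DoddsAggrResult#setParameters below) from the SELECT clause down to the aggregation executor. The sketch below shows how such a query might be issued from the Java session client; the attribute syntax mirrors the existing UDF attribute syntax and is an assumption for this research branch, and the series root.sg.d1.s1 as well as the attribute values are made up.

    import org.apache.iotdb.session.Session;
    import org.apache.iotdb.session.SessionDataSet;

    public class DoddsQuerySketch {
      public static void main(String[] args) throws Exception {
        Session session = new Session("127.0.0.1", 6667, "root", "root");
        session.open();
        // k: neighbor-count threshold, r: distance threshold,
        // w: window length in segments, s: slide in segments
        SessionDataSet dataSet =
            session.executeQueryStatement(
                "select dodds(s1, \"k\"=\"5\", \"r\"=\"2.0\", \"w\"=\"10\", \"s\"=\"5\") from root.sg.d1");
        while (dataSet.hasNext()) {
          System.out.println(dataSet.next());
        }
        session.close();
      }
    }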

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 13becfd599..0b1026410d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -236,7 +236,7 @@ timestamp_precision=ms
 
 # Whether to timed flush unsequence tsfiles' memtables.
 # Datatype: boolean
-# enable_timed_flush_unseq_memtable=true
+enable_timed_flush_unseq_memtable=false
 
 # If a memTable's created time is older than current time minus this, the memtable will be flushed to disk.
 # Only check unsequence tsfiles' memtables.
@@ -418,15 +418,15 @@ timestamp_precision=ms
 ####################
 # sequence space compaction: only compact the sequence files
 # Datatype: boolean
-# enable_seq_space_compaction=true
+enable_seq_space_compaction=false
 
 # unsequence space compaction: only compact the unsequence files
 # Datatype: boolean
-# enable_unseq_space_compaction=true
+enable_unseq_space_compaction=false
 
 # cross space compaction: compact the unsequence files into the overlapped sequence files
 # Datatype: boolean
-# enable_cross_space_compaction=true
+enable_cross_space_compaction=false
 
 # the strategy of inner space compaction task
 # Options: inplace_compaction
@@ -721,9 +721,12 @@ timestamp_precision=ms
 # Datatype: int [xsy]
 # page_size_in_byte=65536
 
+# xMax = 10000000
+# xMin = -1000000
+
 # The maximum number of data points in a page, default 1024*1024
 # Datatype: int [xsy]
-# max_number_of_points_in_page=1048576
+max_number_of_points_in_page=60
 
 # Max size limitation of input string
 # Datatype: int [xsy]
@@ -953,4 +956,9 @@ timestamp_precision=ms
 ### Group By Fill Configuration
 ####################
 # Datatype: float
-# group_by_fill_cache_size_in_mb=1.0
\ No newline at end of file
+# group_by_fill_cache_size_in_mb=1.0
+
+# DODDS parameters: bucket_width (double), window_size (int, ms), file_num (int)
+bucket_width=2
+window_size=60000
+file_num=2
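
A rough sketch (not part of the commit) of what the two main knobs control, mirroring the normalization added to AggregationExecutor#aggregateDodds later in this diff: bucket_width (gamma) discretizes the value domain into buckets, and window_size (delta, in milliseconds) discretizes the time axis into segments. The observed* parameters below stand for the min/max value and start/end time read from the TsFile statistics.

    // Sketch only; gamma = bucket_width, delta = window_size (ms).
    class DoddsDiscretizationSketch {
      static int[] discretize(
          double observedMin, double observedMax, long observedStart, long observedEnd,
          double gamma, long delta) {
        double minValue = Math.floor(observedMin / gamma) * gamma;
        double maxValue = Math.floor(observedMax / gamma + 1) * gamma;
        long startTime = observedStart / delta * delta;          // align down to a segment boundary
        long endTime = (observedEnd / delta + 1) * delta;        // align up past the last point
        int bucketsCnt = (int) ((maxValue - minValue) / gamma);  // number of value buckets
        int segsCnt = (int) ((endTime - startTime) / delta);     // number of time segments
        return new int[] {bucketsCnt, segsCnt};
      }
    }

For example, values spanning [0, 100] with bucket_width=2 give 51 buckets, and data from t=0 to t=3,599,999 ms with window_size=60000 gives 60 segments.
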
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4842bb27e6..b674a277a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -283,11 +283,11 @@ public class IoTDBConfig {
   /**
    * If we enable the memory-control mechanism during index building , {@code indexBufferSize}
    * refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
-   * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
-   * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
-   * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
-   * will be flushed to the disk file. As a result, data in one series may be divided into more than
-   * one part and indexed separately. Unit: byte
+   * one {@linkplain org.apache.iotdb.db.index IndexFileProcessor} share a total common buffer size.
+   * With the memory-control mechanism, the occupied memory of all raw data and index structures
+   * will be counted. If the memory buffer size reaches this threshold, the indexes will be flushed
+   * to the disk file. As a result, data in one series may be divided into more than one part and
+   * indexed separately. Unit: byte
    */
   private long indexBufferSize = 128 * 1024 * 1024L;
 
@@ -802,6 +802,36 @@ public class IoTDBConfig {
    */
   private boolean enableIDTableLogFile = false;
 
+  public double getGamma() {
+    return gamma;
+  }
+
+  public void setGamma(double gamma) {
+    this.gamma = gamma;
+  }
+
+  public int getFileNum() {
+    return fileNum;
+  }
+
+  public void setFileNum(int fileNum) {
+    this.fileNum = fileNum;
+  }
+
+  public int getDelta() {
+    return delta;
+  }
+
+  public void setDelta(int delta) {
+    this.delta = delta;
+  }
+
+  private double gamma = 2;
+
+  private int fileNum = 1;
+
+  private int delta = 60000;
+
   public IoTDBConfig() {
     // empty constructor
   }
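
These three fields back the new properties loaded in IoTDBDescriptor below (bucket_width -> gamma, window_size -> delta, file_num -> fileNum). Further down, AggregationExecutor#aggregateDodds reads them from the config singleton rather than from the per-query attributes; a minimal sketch of that access pattern, assuming only the getters added here:

    import org.apache.iotdb.db.conf.IoTDBConfig;
    import org.apache.iotdb.db.conf.IoTDBDescriptor;

    class DoddsConfigSketch {
      static void readKnobs() {
        IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
        int delta = conf.getDelta();      // window_size, in milliseconds
        double gamma = conf.getGamma();   // bucket_width
        int fileNum = conf.getFileNum();  // file_num
        System.out.println("delta=" + delta + ", gamma=" + gamma + ", fileNum=" + fileNum);
      }
    }
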
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 849fddbdb6..f8311a7e72 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -829,6 +829,16 @@ public class IoTDBDescriptor {
               properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
       TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
 
+      conf.setGamma(
+          Double.parseDouble(
+              properties.getProperty("bucket_width", Double.toString(conf.getGamma()))));
+      conf.setFileNum(
+          Integer.parseInt(
+              properties.getProperty("file_num", Integer.toString(conf.getFileNum()))));
+      conf.setDelta(
+          Integer.parseInt(
+              properties.getProperty("window_size", Integer.toString(conf.getDelta()))));
+
       // timed flush memtable, timed close tsfile
       loadTimedService(properties);
 
@@ -974,6 +984,55 @@ public class IoTDBDescriptor {
                     "group_size_in_byte",
                     Integer.toString(
                         TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setXMax(
+            Double.parseDouble(
+                properties.getProperty(
+                    "xMax",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getXMax()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setXMin(
+            Double.parseDouble(
+                properties.getProperty(
+                    "xMin",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getXMin()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setsMax(
+            Double.parseDouble(
+                properties.getProperty(
+                    "sMax",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getsMax()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setSmin(
+            Double.parseDouble(
+                properties.getProperty(
+                    "sMin",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getSmin()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setMussRate(
+            Double.parseDouble(
+                properties.getProperty(
+                    "mussRate",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getMussRate()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setUsePreSpeed(
+            Boolean.parseBoolean(
+                properties.getProperty(
+                    "usePreSpeed",
+                    Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreSpeed()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setUsePreRange(
+            Boolean.parseBoolean(
+                properties.getProperty(
+                    "usePreRange",
+                    Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreRange()))));
     TSFileDescriptor.getInstance()
         .getConfig()
         .setPageSizeInByte(
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284fe8..6216487e5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
index 3b224b20e5..b4f40e1f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
@@ -27,10 +27,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index b481e478a5..756ce3ac5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesReader;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -55,10 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 public class MeasurementPath extends PartialPath {
 
@@ -93,6 +91,17 @@ public class MeasurementPath extends PartialPath {
     this.measurementSchema = measurementSchema;
   }
 
+  public MeasurementPath concat(String[] otherNodes) {
+    int len = nodes.length;
+    String[] newNodes = Arrays.copyOf(nodes, nodes.length + otherNodes.length);
+    System.arraycopy(otherNodes, 0, newNodes, len, otherNodes.length);
+    MeasurementPath measurementPath = new MeasurementPath();
+    measurementPath.nodes = newNodes;
+    // build the full path from the concatenated node array, not the original one
+    measurementPath.fullPath = String.join(TsFileConstant.PATH_SEPARATOR, newNodes);
+    return measurementPath;
+  }
+
   public IMeasurementSchema getMeasurementSchema() {
     return measurementSchema;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 6dbee2febd..9e50cf1fde 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -125,6 +125,15 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
     fullPath = String.join(TsFileConstant.PATH_SEPARATOR, nodes);
   }
 
+  public PartialPath concat(String[] otherNodes) {
+    int len = nodes.length;
+    String[] newNodes = Arrays.copyOf(nodes, nodes.length + otherNodes.length);
+    System.arraycopy(otherNodes, 0, newNodes, len, otherNodes.length);
+    PartialPath partialPath = new PartialPath(newNodes);
+    partialPath.fullPath = String.join(TsFileConstant.PATH_SEPARATOR, newNodes);
+    return partialPath;
+  }
+
   public PartialPath concatNode(String node) {
     String[] newPathNodes = Arrays.copyOf(nodes, nodes.length + 1);
     newPathNodes[newPathNodes.length - 1] = node;
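
Both PartialPath#concat and the MeasurementPath override append a node array and rebuild the full path from the concatenated nodes. A hypothetical usage (the series name is made up; constructing a PartialPath from a string throws IllegalPathException for malformed paths):

    import org.apache.iotdb.db.exception.metadata.IllegalPathException;
    import org.apache.iotdb.db.metadata.path.PartialPath;

    class ConcatSketch {
      static void demo() throws IllegalPathException {
        PartialPath base = new PartialPath("root.sg.d1.s1");
        PartialPath bucketSeries = base.concat(new String[] {"b", "b3"});
        // bucketSeries.getFullPath() -> "root.sg.d1.s1.b.b3"
      }
    }
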
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index ce111ab37a..a4ab046546 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -70,6 +70,7 @@ public class SQLConstant {
   public static final String COUNT = "count";
   public static final String AVG = "avg";
   public static final String SUM = "sum";
+  public static final String DODDS = "dodds";
 
   public static final String ALL = "all";
 
@@ -85,7 +86,8 @@ public class SQLConstant {
               LAST_VALUE,
               COUNT,
               SUM,
-              AVG));
+              AVG,
+              DODDS));
 
   public static final int TOK_WHERE = 23;
   public static final int TOK_INSERT = 24;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
index 36719c71ec..9af0487ccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
@@ -125,7 +125,9 @@ public class AggregationQueryOperator extends QueryOperator {
 
   protected AggregationPlan initAggregationPlan(QueryPlan queryPlan) throws QueryProcessException {
     AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
-    aggregationPlan.setAggregations(selectComponent.getAggregationFunctions());
+    aggregationPlan.setAggregations(selectComponent.getAggregationFunctions()); // function name
+    aggregationPlan.setParameters(selectComponent.getAggregationAttributes());
+    // TODO: aggregationPlan.setParams(selectComponent.getParams)
     if (isGroupByLevel()) {
       initGroupByLevel(aggregationPlan);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 4653ed6794..5f41e24902 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -60,7 +60,7 @@ public class QueryOperator extends Operator {
   protected WhereComponent whereComponent;
   protected SpecialClauseComponent specialClauseComponent;
 
-  protected Map<String, Object> props;
+  protected Map<String, Object> props; // parameters
   protected IndexType indexType;
 
   protected boolean enableTracing;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index f06d66158c..0c3b64d505 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /** this class maintains information from select clause. */
 public final class SelectComponent {
@@ -128,8 +129,21 @@ public final class SelectComponent {
             expression instanceof FunctionExpression
                 ? ((FunctionExpression) resultColumn.getExpression()).getFunctionName()
                 : null);
+        // TODO: resultColumn.getExpression().getFunctionAttributes()(
       }
     }
     return aggregationFunctionsCache;
   }
+
+  public List<Map<String, String>> getAggregationAttributes() {
+    List<Map<String, String>> aggregationAttributesCache = new ArrayList<>();
+    for (ResultColumn resultColumn : resultColumns) {
+      Expression expression = resultColumn.getExpression();
+      aggregationAttributesCache.add(
+          expression instanceof FunctionExpression
+              ? ((FunctionExpression) resultColumn.getExpression()).getFunctionAttributes()
+              : null);
+    }
+    return aggregationAttributesCache;
+  }
 }
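
The new getAggregationAttributes() is index-aligned with getAggregationFunctions(): entry i is the attribute map of result column i, or null for non-function columns. A self-contained illustration (not part of the commit) of the shape that AggregationPlan#setParameters receives and that AggregationExecutor later indexes with parameters.get(i):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class AttributeAlignmentSketch {
      public static void main(String[] args) {
        // e.g. for "select dodds(s1, ...), count(s2) from root.sg.d1" (illustrative)
        List<String> aggregations = Arrays.asList("dodds", "count");
        Map<String, String> doddsAttrs = new LinkedHashMap<>();
        doddsAttrs.put("k", "5");
        doddsAttrs.put("r", "2.0");
        doddsAttrs.put("w", "10");
        doddsAttrs.put("s", "5");
        List<Map<String, String>> parameters = Arrays.asList(doddsAttrs, new LinkedHashMap<>());
        System.out.println(aggregations + " / " + parameters);
      }
    }
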
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index c81e12e5f8..fec29da6cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -46,6 +46,7 @@ public class AggregationPlan extends RawDataQueryPlan {
 
   private List<String> aggregations = new ArrayList<>();
   private List<String> deduplicatedAggregations = new ArrayList<>();
+  private List<Map<String, String>> parameters = new ArrayList<>();
 
   private int[] levels;
   private GroupByLevelController groupByLevelController;
@@ -106,6 +107,14 @@ public class AggregationPlan extends RawDataQueryPlan {
     this.aggregations = aggregations;
   }
 
+  public List<Map<String, String>> getParameters() {
+    return parameters;
+  }
+
+  public void setParameters(List<Map<String, String>> parameters) {
+    this.parameters = parameters;
+  }
+
   public List<String> getDeduplicatedAggregations() {
     return deduplicatedAggregations;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7daa1e8d60..4f8eba913b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 public abstract class AggregateResult {
 
@@ -46,6 +47,7 @@ public abstract class AggregateResult {
   private float floatValue;
   private double doubleValue;
   private Binary binaryValue;
+  private List<Integer> intArrayValue;
 
   protected boolean hasCandidateResult;
 
@@ -67,6 +69,7 @@ public abstract class AggregateResult {
    *
    * @param statistics chunkStatistics or pageStatistics
    */
+  // update data by headers
   public abstract void updateResultFromStatistics(Statistics statistics)
       throws QueryProcessException;
 
@@ -76,6 +79,7 @@ public abstract class AggregateResult {
    *
    * @param batchIterator the data in Page
    */
+  // update by original data
   public abstract void updateResultFromPageData(IBatchDataIterator batchIterator)
       throws IOException, QueryProcessException;
 
@@ -266,6 +270,15 @@ public abstract class AggregateResult {
     this.intValue = intValue;
   }
 
+  protected List<Integer> getIntArrayValue() {
+    return intArrayValue;
+  }
+
+  public void setIntArrayValue(List<Integer> intArrayValue) {
+    this.hasCandidateResult = true;
+    this.intArrayValue = intArrayValue;
+  }
+
   protected long getLongValue() {
     return longValue;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index a3c651835c..44e00e3024 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -35,7 +35,8 @@ public enum AggregationType {
   MIN_TIME,
   MAX_VALUE,
   MIN_VALUE,
-  EXTREME;
+  EXTREME,
+  DODDS;
 
   /**
    * give an integer to return a data type.
@@ -65,6 +66,8 @@ public enum AggregationType {
         return MIN_VALUE;
       case 9:
         return EXTREME;
+      case 10:
+        return DODDS;
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
     }
@@ -103,6 +106,9 @@ public enum AggregationType {
       case EXTREME:
         i = 9;
         break;
+      case DODDS:
+        i = 10;
+        break;
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DoddsAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DoddsAggrResult.java
new file mode 100644
index 0000000000..045456c97e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DoddsAggrResult.java
@@ -0,0 +1,233 @@
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class DoddsAggrResult extends AggregateResult {
+  private TSDataType seriesDataType;
+  private int count;
+  private List<Long> outliers = new ArrayList<>(); // store the timestamps of outliers
+  private int k, w, s;
+  private double r;
+  private int delta;
+  private double gamma;
+  private int fileNum;
+  private boolean ifWithUpperBound;
+  private Statistics statisticsInstance = new DoubleStatistics();
+
+  // TODO: store the per-bucket counts in an intermediate variable (buckets) and the detection results in the result variable outliers
+
+  public DoddsAggrResult(TSDataType seriesDataType) {
+    super(TSDataType.VECTOR, AggregationType.DODDS);
+    this.seriesDataType = seriesDataType;
+    reset();
+  }
+
+  @Override
+  public List<Integer> getResult() {
+    // TODO: revise statistics to return the detection result
+
+    return hasCandidateResult() ? getIntArrayValue() : null;
+  }
+
+  @Override
+  public void updateResultFromStatistics(Statistics statistics) {
+    if (statistics.getCount() == 0) {
+      return;
+    }
+    if (statistics instanceof DoubleStatistics) {
+      count += statistics.getCount();
+    } else {
+      throw new StatisticsClassException("DODDS aggregation only supports DOUBLE statistics");
+    }
+    //        setIntArrayValue(statisticsInstance.getBuckets());
+  }
+
+  @Override
+  public void updateResultFromPageData(IBatchDataIterator batchIterator)
+      throws IOException, QueryProcessException {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  @Override
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext(minBound, maxBound)) {
+      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
+        break;
+      }
+      statisticsInstance.update(batchIterator.currentTime(), (double) batchIterator.currentValue());
+      batchIterator.next();
+    }
+  }
+
+  @Override
+  public void updateResultUsingTimestamps(
+      long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        statisticsInstance.update(timestamps[i], (double) values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    int i = 0;
+    while (valueIterator.hasNext() && i < length) {
+      statisticsInstance.update(timestamps[i], (double) valueIterator.next());
+      i++;
+    }
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    // TODO: revise this check (it currently always reports that no final result is available)
+    return false;
+  }
+
+  @Override
+  public void merge(AggregateResult another) {
+    DoddsAggrResult anotherVar = (DoddsAggrResult) another;
+    if (anotherVar.getStatisticsInstance().getCount() == 0) {
+      return;
+    }
+    //        if (another.getResult() instanceof List<?>){
+    //            int i = 0;
+    //            for (Object o : (List<?>)another.getResult()){
+    //                mergedBuckets.set(i, mergedBuckets.get(i) + Integer.class.cast(o));
+    //                i += 1;
+    //            }
+    //        }
+  }
+
+  @Override
+  protected void deserializeSpecificFields(ByteBuffer buffer) {
+    this.seriesDataType = TSDataType.deserialize(buffer.get());
+    //        for (int i=0; i<Statistics.bucketCnt; i++){
+    //            this.mergedBuckets.set(i, buffer.getInt());
+    //        }
+  }
+
+  @Override
+  protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(seriesDataType, outputStream);
+    //        for (int i=0; i<Statistics.bucketCnt; i++){
+    //            ReadWriteIOUtils.write(this.mergedBuckets.get(i), outputStream);
+    //        }
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    //        for (int i=0; i<Statistics.bucketCnt; i++){
+    //            mergedBuckets.set(i, 0);
+    //        }
+    statisticsInstance = new DoubleStatistics();
+  }
+
+  public Statistics getStatisticsInstance() {
+    return statisticsInstance;
+  }
+
+  public void setStatisticsInstance(Statistics statisticsInstance) {
+    this.statisticsInstance = statisticsInstance;
+  }
+
+  public void setParameters(Map<String, String> parameters) {
+    if (parameters.containsKey("k")
+        && parameters.containsKey("r")
+        && parameters.containsKey("w")
+        && parameters.containsKey("s")) {
+      this.k = Integer.parseInt(parameters.get("k"));
+      this.r = Double.parseDouble(parameters.get("r"));
+      this.w = Integer.parseInt(parameters.get("w"));
+      this.s = Integer.parseInt(parameters.get("s"));
+
+      System.out.println("Parameters are successfully set.");
+    } else System.out.println("Parameters are not obtained in AggrResult.");
+
+    if (parameters.containsKey("d")) this.delta = Integer.parseInt(parameters.get("d")) * 1000;
+    else this.delta = 60000;
+    if (parameters.containsKey("g")) this.gamma = Double.parseDouble(parameters.get("g"));
+    else this.gamma = 0.1;
+    if (parameters.containsKey("f")) this.fileNum = Integer.parseInt(parameters.get("f"));
+    else this.fileNum = 1;
+    if (parameters.containsKey("b"))
+      this.ifWithUpperBound = Boolean.parseBoolean(parameters.get("b"));
+    else this.ifWithUpperBound = true;
+  }
+
+  public double getR() {
+    return r;
+  }
+
+  public int getK() {
+    return k;
+  }
+
+  public int getW() {
+    return w;
+  }
+
+  public int getS() {
+    return s;
+  }
+
+  public int getDelta() {
+    return delta;
+  }
+
+  public double getGamma() {
+    return gamma;
+  }
+
+  public int getFileNum() {
+    return fileNum;
+  }
+
+  public boolean getIfWithUpperBound() {
+    return ifWithUpperBound;
+  }
+
+  public void detectOutliers() {
+    //        int j = (int) Math.ceil(r / Statistics.gamma);
+    //        int l = (int) Math.floor(r / Statistics.gamma);
+    //        int beta = Statistics.bucketCnt;
+    //        int lowBound, highBound;
+    //        for (int u=0; u < beta; u++){
+    //            lowBound = 0;
+    //            highBound = 0;
+    //            for (int i = Math.max(0, u-l+1); i < Math.min(beta, u+l-1); i++) {
+    //                lowBound += mergedBuckets.get(i);
+    //            }
+    //            if (lowBound >= k) continue;
+    //
+    //            for (int i = Math.max(0, u-j); i < Math.min(beta, u+j); i++){
+    //                highBound += mergedBuckets.get(i);
+    //            }
+    //            if (j - r/Statistics.gamma >= 0.5){
+    //                highBound = Math.max(highBound - mergedBuckets.get(u-j), highBound -
+    // mergedBuckets.get(u+j));
+    //            }
+    //
+    //            if (highBound < k) {continue;} //TODO: update outlier set
+    //        }
+
+  }
+}
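
The commented-out detectOutliers() above, and the live code added to AggregationExecutor below, rely on the same bucket-count pruning: with bucket width gamma, every point whose bucket lies at most floor(r/gamma)-1 positions from bucket u (including u itself) is a guaranteed neighbor, giving a lower bound on the neighbor count, while only points whose buckets lie at most ceil(r/gamma) positions away can possibly be neighbors, giving an upper bound. If the lower bound already reaches k, every point in the bucket is an inlier; if even the upper bound stays below k, every point is an outlier; otherwise the raw points must be read and checked. A standalone restatement of that check (illustrative names, not the commit's code):

    // Classify a value bucket using only per-bucket counts of the current window.
    class DoddsBoundSketch {
      /** 0 = all inliers, 1 = all outliers, 2 = inconclusive (check raw points). */
      static int classifyBucket(int[] buckets, int u, double r, double gamma, int k) {
        int beta = buckets.length;
        int lambda = (int) Math.ceil(r / gamma);  // buckets that may contain neighbors
        int ell = (int) Math.floor(r / gamma);    // buckets whose points are surely neighbors
        int lower = 0;
        int upper = 0;
        for (int i = Math.max(0, u - ell + 1); i <= Math.min(beta - 1, u + ell - 1); i++) {
          lower += buckets[i];
        }
        for (int i = Math.max(0, u - lambda); i <= Math.min(beta - 1, u + lambda); i++) {
          upper += buckets[i];
        }
        if (lower >= k) {
          return 0; // enough guaranteed neighbors: no outliers in this bucket
        } else if (upper < k) {
          return 1; // even the optimistic count stays below k: every point is an outlier
        }
        return 2;   // inconclusive: load the raw points and count exact neighbors
      }
    }
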
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index fc33f2639a..149346d63c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -34,6 +35,8 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.DoddsAggrResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.SingleDataSet;
@@ -47,6 +50,8 @@ import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -59,14 +64,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
@@ -76,10 +75,12 @@ public class AggregationExecutor {
   private List<PartialPath> selectedSeries;
   protected List<TSDataType> dataTypes;
   protected List<String> aggregations;
+  protected List<Map<String, String>> parameters;
   protected IExpression expression;
   protected boolean ascending;
+  private final TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
   protected QueryContext context;
-  protected AggregateResult[] aggregateResultList;
+  protected AggregateResult[] aggregateResultList;
 
   /** aggregation batch calculation size. */
   private int aggregateFetchSize;
@@ -91,6 +92,7 @@ public class AggregationExecutor {
         .forEach(k -> selectedSeries.add(((MeasurementPath) k).transformToExactPath()));
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
     this.aggregations = aggregationPlan.getDeduplicatedAggregations();
+    this.parameters = aggregationPlan.getParameters();
     this.expression = aggregationPlan.getExpression();
     this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
     this.ascending = aggregationPlan.isAscending();
@@ -101,7 +103,6 @@ public class AggregationExecutor {
   /** execute aggregate function with only time filter or no filter. */
   public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
       throws StorageEngineException, IOException, QueryProcessException {
-
     Filter timeFilter = null;
     if (expression != null) {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
@@ -155,13 +156,24 @@ public class AggregationExecutor {
       throws IOException, QueryProcessException, StorageEngineException {
     List<AggregateResult> ascAggregateResultList = new ArrayList<>();
     List<AggregateResult> descAggregateResultList = new ArrayList<>();
+
+    DoddsAggrResult doddsAggrResult = null;
     boolean[] isAsc = new boolean[aggregateResultList.length];
+    boolean[] isValidity = new boolean[aggregateResultList.length];
+    boolean[] isDodds = new boolean[aggregateResultList.length];
 
     TSDataType tsDataType = dataTypes.get(indexes.get(0));
     for (int i : indexes) {
       // construct AggregateResult
       AggregateResult aggregateResult =
           AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
+
+      if (aggregateResult.getAggregationType() == AggregationType.DODDS) {
+        doddsAggrResult = (DoddsAggrResult) aggregateResult;
+        doddsAggrResult.setParameters(parameters.get(i));
+        isDodds[i] = true;
+        continue;
+      }
       if (aggregateResult.isAscending()) {
         ascAggregateResultList.add(aggregateResult);
         isAsc[i] = true;
@@ -169,6 +181,7 @@ public class AggregationExecutor {
         descAggregateResultList.add(aggregateResult);
       }
     }
+
     aggregateOneSeries(
         seriesPath,
         allMeasurementsInDevice,
@@ -178,15 +191,813 @@ public class AggregationExecutor {
         ascAggregateResultList,
         descAggregateResultList,
         null);
-
+    if (doddsAggrResult != null) {
+      aggregateDodds(
+          seriesPath,
+          allMeasurementsInDevice,
+          context,
+          timeFilter,
+          tsDataType,
+          doddsAggrResult,
+          null);
+    }
     int ascIndex = 0;
     int descIndex = 0;
     for (int i : indexes) {
-      aggregateResultList[i] =
-          isAsc[i]
-              ? ascAggregateResultList.get(ascIndex++)
-              : descAggregateResultList.get(descIndex++);
+      if (isDodds[i]) {
+        if (doddsAggrResult != null) {
+          aggregateResultList[i] = doddsAggrResult;
+        }
+      } else {
+        aggregateResultList[i] =
+            (isAsc[i]
+                ? ascAggregateResultList.get(ascIndex++)
+                : descAggregateResultList.get(descIndex++));
+      }
+    }
+  }
+
+  private void aggregateDodds(
+      PartialPath seriesPath,
+      Set<String> measurements,
+      QueryContext context,
+      Filter timeFilter,
+      TSDataType tsDataType,
+      DoddsAggrResult doddsAggrResult,
+      TsFileFilter fileFilter)
+      throws QueryProcessException, StorageEngineException, IOException {
+    //        // construct series reader without value filter
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+    if (fileFilter != null) {
+      QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+    }
+
+    double r = doddsAggrResult.getR();
+    int k = doddsAggrResult.getK();
+    int w = doddsAggrResult.getW();
+    int s = doddsAggrResult.getS();
+
+    System.out.println("r=" + r + ", k=" + k + ", w=" + w + ", s=" + s);
+
+//    long delta = doddsAggrResult.getDelta();
+//    double gamma = doddsAggrResult.getGamma();
+//    int fileNum = doddsAggrResult.getFileNum();
+    int delta = IoTDBDescriptor.getInstance().getConfig().getDelta();
+    double gamma = IoTDBDescriptor.getInstance().getConfig().getGamma();
+    int fileNum = IoTDBDescriptor.getInstance().getConfig().getFileNum();
+    boolean ifWithUpperBound = doddsAggrResult.getIfWithUpperBound();
+
+    System.out.println(
+        "delta="
+            + delta
+            + ", gamma="
+            + gamma
+            + ", fileNum="
+            + fileNum
+            + ", ifWithUpperBound"
+            + ifWithUpperBound);
+
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+    IAggregateReader seriesReader =
+        new SeriesAggregateReader(
+            seriesPath,
+            measurements,
+            tsDataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            null,
+            true);
+    long startTime = Long.MAX_VALUE, endTime = Long.MIN_VALUE;
+    double minValue = Double.MAX_VALUE, maxValue = Double.MIN_VALUE;
+    // TODO: build a reader over the raw data and compute minValue/maxValue
+    while (seriesReader.hasNextFile()) {
+      Statistics fileStatistic = seriesReader.currentFileStatistics();
+      startTime = Math.min(fileStatistic.getStartTime(), startTime);
+      endTime = Math.max(fileStatistic.getEndTime(), endTime);
+      minValue = Math.min((Double) fileStatistic.getMinValue(), minValue);
+      maxValue = Math.max((Double) fileStatistic.getMaxValue(), maxValue);
+      seriesReader.skipCurrentFile();
+    }
+    startTime = startTime / delta * delta;
+    endTime = (endTime / delta + 1) * delta;
+    minValue = Math.floor(minValue / gamma) * gamma;
+    maxValue = Math.floor(maxValue / gamma + 1) * gamma;
+
+    int bucketsCnt = (int) ((maxValue - minValue) / gamma);
+    int segsCnt = (int) ((endTime - startTime) / delta);
+    System.out.println("bucketsCnt:" + bucketsCnt);
+    System.out.println("segsCnt:" + segsCnt);
+    List<SeriesAggregateReader> seriesReaderList = new ArrayList<>();
+
+    for (int f = 0; f < fileNum; f++) {
+      // construct readers for each series (bucket)
+      for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+        String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+        if (f > 0) {
+          String oldDatasetName = newPath.split("\\.")[1];
+          String newDatasetName = oldDatasetName + Integer.toString(f);
+          newPath = newPath.replace(oldDatasetName, newDatasetName);
+          try {
+            MeasurementPath measurementPath = new MeasurementPath(newPath);
+            QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+            QueryContext newContext = new QueryContext(queryResourceManager.assignQueryId(true));
+            QueryDataSource newQueryDataSource =
+                queryResourceManager.getQueryDataSource(measurementPath, newContext, timeFilter);
+            SeriesAggregateReader curSeriesReader =
+                new SeriesAggregateReader(
+                    measurementPath,
+                    Collections.singleton("b" + bucketIndex),
+                    tsDataType,
+                    newContext,
+                    newQueryDataSource,
+                    timeFilter,
+                    null,
+                    null,
+                    true);
+            seriesReaderList.add(curSeriesReader);
+          } catch (IllegalPathException e) {
+            System.out.println(newPath + " failed.");
+          }
+        } else {
+          try {
+            MeasurementPath measurementPath = new MeasurementPath(newPath);
+            SeriesAggregateReader curSeriesReader =
+                new SeriesAggregateReader(
+                    measurementPath,
+                    Collections.singleton("b" + bucketIndex),
+                    tsDataType,
+                    context,
+                    queryDataSource,
+                    timeFilter,
+                    null,
+                    null,
+                    true);
+            seriesReaderList.add(curSeriesReader);
+          } catch (IllegalPathException e) {
+            System.out.println(newPath + " failed.");
+          }
+        }
+      }
+    }
+
+    //    if (fileNum == 1) {
+    //      List<Pair<Long, Double>> outlierList = new ArrayList<>();
+    //
+    //      int[] mergedBuckets = new int[bucketsCnt];
+    //      Queue<List<Integer>> curWindowBuckets = new LinkedList<>();
+    //      int lambda = (int) Math.ceil(r / gamma), ell = (int) Math.floor(r / gamma);
+    //      int upper, lower, tightUpper;
+    //
+    //      int segIndex = 0;
+    //      int[] readFlags = new int[bucketsCnt];
+    //      SeriesAggregateReader[] seriesReaders = new SeriesAggregateReader[bucketsCnt];
+    //      Long[] startTimes = new Long[bucketsCnt];
+    //      while (segIndex + w < segsCnt) {
+    //        // update buckets, mergedBuckets, curWindowBuckets
+    //        List<Integer> buckets = new ArrayList<>();
+    //        for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+    //
+    //          SeriesAggregateReader curSeriesReader = seriesReaderList.get(bucketIndex);
+    //          int readFlag = readFlags[bucketIndex];
+    //          boolean finish = false;
+    //          while (!finish) {
+    //            switch (readFlag) {
+    //              case 0:
+    //                if (curSeriesReader.hasNextFile()) {
+    //                  readFlag = 1;
+    //                } else {
+    //                  readFlag = -1;
+    //                }
+    //                break;
+    //              case 1:
+    //                if (curSeriesReader.hasNextChunk()) {
+    //                  readFlag = 2;
+    //                } else {
+    //                  readFlag = 0;
+    //                }
+    //                break;
+    //              case 2:
+    //                if (curSeriesReader.hasNextPage()) {
+    //                  long curStartTime = curSeriesReader.currentPageStatistics().getStartTime();
+    //                  if (curStartTime >= segIndex * delta && curStartTime < (segIndex + 1) *
+    // delta) {
+    //                    Statistics pageStatistic = curSeriesReader.currentPageStatistics();
+    //                    buckets.add(pageStatistic.getCount());
+    //                    mergedBuckets[bucketIndex] += pageStatistic.getCount();
+    //                    curSeriesReader.skipCurrentPage();
+    //                  } else {
+    //                    buckets.add(0);
+    //                  }
+    //                  finish = true;
+    //                } else {
+    //                  readFlag = 1;
+    //                }
+    //                break;
+    //              case -1:
+    //                buckets.add(0);
+    //                finish = true;
+    //                break;
+    //            }
+    //          }
+    //          readFlags[bucketIndex] = readFlag;
+    //        }
+    //        if (segIndex >= w) {
+    //          for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+    //            mergedBuckets[bucketIndex] =
+    //                mergedBuckets[bucketIndex] - curWindowBuckets.peek().get(bucketIndex);
+    //          }
+    //          curWindowBuckets.poll();
+    //        }
+    //        curWindowBuckets.add(buckets);
+    //
+    //        if (segIndex < w - 1 || (segIndex - w + 1) % s != 0) {
+    //          segIndex++;
+    //          continue;
+    //        }
+    //
+    //        segIndex++;
+    //
+    //        // update bounds and outlier detection
+    //        for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+    //          upper = lower = tightUpper = 0;
+    //          for (int tmpIndex = Math.max(0, bucketIndex - ell + 1);
+    //              tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + ell - 1);
+    //              tmpIndex++) {
+    //            lower += mergedBuckets[tmpIndex];
+    //          }
+    //          for (int tmpIndex = Math.max(0, bucketIndex - lambda);
+    //              tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + lambda);
+    //              tmpIndex++) {
+    //            upper += mergedBuckets[tmpIndex];
+    //          }
+    //          if (bucketIndex - lambda < 0 && bucketIndex + lambda < bucketsCnt)
+    //            tightUpper = upper - mergedBuckets[bucketIndex + lambda];
+    //          else if (bucketIndex + lambda > bucketsCnt - 1 && bucketIndex - lambda >= 0)
+    //            tightUpper = upper - mergedBuckets[bucketIndex - lambda];
+    //          else if (bucketIndex + lambda < bucketsCnt && bucketIndex - lambda >= 0) {
+    //            tightUpper =
+    //                Math.max(
+    //                    upper - mergedBuckets[bucketIndex + lambda],
+    //                    upper - mergedBuckets[bucketIndex - lambda]);
+    //          } else tightUpper = upper;
+    //          //  TODO: outlier detection
+    //          if (mergedBuckets[bucketIndex] == 0) continue;
+    //          if (lower >= k) {
+    //            //          System.out.println("All points in this buckets are inliers");
+    //            continue;
+    //          } else if ((lambda - r / gamma < 0.5 && upper < k)
+    //              || (lambda - r / gamma >= 0.5 && tightUpper < k)) {
+    //            // TODO: load outliers from PageData
+    //            System.out.println("load outliers from PageData");
+    //            System.out.println(upper);
+    //            String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+    //            try {
+    //              MeasurementPath measurementPath = new MeasurementPath(newPath);
+    //              QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+    //              QueryContext newContext = new
+    // QueryContext(queryResourceManager.assignQueryId(true));
+    //              QueryDataSource newQueryDataSource =
+    //                  queryResourceManager.getQueryDataSource(measurementPath, newContext,
+    // timeFilter);
+    //              seriesReader =
+    //                  new SeriesAggregateReader(
+    //                      measurementPath,
+    //                      Collections.singleton("b" + bucketIndex),
+    //                      tsDataType,
+    //                      newContext,
+    //                      newQueryDataSource,
+    //                      timeFilter,
+    //                      null,
+    //                      null,
+    //                      true);
+    //
+    //            } catch (IllegalPathException e) {
+    //              System.out.println(newPath + " failed.");
+    //            }
+    //
+    //            boolean finished = false;
+    //            while (seriesReader.hasNextFile()) {
+    //              while (seriesReader.hasNextChunk()) {
+    //                while (seriesReader.hasNextPage()) {
+    //                  Statistics curStatistics = seriesReader.currentPageStatistics();
+    //                  if (curStatistics.getStartTime() < (long) (segIndex - w) * delta) {
+    //                    seriesReader.nextPage();
+    //                    continue;
+    //                  }
+    //                  IBatchDataIterator batchIterator =
+    // seriesReader.nextPage().getBatchDataIterator();
+    //                  while (batchIterator.hasNext()) {
+    //                    long t = batchIterator.currentTime();
+    //                    if (t >= ((long) segIndex * delta)) {
+    //                      finished = true;
+    //                      break;
+    //                    } else {
+    //                      outlierList.add(
+    //                          new Pair<>(
+    //                              batchIterator.currentTime(), (double)
+    // batchIterator.currentValue()));
+    //                      //                  System.out.println(batchIterator.currentTime());
+    //                      batchIterator.next();
+    //                    }
+    //                  }
+    //                  if (finished) break;
+    //                }
+    //                if (finished) break;
+    //              }
+    //              if (finished) break;
+    //            }
+    //          } else { // TODO: load buckets [bucketIndex] [bucketIndex-lambda]
+    // [bucketIndex+lambda]
+    //            // from
+    //            // [segIndex] to [segIndex+w]
+    //            // load bucket B[bucketIndex]
+    //            System.out.println("load and check");
+    //            List<Pair<Long, Double>> suspiciousPoints = new ArrayList<>();
+    //            List<Pair<Long, Double>> checkPoints = new ArrayList<>();
+    //            // load bucket B[bucketIndex] into suspiciousPoints
+    //            String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+    //            try {
+    //              MeasurementPath measurementPath = new MeasurementPath(newPath);
+    //              QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+    //              QueryContext newContext = new
+    // QueryContext(queryResourceManager.assignQueryId(true));
+    //              QueryDataSource newQueryDataSource =
+    //                  queryResourceManager.getQueryDataSource(measurementPath, newContext,
+    // timeFilter);
+    //              seriesReader =
+    //                  new SeriesAggregateReader(
+    //                      measurementPath,
+    //                      Collections.singleton("b" + bucketIndex),
+    //                      tsDataType,
+    //                      newContext,
+    //                      newQueryDataSource,
+    //                      timeFilter,
+    //                      null,
+    //                      null,
+    //                      true);
+    //            } catch (IllegalPathException e) {
+    //              System.out.println(newPath + " failed.");
+    //            }
+    //
+    //            boolean finished = false;
+    //            while (seriesReader.hasNextFile()) {
+    //              while (seriesReader.hasNextChunk()) {
+    //                while (seriesReader.hasNextPage()) {
+    //                  Statistics curStatistics = seriesReader.currentPageStatistics();
+    //                  if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+    //                    seriesReader.nextPage();
+    //                    continue;
+    //                  } else {
+    //                    IBatchDataIterator batchIterator =
+    //                        seriesReader.nextPage().getBatchDataIterator();
+    //                    while (batchIterator.hasNext()) {
+    //                      long t = batchIterator.currentTime();
+    //                      if (t >= (long) (segIndex * delta)) {
+    //                        finished = true;
+    //                        break;
+    //                      } else {
+    //                        suspiciousPoints.add(
+    //                            new Pair<>(
+    //                                batchIterator.currentTime(),
+    //                                (double) batchIterator.currentValue()));
+    //                        batchIterator.next();
+    //                      }
+    //                    }
+    //                    if (finished) break;
+    //                  }
+    //                }
+    //                if (finished) break;
+    //              }
+    //              if (finished) break;
+    //            }
+    //            // load buckets B[bucketIndex-lambda], B[bucketIndex+lambda] into checkPoints
+    //            for (int index : new int[] {bucketIndex - lambda, bucketIndex + lambda}) {
+    //              newPath = seriesPath.getFullPath() + "b.b" + index;
+    //              try {
+    //
+    //                MeasurementPath measurementPath = new MeasurementPath(newPath);
+    //                QueryResourceManager queryResourceManager =
+    // QueryResourceManager.getInstance();
+    //                QueryContext newContext =
+    //                    new QueryContext(queryResourceManager.assignQueryId(true));
+    //                QueryDataSource newQueryDataSource =
+    //                    queryResourceManager.getQueryDataSource(
+    //                        measurementPath, newContext, timeFilter);
+    //                seriesReader =
+    //                    new SeriesAggregateReader(
+    //                        measurementPath,
+    //                        Collections.singleton("b" + index),
+    //                        tsDataType,
+    //                        newContext,
+    //                        newQueryDataSource,
+    //                        timeFilter,
+    //                        null,
+    //                        null,
+    //                        true);
+    //              } catch (IllegalPathException e) {
+    //                System.out.println(newPath + " failed.");
+    //              }
+    //
+    //              while (seriesReader.hasNextFile()) {
+    //                while (seriesReader.hasNextChunk()) {
+    //                  while (seriesReader.hasNextPage()) {
+    //                    Statistics curStatistics = seriesReader.currentPageStatistics();
+    //                    if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+    //                      seriesReader.nextPage();
+    //                      continue;
+    //                    }
+    //                    IBatchDataIterator batchIterator =
+    //                        seriesReader.nextPage().getBatchDataIterator();
+    //                    while (batchIterator.hasNext()) {
+    //                      checkPoints.add(
+    //                          new Pair<>(
+    //                              batchIterator.currentTime(), (double)
+    // batchIterator.currentValue()));
+    //                      if (batchIterator.hasNext()) {
+    //                        batchIterator.next();
+    //                        if (batchIterator.currentTime() >= segIndex * delta) {
+    //                          finished = true;
+    //                          break;
+    //                        }
+    //                      } else {
+    //                        break;
+    //                      }
+    //                    }
+    //                    if (finished) break;
+    //                  }
+    //                  if (finished) break;
+    //                }
+    //                if (finished) break;
+    //              }
+    //            }
+    //            // detection
+    //            int trueNeighbor = lower;
+    //            for (Pair<Long, Double> p1 : suspiciousPoints) {
+    //              for (Pair<Long, Double> p2 : checkPoints) {
+    //                if (Math.abs(p1.getSecond() - p2.getSecond()) <= r) trueNeighbor += 1;
+    //              }
+    //              if (trueNeighbor < k) {
+    //                outlierList.add(p1);
+    //              }
+    //            }
+    //          }
+    //        }
+    //      }
+    //      System.out.println("Outliers Num:" + outlierList.size());
+    //      //    for (Pair<Long, Double> p : outlierList) {
+    //      ////      System.out.println(p.getSecond());
+    //      //    }
+    //    }
+
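+    // Sliding-window pass over the pre-bucketed series: each iteration ingests the page
+    // statistics of one segment, maintains per-bucket point counts over the last w segments
+    // (the expired queue holds the per-segment counts that will leave the window), and runs
+    // the bound-based outlier check every s segments once a full window has been read.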
+    Map<Long, Double> outliers = new HashMap<>();
+
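+    // With bucket width gamma, two points whose buckets are at most ell - 1 apart are always
+    // within distance r of each other, and (up to boundary effects) all neighbours of a point
+    // lie in buckets at most lambda away; these two radii drive the bounds computed below.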
+    int lambda = (int) Math.ceil(r / gamma), ell = (int) Math.floor(r / gamma);
+    int upper, lower, tightUpper;
+
+    int segIndex = 0;
+    int[] readFlags = new int[bucketsCnt * fileNum];
+    int[][] merged = new int[fileNum][bucketsCnt];
+    Queue<int[][]> expired = new LinkedList<>();
+
+    while (segIndex < segsCnt) {
+      // update buckets, mergedBuckets, curWindowBuckets
+      int[][] current = new int[fileNum][bucketsCnt];
+      for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+        for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+          SeriesAggregateReader curSeriesReader =
+              seriesReaderList.get(bucketIndex + fileIndex * bucketsCnt);
+          int readFlag = readFlags[bucketIndex + fileIndex * bucketsCnt];
+          boolean finish = false;
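+          // readFlag state machine: 0 = advance to the next file, 1 = advance to the next
+          // chunk, 2 = inspect the next page, -1 = this bucket series is exhausted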
+          while (!finish) {
+            switch (readFlag) {
+              case 0:
+                if (curSeriesReader.hasNextFile()) {
+                  readFlag = 1;
+                } else {
+                  readFlag = -1;
+                }
+                break;
+              case 1:
+                if (curSeriesReader.hasNextChunk()) {
+                  readFlag = 2;
+                } else {
+                  readFlag = 0;
+                }
+                break;
+              case 2:
+                if (curSeriesReader.hasNextPage()) {
+                  long curStartTime = curSeriesReader.currentPageStatistics().getStartTime();
+                  if (curStartTime >= segIndex * delta && curStartTime < (segIndex + 1) * delta) {
+                    Statistics pageStatistic = curSeriesReader.currentPageStatistics();
+                    // update incoming
+                    current[fileIndex][bucketIndex] = pageStatistic.getCount();
+                    if (segIndex < w) {
+                      merged[fileIndex][bucketIndex] += pageStatistic.getCount();
+                    } else {
+                      merged[fileIndex][bucketIndex] =
+                          merged[fileIndex][bucketIndex]
+                              - expired.peek()[fileIndex][bucketIndex]
+                              + pageStatistic.getCount();
+                    }
+                    curSeriesReader.skipCurrentPage();
+                  }
+                  finish = true;
+                } else {
+                  readFlag = 1;
+                }
+                break;
+              case -1:
+                finish = true;
+                break;
+            }
+          }
+          readFlags[bucketIndex + fileIndex * bucketsCnt] = readFlag;
+        }
+      }
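+      // slide the window: for buckets that received a page in this segment the expiring counts
+      // were already subtracted from merged above; advance the expired queue and enqueue the
+      // counts of the segment just read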
+      if (segIndex >= w) {
+        expired.poll();
+      }
+      expired.add(current);
+      if (segIndex == 0 || (segIndex - w + 1) % s != 0) {
+        segIndex++;
+        continue;
+      }
+      segIndex++;
+
+      // update bounds and outlier detection
+      int[] bucketsCheck = new int[bucketsCnt];
+      int[] bucketsHat = new int[bucketsCnt];
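+      // With several (possibly overlapping) files, bucketsHat sums the per-file counts of a
+      // bucket (an upper bound on its points), while bucketsCheck keeps the count of a single
+      // file only, presumably as a duplicate-free lower bound.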
+      if (fileNum > 1) {
+        for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+          for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+            bucketsHat[bucketIndex] += merged[fileIndex][bucketIndex];
+            if (fileIndex == 0 || merged[fileIndex][bucketIndex] > 0) {
+              bucketsCheck[bucketIndex] = merged[fileIndex][bucketIndex];
+            }
+          }
+        }
+      }
+
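+      // Per-bucket neighbour-count bounds:
+      //   lower      - points in buckets within ell - 1 (guaranteed neighbours),
+      //   upper      - points in buckets within lambda (all candidate neighbours),
+      //   tightUpper - upper minus the smaller of the two buckets at distance lambda, used
+      //                when lambda overshoots r / gamma by at least half a bucket.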
+      for (int bucketIndex = 0; bucketIndex < bucketsCnt; bucketIndex++) {
+        upper = lower = tightUpper = 0;
+        if (fileNum == 1) {
+          for (int tmpIndex = Math.max(0, bucketIndex - ell + 1);
+              tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + ell - 1);
+              tmpIndex++) lower += merged[0][tmpIndex];
+          for (int tmpIndex = Math.max(0, bucketIndex - lambda);
+              tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + lambda);
+              tmpIndex++) upper += merged[0][tmpIndex];
+
+          if (bucketIndex - lambda < 0 && bucketIndex + lambda < bucketsCnt)
+            tightUpper = upper - merged[0][bucketIndex + lambda];
+          else if (bucketIndex + lambda > bucketsCnt - 1 && bucketIndex - lambda >= 0)
+            tightUpper = upper - merged[0][bucketIndex - lambda];
+          else if (bucketIndex + lambda < bucketsCnt && bucketIndex - lambda >= 0) {
+            tightUpper =
+                Math.max(
+                    upper - merged[0][bucketIndex + lambda],
+                    upper - merged[0][bucketIndex - lambda]);
+          } else tightUpper = upper;
+        }
+
+        if (fileNum > 1) {
+          for (int tmpIndex = Math.max(0, bucketIndex - ell + 1);
+              tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + ell - 1);
+              tmpIndex++) {
+            lower += bucketsCheck[tmpIndex];
+          }
+          for (int tmpIndex = Math.max(0, bucketIndex - lambda);
+              tmpIndex <= Math.min(bucketsCnt - 1, bucketIndex + lambda);
+              tmpIndex++) {
+            upper += bucketsHat[tmpIndex];
+          }
+          if (bucketIndex - lambda < 0 && bucketIndex + lambda < bucketsCnt)
+            tightUpper = upper - bucketsHat[bucketIndex + lambda];
+          else if (bucketIndex + lambda > bucketsCnt - 1 && bucketIndex - lambda >= 0)
+            tightUpper = upper - bucketsHat[bucketIndex - lambda];
+          else if (bucketIndex + lambda < bucketsCnt && bucketIndex - lambda >= 0) {
+            tightUpper =
+                Math.max(
+                    upper - bucketsHat[bucketIndex + lambda],
+                    upper - bucketsHat[bucketIndex - lambda]);
+          } else tightUpper = upper;
+        }
+
+        // outlier detection for this bucket: skip empty buckets, prune when the guaranteed
+        // neighbour count already reaches k, emit the whole bucket when even the candidate
+        // upper bound stays below k, and otherwise fall back to an exact distance check
+        if ((fileNum == 1 && merged[0][bucketIndex] == 0)
+            || (fileNum > 1 && bucketsHat[bucketIndex] == 0)) continue;
+        if (lower >= k) {
+          continue;
+        } else if (ifWithUpperBound
+            && ((lambda - r / gamma < 0.5 && upper < k)
+                || (lambda - r / gamma >= 0.5 && tightUpper < k))) {
+          // every point of this bucket has fewer than k candidate neighbours, so all of them
+          // are outliers; load them from the page data of every file
+          System.out.println("load outliers from PageData");
+          for (int f = fileNum - 1; f >= 0; f--) {
+            String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+            if (f > 0) {
+              String oldDatasetName = newPath.split("\\.")[1];
+              String newDatasetName = oldDatasetName + Integer.toString(f);
+              newPath = newPath.replace(oldDatasetName, newDatasetName);
+            }
+            try {
+              MeasurementPath measurementPath = new MeasurementPath(newPath);
+              QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+              QueryContext newContext = new QueryContext(queryResourceManager.assignQueryId(true));
+              QueryDataSource newQueryDataSource =
+                  queryResourceManager.getQueryDataSource(measurementPath, newContext, timeFilter);
+              seriesReader =
+                  new SeriesAggregateReader(
+                      measurementPath,
+                      measurements,
+                      tsDataType,
+                      newContext,
+                      newQueryDataSource,
+                      timeFilter,
+                      null,
+                      null,
+                      true);
+            } catch (IllegalPathException e) {
+              System.out.println(newPath + " failed.");
+              // skip this file instead of reusing the reader of a previous iteration
+              continue;
+            }
+
+            boolean finished = false;
+            while (seriesReader.hasNextFile()) {
+              while (seriesReader.hasNextChunk()) {
+                while (seriesReader.hasNextPage()) {
+                  Statistics curStatistics = seriesReader.currentPageStatistics();
+                  if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+                    seriesReader.nextPage();
+                    continue;
+                  }
+                  IBatchDataIterator batchIterator = seriesReader.nextPage().getBatchDataIterator();
+                  while (batchIterator.hasNext()) {
+                    // stop once the point leaves the current window
+                    if (batchIterator.currentTime() >= (long) segIndex * delta) {
+                      finished = true;
+                      break;
+                    }
+                    if (!outliers.containsKey(batchIterator.currentTime())) {
+                      outliers.put(
+                          batchIterator.currentTime(), (double) batchIterator.currentValue());
+                    }
+                    batchIterator.next();
+                  }
+                  if (finished) break;
+                }
+                if (finished) break;
+              }
+              if (finished) break;
+            }
+          }
+        } else {
+          // bounds are inconclusive: load bucket [bucketIndex] and its boundary buckets
+          // [bucketIndex-ell], [bucketIndex+ell], [bucketIndex-lambda], [bucketIndex+lambda],
+          // then check exact distances
+          System.out.println("load and check");
+
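+          // suspiciousPoints holds the points of the undecided bucket, checkPoints the points
+          // of its boundary buckets; a suspicious point is an outlier if its guaranteed
+          // neighbours (lower) plus the boundary points within distance r still stay below k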
+          Map<Long, Double> suspiciousPoints = new HashMap<>();
+          Map<Long, Double> checkPoints = new HashMap<>();
+          // load bucket B[bucketIndex] into suspiciousPoints
+          for (int f = fileNum - 1; f >= 0; f--) {
+            String newPath = seriesPath.getFullPath() + "b.b" + bucketIndex;
+            if (f > 0) {
+              String oldDatasetName = newPath.split("\\.")[1];
+              String newDatasetName = oldDatasetName + Integer.toString(f);
+              newPath = newPath.replace(oldDatasetName, newDatasetName);
+            }
+            try {
+              MeasurementPath measurementPath = new MeasurementPath(newPath);
+              QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+              QueryContext newContext = new QueryContext(queryResourceManager.assignQueryId(true));
+              QueryDataSource newQueryDataSource =
+                  queryResourceManager.getQueryDataSource(measurementPath, newContext, timeFilter);
+              seriesReader =
+                  new SeriesAggregateReader(
+                      measurementPath,
+                      measurements,
+                      tsDataType,
+                      newContext,
+                      newQueryDataSource,
+                      timeFilter,
+                      null,
+                      null,
+                      true);
+            } catch (IllegalPathException e) {
+              System.out.println(newPath + " failed.");
+              // skip this file instead of reusing the reader of a previous iteration
+              continue;
+            }
+
+            boolean finished = false;
+            while (seriesReader.hasNextFile()) {
+              while (seriesReader.hasNextChunk()) {
+                while (seriesReader.hasNextPage()) {
+                  Statistics curStatistics = seriesReader.currentPageStatistics();
+                  if (curStatistics.getStartTime() < (long) (segIndex - w) * delta) {
+                    seriesReader.nextPage();
+                    continue;
+                  } else {
+                    IBatchDataIterator batchIterator =
+                        seriesReader.nextPage().getBatchDataIterator();
+                    while (batchIterator.hasNext()) {
+                      // stop once the point leaves the current window
+                      if (batchIterator.currentTime() >= (long) segIndex * delta) {
+                        finished = true;
+                        break;
+                      }
+                      if (!suspiciousPoints.containsKey(batchIterator.currentTime())) {
+                        suspiciousPoints.put(
+                            batchIterator.currentTime(), (double) batchIterator.currentValue());
+                      }
+                      batchIterator.next();
+                    }
+                    if (finished) break;
+                  }
+                }
+                if (finished) break;
+              }
+              if (finished) break;
+            }
+          }
+          // load boundary buckets B[bucketIndex-ell], B[bucketIndex+ell], B[bucketIndex-lambda],
+          // B[bucketIndex+lambda] into checkPoints
+          for (int f = fileNum - 1; f >= 0; f--) {
+            for (int index :
+                new int[] {
+                  bucketIndex - lambda, bucketIndex + lambda, bucketIndex - ell, bucketIndex + ell
+                }) {
+              if (index < 0 || index > bucketsCnt - 1) continue;
+              String newPath = seriesPath.getFullPath() + "b.b" + index;
+              if (f > 0) {
+                String oldDatasetName = newPath.split("\\.")[1];
+                String newDatasetName = oldDatasetName + Integer.toString(f);
+                newPath = newPath.replace(oldDatasetName, newDatasetName);
+              }
+              try {
+                MeasurementPath measurementPath = new MeasurementPath(newPath);
+                QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
+                QueryContext newContext =
+                    new QueryContext(queryResourceManager.assignQueryId(true));
+                QueryDataSource newQueryDataSource =
+                    queryResourceManager.getQueryDataSource(
+                        measurementPath, newContext, timeFilter);
+                seriesReader =
+                    new SeriesAggregateReader(
+                        measurementPath,
+                        measurements,
+                        tsDataType,
+                        newContext,
+                        newQueryDataSource,
+                        timeFilter,
+                        null,
+                        null,
+                        true);
+              } catch (IllegalPathException e) {
+                System.out.println(newPath + " failed.");
+                // skip this bucket series instead of reusing the previous reader
+                continue;
+              }
+              boolean finished = false;
+              while (seriesReader.hasNextFile()) {
+                while (seriesReader.hasNextChunk()) {
+                  while (seriesReader.hasNextPage()) {
+                    Statistics curStatistics = seriesReader.currentPageStatistics();
+                    if (curStatistics.getStartTime() < (segIndex - w) * delta) {
+                      seriesReader.nextPage();
+                      continue;
+                    }
+                    IBatchDataIterator batchIterator =
+                        seriesReader.nextPage().getBatchDataIterator();
+                    while (batchIterator.hasNext()) {
+                      // stop once the point leaves the current window
+                      if (batchIterator.currentTime() >= (long) segIndex * delta) {
+                        finished = true;
+                        break;
+                      }
+                      if (!checkPoints.containsKey(batchIterator.currentTime())) {
+                        checkPoints.put(
+                            batchIterator.currentTime(), (double) batchIterator.currentValue());
+                      }
+                      batchIterator.next();
+                    }
+                    if (finished) break;
+                  }
+                  if (finished) break;
+                }
+                if (finished) break;
+              }
+            }
+          }
+          // detection
+          for (Long t1 : suspiciousPoints.keySet()) {
+            // reset the neighbour count for every suspicious point
+            int trueNeighbor = lower;
+            for (Long t2 : checkPoints.keySet()) {
+              if (Math.abs(suspiciousPoints.get(t1) - checkPoints.get(t2)) <= r) trueNeighbor += 1;
+            }
+            if (trueNeighbor < k) outliers.put(t1, suspiciousPoints.get(t1));
+          }
+        }
+      }
+    }
+    for (Long o : outliers.keySet()) {
+      System.out.println(o);
     }
+    System.out.println("Outliers Num:" + outliers.keySet().size());
   }
 
   protected void aggregateOneAlignedSeries(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 987cdcd451..9894676046 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -67,6 +67,8 @@ public class AggregateResultFactory {
         return !ascending
             ? new LastValueDescAggrResult(dataType)
             : new LastValueAggrResult(dataType);
+      case SQLConstant.DODDS:
+        return new DoddsAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
     }
@@ -98,6 +100,8 @@ public class AggregateResultFactory {
         return new SumAggrResult(dataType);
       case SQLConstant.LAST_VALUE:
         return new LastValueDescAggrResult(dataType);
+      case SQLConstant.DODDS:
+        return new DoddsAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
     }
@@ -130,6 +134,8 @@ public class AggregateResultFactory {
         return new MinValueAggrResult(dataType);
       case EXTREME:
         return new ExtremeAggrResult(dataType);
+      case DODDS:
+        return new DoddsAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
new file mode 100644
index 0000000000..4ef688a7e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public interface IPlanExecutor {
+
+  /**
+   * Process a query plan from the QP layer and construct a QueryDataSet.
+   *
+   * @param queryPlan QueryPlan
+   * @return QueryDataSet
+   */
+  QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+      throws IOException, StorageEngineException, QueryFilterOptimizationException,
+          QueryProcessException, MetadataException, SQLException, TException, InterruptedException;
+
+  /**
+   * Process a non-query physical plan, including insert/update/delete operations on data,
+   * metadata, and privileges.
+   *
+   * @param plan Physical Non-Query Plan
+   */
+  boolean processNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException;
+
+  /**
+   * Execute the update command.
+   *
+   * @param path : update series seriesPath
+   * @param startTime start time in update command
+   * @param endTime end time in update command
+   * @param value - in type of string
+   */
+  void update(PartialPath path, long startTime, long endTime, String value)
+      throws QueryProcessException;
+
+  /**
+   * Execute the delete command.
+   *
+   * @param deletePlan physical delete plan
+   */
+  void delete(DeletePlan deletePlan) throws QueryProcessException;
+
+  /**
+   * Execute the delete command.
+   *
+   * @param path delete series seriesPath
+   * @param startTime start time in delete command
+   * @param endTime end time in delete command
+   * @param planIndex index of the deletion plan
+   * @param partitionFilter specify involving time partitions, if null, all partitions are involved
+   */
+  void delete(
+      PartialPath path,
+      long startTime,
+      long endTime,
+      long planIndex,
+      TimePartitionFilter partitionFilter)
+      throws QueryProcessException;
+
+  /**
+   * Execute the insert command.
+   *
+   * @param insertRowPlan physical insert plan
+   */
+  void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
+
+  /**
+   * Execute the insert command.
+   *
+   * @param insertRowsPlan physical insert rows plan, which contains multi insertRowPlans
+   */
+  void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException;
+
+  /**
+   * Execute the insert command.
+   *
+   * @param insertRowsOfOneDevicePlan physical insert plan
+   */
+  void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+  /**
+   * Execute a batch insert plan.
+   *
+   * @throws BatchProcessException when some of the rows failed
+   */
+  void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
+
+  /**
+   * Execute a multi-batch insert plan.
+   *
+   * @throws QueryProcessException when some of the rows failed
+   */
+  void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan) throws QueryProcessException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
new file mode 100644
index 0000000000..84f8b3c5b4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
@@ -0,0 +1,2176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.auth.entity.PathPrivilege;
+import org.apache.iotdb.db.auth.entity.Role;
+import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.QueryIdNotExsitException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SettlePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
+import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
+import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
+import org.apache.iotdb.db.query.dataset.SingleDataSet;
+import org.apache.iotdb.db.query.executor.IQueryRouter;
+import org.apache.iotdb.db.query.executor.QueryRouter;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationInformation;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.SettleService;
+import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+public class PlanExecutor implements IPlanExecutor {
+
+  private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
+  private static final Logger AUDIT_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+  // for data query
+  protected IQueryRouter queryRouter;
+  // for administration
+  private final IAuthorizer authorizer;
+
+  private ThreadPoolExecutor insertionPool;
+
+  private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
+
+  public PlanExecutor() throws QueryProcessException {
+    queryRouter = new QueryRouter();
+    try {
+      authorizer = BasicAuthorizer.getInstance();
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @Override
+  public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+      throws IOException, StorageEngineException, QueryFilterOptimizationException,
+          QueryProcessException, MetadataException, InterruptedException {
+    if (queryPlan instanceof QueryPlan) {
+      return processDataQuery((QueryPlan) queryPlan, context);
+    } else if (queryPlan instanceof AuthorPlan) {
+      return processAuthorQuery((AuthorPlan) queryPlan);
+    } else if (queryPlan instanceof ShowPlan) {
+      return processShowQuery((ShowPlan) queryPlan, context);
+    } else {
+      throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
+    }
+  }
+
+  @Override
+  public boolean processNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+    switch (plan.getOperatorType()) {
+      case DELETE:
+        delete((DeletePlan) plan);
+        return true;
+      case INSERT:
+        insert((InsertRowPlan) plan);
+        return true;
+      case BATCH_INSERT_ONE_DEVICE:
+        insert((InsertRowsOfOneDevicePlan) plan);
+        return true;
+      case BATCH_INSERT_ROWS:
+        insert((InsertRowsPlan) plan);
+        return true;
+      case BATCH_INSERT:
+        insertTablet((InsertTabletPlan) plan);
+        return true;
+      case MULTI_BATCH_INSERT:
+        insertTablet((InsertMultiTabletPlan) plan);
+        return true;
+      case CREATE_ROLE:
+      case DELETE_ROLE:
+      case CREATE_USER:
+      case REVOKE_USER_ROLE:
+      case REVOKE_ROLE_PRIVILEGE:
+      case REVOKE_USER_PRIVILEGE:
+      case GRANT_ROLE_PRIVILEGE:
+      case GRANT_USER_PRIVILEGE:
+      case GRANT_USER_ROLE:
+      case MODIFY_PASSWORD:
+      case DELETE_USER:
+        AuthorPlan author = (AuthorPlan) plan;
+        return operateAuthor(author);
+      case GRANT_WATERMARK_EMBEDDING:
+        return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), true);
+      case REVOKE_WATERMARK_EMBEDDING:
+        return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), false);
+      case DELETE_TIMESERIES:
+        return deleteTimeSeries((DeleteTimeSeriesPlan) plan);
+      case CREATE_TIMESERIES:
+        return createTimeSeries((CreateTimeSeriesPlan) plan);
+      case CREATE_ALIGNED_TIMESERIES:
+        return createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+      case CREATE_MULTI_TIMESERIES:
+        return createMultiTimeSeries((CreateMultiTimeSeriesPlan) plan);
+      case ALTER_TIMESERIES:
+        return alterTimeSeries((AlterTimeSeriesPlan) plan);
+      case SET_STORAGE_GROUP:
+        return setStorageGroup((SetStorageGroupPlan) plan);
+      case DELETE_STORAGE_GROUP:
+        return deleteStorageGroups((DeleteStorageGroupPlan) plan);
+      case TTL:
+        operateTTL((SetTTLPlan) plan);
+        return true;
+      case LOAD_CONFIGURATION:
+        loadConfiguration((LoadConfigurationPlan) plan);
+        return true;
+      case LOAD_FILES:
+        operateLoadFiles((OperateFilePlan) plan);
+        return true;
+      case REMOVE_FILE:
+        operateRemoveFile((OperateFilePlan) plan);
+        return true;
+      case UNLOAD_FILE:
+        operateUnloadFile((OperateFilePlan) plan);
+        return true;
+      case FLUSH:
+        operateFlush((FlushPlan) plan);
+        return true;
+      case MERGE:
+      case FULL_MERGE:
+        operateMerge((MergePlan) plan);
+        return true;
+      case SET_SYSTEM_MODE:
+        operateSetSystemMode((SetSystemModePlan) plan);
+        return true;
+      case CLEAR_CACHE:
+        operateClearCache();
+        return true;
+      case DELETE_PARTITION:
+        DeletePartitionPlan p = (DeletePartitionPlan) plan;
+        TimePartitionFilter filter =
+            (storageGroupName, partitionId) ->
+                storageGroupName.equals(
+                        ((DeletePartitionPlan) plan).getStorageGroupName().getFullPath())
+                    && p.getPartitionId().contains(partitionId);
+        StorageEngine.getInstance()
+            .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
+        return true;
+      case CREATE_SCHEMA_SNAPSHOT:
+        operateCreateSnapshot();
+        return true;
+      case CREATE_FUNCTION:
+        return operateCreateFunction((CreateFunctionPlan) plan);
+      case DROP_FUNCTION:
+        return operateDropFunction((DropFunctionPlan) plan);
+      case CREATE_TRIGGER:
+        return operateCreateTrigger((CreateTriggerPlan) plan);
+      case DROP_TRIGGER:
+        return operateDropTrigger((DropTriggerPlan) plan);
+      case START_TRIGGER:
+        return operateStartTrigger((StartTriggerPlan) plan);
+      case STOP_TRIGGER:
+        return operateStopTrigger((StopTriggerPlan) plan);
+      case KILL:
+        try {
+          operateKillQuery((KillQueryPlan) plan);
+        } catch (QueryIdNotExsitException e) {
+          throw new QueryProcessException(e.getMessage());
+        }
+        return true;
+      case CREATE_TEMPLATE:
+        return createTemplate((CreateTemplatePlan) plan);
+      case APPEND_TEMPLATE:
+        return appendTemplate((AppendTemplatePlan) plan);
+      case PRUNE_TEMPLATE:
+        return pruneTemplate((PruneTemplatePlan) plan);
+      case SET_TEMPLATE:
+        return setTemplate((SetTemplatePlan) plan);
+      case ACTIVATE_TEMPLATE:
+        return activateTemplate((ActivateTemplatePlan) plan);
+      case UNSET_TEMPLATE:
+        return unsetTemplate((UnsetTemplatePlan) plan);
+      case CREATE_CONTINUOUS_QUERY:
+        return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan);
+      case DROP_CONTINUOUS_QUERY:
+        return operateDropContinuousQuery((DropContinuousQueryPlan) plan);
+      case SETTLE:
+        settle((SettlePlan) plan);
+        return true;
+      default:
+        throw new UnsupportedOperationException(
+            String.format("operation %s is not supported", plan.getOperatorType()));
+    }
+  }
+
+  private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.createSchemaTemplate(createTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean appendTemplate(AppendTemplatePlan plan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.appendSchemaTemplate(plan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean pruneTemplate(PruneTemplatePlan plan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.pruneSchemaTemplate(plan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean setTemplate(SetTemplatePlan setTemplatePlan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.setSchemaTemplate(setTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean activateTemplate(ActivateTemplatePlan activateTemplatePlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.setUsingSchemaTemplate(activateTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean unsetTemplate(UnsetTemplatePlan unsetTemplatePlan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.unsetSchemaTemplate(unsetTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
+    UDFRegistrationService.getInstance().register(plan.getUdfName(), plan.getClassName(), true);
+    return true;
+  }
+
+  private boolean operateDropFunction(DropFunctionPlan plan) throws UDFRegistrationException {
+    UDFRegistrationService.getInstance().deregister(plan.getUdfName());
+    return true;
+  }
+
+  private boolean operateCreateTrigger(CreateTriggerPlan plan)
+      throws TriggerManagementException, TriggerExecutionException {
+    TriggerRegistrationService.getInstance().register(plan);
+    return true;
+  }
+
+  private boolean operateDropTrigger(DropTriggerPlan plan) throws TriggerManagementException {
+    TriggerRegistrationService.getInstance().deregister(plan);
+    return true;
+  }
+
+  private boolean operateStartTrigger(StartTriggerPlan plan)
+      throws TriggerManagementException, TriggerExecutionException {
+    TriggerRegistrationService.getInstance().activate(plan);
+    return true;
+  }
+
+  private boolean operateStopTrigger(StopTriggerPlan plan) throws TriggerManagementException {
+    TriggerRegistrationService.getInstance().inactivate(plan);
+    return true;
+  }
+
+  private void operateMerge(MergePlan plan) throws StorageEngineException {
+    if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
+      StorageEngine.getInstance().mergeAll(true);
+    } else {
+      StorageEngine.getInstance().mergeAll(false);
+    }
+  }
+
+  private void operateClearCache() {
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+  }
+
+  private void operateCreateSnapshot() {
+    IoTDB.metaManager.createMTreeSnapshot();
+  }
+
+  private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    long killQueryId = killQueryPlan.getQueryId();
+    if (killQueryId != -1) {
+      if (queryTimeManager.getQueryContextMap().get(killQueryId) != null) {
+        queryTimeManager.killQuery(killQueryId);
+      } else {
+        throw new QueryIdNotExsitException(
+            String.format(
+                "Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
+      }
+    } else {
+      // if queryId is not specified, kill all running queries
+      if (!queryTimeManager.getQueryContextMap().isEmpty()) {
+        synchronized (queryTimeManager.getQueryContextMap()) {
+          List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryContextMap().keySet());
+          for (Long queryId : queryIdList) {
+            queryTimeManager.killQuery(queryId);
+          }
+        }
+      }
+    }
+  }
+
+  private void operateSetSystemMode(SetSystemModePlan plan) {
+    IoTDBDescriptor.getInstance().getConfig().setReadOnly(plan.isReadOnly());
+  }
+
+  private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
+    if (plan.getPaths().isEmpty()) {
+      StorageEngine.getInstance().syncCloseAllProcessor();
+    } else {
+      flushSpecifiedStorageGroups(plan);
+    }
+
+    if (!plan.getPaths().isEmpty()) {
+      List<PartialPath> noExistSg = checkStorageGroupExist(plan.getPaths());
+      if (!noExistSg.isEmpty()) {
+        StringBuilder sb = new StringBuilder();
+        noExistSg.forEach(storageGroup -> sb.append(storageGroup.getFullPath()).append(","));
+        throw new StorageGroupNotSetException(sb.subSequence(0, sb.length() - 1).toString(), true);
+      }
+    }
+  }
+
+  private boolean operateCreateContinuousQuery(CreateContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    return ContinuousQueryService.getInstance().register(plan, true);
+  }
+
+  private boolean operateDropContinuousQuery(DropContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    return ContinuousQueryService.getInstance().deregister(plan);
+  }
+
+  public static void flushSpecifiedStorageGroups(FlushPlan plan)
+      throws StorageGroupNotSetException {
+    Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupMap =
+        plan.getStorageGroupPartitionIds();
+    for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupMap.entrySet()) {
+      PartialPath storageGroupName = entry.getKey();
+      // normal flush
+      if (entry.getValue() == null) {
+        if (plan.isSeq() == null) {
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, true, plan.isSync());
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, false, plan.isSync());
+        } else {
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, plan.isSeq(), plan.isSync());
+        }
+      }
+      // partition specified flush, for snapshot flush plan
+      else {
+        List<Pair<Long, Boolean>> partitionIdSequencePairs = entry.getValue();
+        for (Pair<Long, Boolean> pair : partitionIdSequencePairs) {
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, pair.left, pair.right, true);
+        }
+      }
+    }
+  }
+
+  protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
+      throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
+          IOException, InterruptedException {
+    QueryDataSet queryDataSet;
+    if (queryPlan instanceof AlignByDevicePlan) {
+      queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
+    } else {
+      if (queryPlan.getPaths() == null || queryPlan.getPaths().isEmpty()) {
+        // no time series are selected, return EmptyDataSet
+        return new EmptyDataSet();
+      } else if (queryPlan instanceof UDAFPlan) {
+        UDAFPlan udafPlan = (UDAFPlan) queryPlan;
+        queryDataSet = queryRouter.udafQuery(udafPlan, context);
+      } else if (queryPlan instanceof UDTFPlan) {
+        UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
+        queryDataSet = queryRouter.udtfQuery(udtfPlan, context);
+      } else if (queryPlan instanceof GroupByTimeFillPlan) {
+        GroupByTimeFillPlan groupByFillPlan = (GroupByTimeFillPlan) queryPlan;
+        queryDataSet = queryRouter.groupByFill(groupByFillPlan, context);
+      } else if (queryPlan instanceof GroupByTimePlan) {
+        GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
+        queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
+      } else if (queryPlan instanceof QueryIndexPlan) {
+        throw new QueryProcessException("Query index hasn't been supported yet");
+      } else if (queryPlan instanceof AggregationPlan) {
+        AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+        queryDataSet = queryRouter.aggregate(aggregationPlan, context);
+      } else if (queryPlan instanceof FillQueryPlan) {
+        FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+        queryDataSet = queryRouter.fill(fillQueryPlan, context);
+      } else if (queryPlan instanceof LastQueryPlan) {
+        queryDataSet = queryRouter.lastQuery((LastQueryPlan) queryPlan, context);
+      } else {
+        queryDataSet = queryRouter.rawDataQuery((RawDataQueryPlan) queryPlan, context);
+      }
+    }
+    queryDataSet.setRowLimit(queryPlan.getRowLimit());
+    queryDataSet.setRowOffset(queryPlan.getRowOffset());
+    queryDataSet.setWithoutAllNull(queryPlan.isWithoutAllNull());
+    queryDataSet.setWithoutAnyNull(queryPlan.isWithoutAnyNull());
+    return queryDataSet;
+  }
+
+  protected AlignByDeviceDataSet getAlignByDeviceDataSet(
+      AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
+    return new AlignByDeviceDataSet(plan, context, router);
+  }
+
+  protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
+      throws QueryProcessException, MetadataException {
+    switch (showPlan.getShowContentType()) {
+      case TTL:
+        return processShowTTLQuery((ShowTTLPlan) showPlan);
+      case FLUSH_TASK_INFO:
+        return processShowFlushTaskInfo();
+      case VERSION:
+        return processShowVersion();
+      case TIMESERIES:
+        return processShowTimeseries((ShowTimeSeriesPlan) showPlan, context);
+      case STORAGE_GROUP:
+        return processShowStorageGroup((ShowStorageGroupPlan) showPlan);
+      case LOCK_INFO:
+        return processShowLockInfo((ShowLockInfoPlan) showPlan);
+      case DEVICES:
+        return processShowDevices((ShowDevicesPlan) showPlan);
+      case CHILD_PATH:
+        return processShowChildPaths((ShowChildPathsPlan) showPlan);
+      case CHILD_NODE:
+        return processShowChildNodes((ShowChildNodesPlan) showPlan);
+      case COUNT_TIMESERIES:
+        return processCountTimeSeries((CountPlan) showPlan);
+      case COUNT_NODE_TIMESERIES:
+        return processCountNodeTimeSeries((CountPlan) showPlan);
+      case COUNT_DEVICES:
+        return processCountDevices((CountPlan) showPlan);
+      case COUNT_STORAGE_GROUP:
+        return processCountStorageGroup((CountPlan) showPlan);
+      case COUNT_NODES:
+        return processCountNodes((CountPlan) showPlan);
+      case MERGE_STATUS:
+        return processShowMergeStatus();
+      case QUERY_PROCESSLIST:
+        return processShowQueryProcesslist();
+      case FUNCTIONS:
+        return processShowFunctions((ShowFunctionsPlan) showPlan);
+      case TRIGGERS:
+        return processShowTriggers();
+      case CONTINUOUS_QUERY:
+        return processShowContinuousQueries();
+      default:
+        throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
+    }
+  }
+
+  private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
+    int num = getNodesNumInGivenLevel(countPlan.getPath(), countPlan.getLevel());
+    return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet processCountNodeTimeSeries(CountPlan countPlan) throws MetadataException {
+      // first get the nodes to group by
+    List<PartialPath> nodes = getNodesList(countPlan.getPath(), countPlan.getLevel());
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_COLUMN, false), new PartialPath(COLUMN_COUNT, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+    for (PartialPath columnPath : nodes) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(columnPath.getFullPath()));
+      Field field1 = new Field(TSDataType.INT32);
+      // get the count of every group
+      field1.setIntV(getPathsNum(columnPath));
+      record.addField(field);
+      record.addField(field1);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  private QueryDataSet processCountDevices(CountPlan countPlan) throws MetadataException {
+    int num = getDevicesNum(countPlan.getPath());
+    return createSingleDataSet(COLUMN_DEVICES, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet processCountStorageGroup(CountPlan countPlan) throws MetadataException {
+    int num = getStorageGroupNum(countPlan.getPath());
+    return createSingleDataSet(COLUMN_STORAGE_GROUP, TSDataType.INT32, num);
+  }
+
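+  // Builds a one-row, one-column result set for COUNT-style results; the value is copied
+  // into a Field of the requested data type.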
+  private QueryDataSet createSingleDataSet(String columnName, TSDataType columnType, Object val) {
+    SingleDataSet singleDataSet =
+        new SingleDataSet(
+            Collections.singletonList(new PartialPath(columnName, false)),
+            Collections.singletonList(columnType));
+    Field field = new Field(columnType);
+    switch (columnType) {
+      case TEXT:
+        field.setBinaryV(((Binary) val));
+        break;
+      case FLOAT:
+        field.setFloatV(((float) val));
+        break;
+      case INT32:
+        field.setIntV(((int) val));
+        break;
+      case INT64:
+        field.setLongV(((long) val));
+        break;
+      case DOUBLE:
+        field.setDoubleV(((double) val));
+        break;
+      case BOOLEAN:
+        field.setBoolV(((boolean) val));
+        break;
+      default:
+        throw new UnSupportedDataTypeException("Unsupported data type: " + columnType);
+    }
+    RowRecord record = new RowRecord(0);
+    record.addField(field);
+    singleDataSet.setRecord(record);
+    return singleDataSet;
+  }
+
+  protected int getDevicesNum(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getDevicesNum(path);
+  }
+
+  private int getStorageGroupNum(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getStorageGroupNum(path);
+  }
+
+  protected int getPathsNum(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getAllTimeseriesCount(path);
+  }
+
+  protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
+    return IoTDB.metaManager.getNodesCountInGivenLevel(path, level);
+  }
+
+  protected List<MeasurementPath> getPathsName(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getMeasurementPaths(path);
+  }
+
+  protected List<PartialPath> getNodesList(PartialPath schemaPattern, int level)
+      throws MetadataException {
+    return IoTDB.metaManager.getNodesListInGivenLevel(schemaPattern, level);
+  }
+
+  private QueryDataSet processCountTimeSeries(CountPlan countPlan) throws MetadataException {
+    int num = getPathsNum(countPlan.getPath());
+    return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet processShowDevices(ShowDevicesPlan showDevicesPlan)
+      throws MetadataException {
+    return new ShowDevicesDataSet(showDevicesPlan);
+  }
+
+  private QueryDataSet processShowChildPaths(ShowChildPathsPlan showChildPathsPlan)
+      throws MetadataException {
+    Set<String> childPathsList = getPathNextChildren(showChildPathsPlan.getPath());
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_CHILD_PATHS, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    for (String s : childPathsList) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(s));
+      record.addField(field);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getChildNodePathInNextLevel(path);
+  }
+
+  private QueryDataSet processShowChildNodes(ShowChildNodesPlan showChildNodesPlan)
+      throws MetadataException {
+    // getNodeNextChildren
+    Set<String> childNodesList = getNodeNextChildren(showChildNodesPlan.getPath());
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_CHILD_NODES, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    for (String s : childNodesList) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(s));
+      record.addField(field);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getChildNodeNameInNextLevel(path);
+  }
+
+  protected List<PartialPath> getStorageGroupNames(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getMatchedStorageGroups(path);
+  }
+
+  private QueryDataSet processShowStorageGroup(ShowStorageGroupPlan showStorageGroupPlan)
+      throws MetadataException {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_STORAGE_GROUP, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    List<PartialPath> storageGroupList = getStorageGroupNames(showStorageGroupPlan.getPath());
+    addToDataSet(storageGroupList, listDataSet);
+    return listDataSet;
+  }
+
+  private void addToDataSet(Collection<PartialPath> paths, ListDataSet dataSet) {
+    for (PartialPath s : paths) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(s.getFullPath()));
+      record.addField(field);
+      dataSet.putRecord(record);
+    }
+  }
+
+  private QueryDataSet processShowLockInfo(ShowLockInfoPlan showLockInfoPlan)
+      throws MetadataException {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_STORAGE_GROUP, false),
+                new PartialPath(COLUMN_LOCK_INFO, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+    try {
+      List<PartialPath> storageGroupList = getStorageGroupNames(showLockInfoPlan.getPath());
+      List<String> lockHolderList = StorageEngine.getInstance().getLockInfo(storageGroupList);
+      addLockInfoToDataSet(storageGroupList, lockHolderList, listDataSet);
+    } catch (StorageEngineException e) {
+      throw new MetadataException(e);
+    }
+    return listDataSet;
+  }
+
+  private void addLockInfoToDataSet(
+      List<PartialPath> paths, List<String> lockHolderList, ListDataSet dataSet) {
+    for (int i = 0; i < paths.size(); i++) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(paths.get(i).getFullPath()));
+      record.addField(field);
+      field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(lockHolderList.get(i)));
+      record.addField(field);
+      dataSet.putRecord(record);
+    }
+  }
+
+  private QueryDataSet processShowTimeseries(
+      ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) throws MetadataException {
+    return new ShowTimeseriesDataSet(showTimeSeriesPlan, context);
+  }
+
+  protected List<IStorageGroupMNode> getAllStorageGroupNodes() {
+    return IoTDB.metaManager.getAllStorageGroupNodes();
+  }
+
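+  // Lists the TTL of each selected storage group; a null TTL field means no TTL is set
+  // (data is kept forever).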
+  private QueryDataSet processShowTTLQuery(ShowTTLPlan showTTLPlan) {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_STORAGE_GROUP, false), new PartialPath(COLUMN_TTL, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.INT64));
+    Set<PartialPath> selectedSgs = new HashSet<>(showTTLPlan.getStorageGroups());
+
+    List<IStorageGroupMNode> storageGroups = getAllStorageGroupNodes();
+    int timestamp = 0;
+    for (IStorageGroupMNode mNode : storageGroups) {
+      PartialPath sgName = mNode.getPartialPath();
+      if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
+        continue;
+      }
+      RowRecord rowRecord = new RowRecord(timestamp++);
+      Field sg = new Field(TSDataType.TEXT);
+      Field ttl;
+      sg.setBinaryV(new Binary(sgName.getFullPath()));
+      if (mNode.getDataTTL() != Long.MAX_VALUE) {
+        ttl = new Field(TSDataType.INT64);
+        ttl.setLongV(mNode.getDataTTL());
+      } else {
+        ttl = null;
+      }
+      rowRecord.addField(sg);
+      rowRecord.addField(ttl);
+      listDataSet.putRecord(rowRecord);
+    }
+
+    return listDataSet;
+  }
+
+  private QueryDataSet processShowVersion() {
+    SingleDataSet singleDataSet =
+        new SingleDataSet(
+            Collections.singletonList(new PartialPath(IoTDBConstant.COLUMN_VERSION, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    Field field = new Field(TSDataType.TEXT);
+    field.setBinaryV(new Binary(IoTDBConstant.VERSION));
+    RowRecord rowRecord = new RowRecord(0);
+    rowRecord.addField(field);
+    singleDataSet.setRecord(rowRecord);
+    return singleDataSet;
+  }
+
+  private QueryDataSet processShowFlushTaskInfo() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_ITEM, false), new PartialPath(COLUMN_VALUE, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+
+    int timestamp = 0;
+    addRowRecordForShowQuery(
+        listDataSet,
+        timestamp++,
+        "total number of flush tasks",
+        Integer.toString(FlushTaskPoolManager.getInstance().getTotalTasks()));
+    addRowRecordForShowQuery(
+        listDataSet,
+        timestamp++,
+        "number of working flush tasks",
+        Integer.toString(FlushTaskPoolManager.getInstance().getWorkingTasksNumber()));
+    addRowRecordForShowQuery(
+        listDataSet,
+        timestamp,
+        "number of waiting flush tasks",
+        Integer.toString(FlushTaskPoolManager.getInstance().getWaitingTasksNumber()));
+    return listDataSet;
+  }
+
+  private QueryDataSet processShowFunctions(ShowFunctionsPlan showPlan)
+      throws QueryProcessException {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_FUNCTION_NAME, false),
+                new PartialPath(COLUMN_FUNCTION_TYPE, false),
+                new PartialPath(COLUMN_FUNCTION_CLASS, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+
+    appendUDFs(listDataSet, showPlan);
+    appendNativeFunctions(listDataSet, showPlan);
+
+    listDataSet.sort(
+        (r1, r2) ->
+            String.CASE_INSENSITIVE_ORDER.compare(
+                r1.getFields().get(0).getStringValue(), r2.getFields().get(0).getStringValue()));
+    return listDataSet;
+  }
+
+  @SuppressWarnings("squid:S3776")
+  private void appendUDFs(ListDataSet listDataSet, ShowFunctionsPlan showPlan)
+      throws QueryProcessException {
+    for (UDFRegistrationInformation info :
+        UDFRegistrationService.getInstance().getRegistrationInformation()) {
+      RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+      rowRecord.addField(Binary.valueOf(info.getFunctionName()), TSDataType.TEXT);
+      String functionType = "";
+      try {
+        if (info.isBuiltin()) {
+          if (info.isUDTF()) {
+            functionType = FUNCTION_TYPE_BUILTIN_UDTF;
+          } else if (info.isUDAF()) {
+            functionType = FUNCTION_TYPE_BUILTIN_UDAF;
+          }
+        } else {
+          if (info.isUDTF()) {
+            functionType = FUNCTION_TYPE_EXTERNAL_UDTF;
+          } else if (info.isUDAF()) {
+            functionType = FUNCTION_TYPE_EXTERNAL_UDAF;
+          }
+        }
+      } catch (InstantiationException
+          | InvocationTargetException
+          | NoSuchMethodException
+          | IllegalAccessException e) {
+        throw new QueryProcessException(e.toString());
+      }
+      rowRecord.addField(Binary.valueOf(functionType), TSDataType.TEXT);
+      rowRecord.addField(Binary.valueOf(info.getClassName()), TSDataType.TEXT);
+      listDataSet.putRecord(rowRecord);
+    }
+  }
+
+  private QueryDataSet processShowContinuousQueries() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
+            Arrays.asList(
+                TSDataType.TEXT,
+                TSDataType.INT64,
+                TSDataType.INT64,
+                TSDataType.TEXT,
+                TSDataType.TEXT));
+
+    List<ShowContinuousQueriesResult> continuousQueriesList =
+        ContinuousQueryService.getInstance().getShowContinuousQueriesResultList();
+
+    for (ShowContinuousQueriesResult result : continuousQueriesList) {
+      RowRecord record = new RowRecord(0);
+      record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
+      record.addField(result.getEveryInterval(), TSDataType.INT64);
+      record.addField(result.getForInterval(), TSDataType.INT64);
+      record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
+      record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
+
+    return listDataSet;
+  }
+
+  private void appendNativeFunctions(ListDataSet listDataSet, ShowFunctionsPlan showPlan) {
+    final Binary functionType = Binary.valueOf(FUNCTION_TYPE_NATIVE);
+    final Binary className = Binary.valueOf("");
+    for (String functionName : SQLConstant.getNativeFunctionNames()) {
+      RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+      rowRecord.addField(Binary.valueOf(functionName.toUpperCase()), TSDataType.TEXT);
+      rowRecord.addField(functionType, TSDataType.TEXT);
+      rowRecord.addField(className, TSDataType.TEXT);
+      listDataSet.putRecord(rowRecord);
+    }
+  }
+
+  private QueryDataSet processShowTriggers() {
+    return TriggerRegistrationService.getInstance().show();
+  }
+
+  private void addRowRecordForShowQuery(
+      ListDataSet listDataSet, int timestamp, String item, String value) {
+    RowRecord rowRecord = new RowRecord(timestamp);
+    Field itemField = new Field(TSDataType.TEXT);
+    itemField.setBinaryV(new Binary(item));
+    Field valueField = new Field(TSDataType.TEXT);
+    valueField.setBinaryV(new Binary(value));
+    rowRecord.addField(itemField);
+    rowRecord.addField(valueField);
+    listDataSet.putRecord(rowRecord);
+  }
+
+  @Override
+  public void delete(DeletePlan deletePlan) throws QueryProcessException {
+    AUDIT_LOGGER.info(
+        "delete data from {} in [{},{}]",
+        deletePlan.getPaths(),
+        deletePlan.getDeleteStartTime(),
+        deletePlan.getDeleteEndTime());
+    for (PartialPath path : deletePlan.getPaths()) {
+      delete(
+          path,
+          deletePlan.getDeleteStartTime(),
+          deletePlan.getDeleteEndTime(),
+          deletePlan.getIndex(),
+          deletePlan.getPartitionFilter());
+    }
+  }
+
+  private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+    File file = plan.getFile();
+    if (!file.exists()) {
+      throw new QueryProcessException(
+          String.format("File path %s doesn't exists.", file.getPath()));
+    }
+    if (file.isDirectory()) {
+      loadDir(file, plan);
+    } else {
+      loadFile(file, plan);
+    }
+  }
+
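+  // Loads the TsFiles in this directory in ascending order of the creation time encoded at the
+  // beginning of the file name, then recurses into sub-directories.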
+  private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+    File[] files = curFile.listFiles();
+    long[] establishTime = new long[files.length];
+    List<Integer> tsfiles = new ArrayList<>();
+
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      if (!file.isDirectory()) {
+        String fileName = file.getName();
+        if (fileName.endsWith(TSFILE_SUFFIX)) {
+          establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
+          tsfiles.add(i);
+        }
+      }
+    }
+    Collections.sort(
+        tsfiles,
+        (o1, o2) -> {
+          if (establishTime[o1] == establishTime[o2]) {
+            return 0;
+          }
+          return establishTime[o1] < establishTime[o2] ? -1 : 1;
+        });
+    for (Integer i : tsfiles) {
+      loadFile(files[i], plan);
+    }
+
+    for (File file : files) {
+      if (file.isDirectory()) {
+        loadDir(file, plan);
+      }
+    }
+  }
+
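+  // Checks that the TsFile is complete and does not need upgrading, optionally verifies its
+  // metadata and auto-creates missing schemas, splits the file if it spans multiple time
+  // partitions, and finally registers the resulting resources with the StorageEngine.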
+  private void loadFile(File file, OperateFilePlan plan) throws QueryProcessException {
+    if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+      return;
+    }
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    try {
+      // check file
+      RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+      if (restorableTsFileIOWriter.hasCrashed()) {
+        restorableTsFileIOWriter.close();
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
+      }
+      Map<Path, IMeasurementSchema> schemaMap = new HashMap<>();
+
+      List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+        reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
+        if (plan.getVerifyMetadata()) {
+          loadNewTsFileVerifyMetadata(reader);
+        }
+      } catch (IOException e) {
+        logger.warn("can not get timeseries metadata from {}.", file.getAbsoluteFile());
+        throw new QueryProcessException(e.getMessage());
+      }
+
+      FileLoaderUtils.checkTsFileResource(tsFileResource);
+      if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file's version is old which needs to be upgraded.",
+                file.getAbsolutePath()));
+      }
+
+      // create schemas if they don't exist
+      if (plan.isAutoCreateSchema()) {
+        createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
+      }
+
+      List<TsFileResource> splitResources = new ArrayList();
+      if (tsFileResource.isSpanMultiTimePartitions()) {
+        logger.info(
+            "try to split the tsFile={} du to it spans multi partitions",
+            tsFileResource.getTsFile().getPath());
+        TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+        tsFileResource.writeLock();
+        tsFileResource.removeModFile();
+        tsFileResource.writeUnlock();
+        logger.info(
+            "after split, the old tsFile was split to {} new tsFiles", splitResources.size());
+      }
+
+      if (splitResources.isEmpty()) {
+        splitResources.add(tsFileResource);
+      }
+
+      for (TsFileResource resource : splitResources) {
+        StorageEngine.getInstance().loadNewTsFile(resource);
+      }
+    } catch (Exception e) {
+      logger.error("fail to load file {}", file.getName(), e);
+      throw new QueryProcessException(
+          String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+    }
+  }
+
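+  // For every series of an already-known device, checks that the data type stored in the TsFile
+  // matches the data type registered in IoTDB.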
+  private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
+      throws MetadataException, QueryProcessException, IOException {
+    Map<String, List<TimeseriesMetadata>> metadataSet =
+        tsFileSequenceReader.getAllTimeseriesMetadata();
+    for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
+      String deviceId = entry.getKey();
+      PartialPath devicePath = new PartialPath(deviceId);
+      if (!IoTDB.metaManager.isPathExist(devicePath)) {
+        continue;
+      }
+      for (TimeseriesMetadata metadata : entry.getValue()) {
+        PartialPath fullPath =
+            new PartialPath(deviceId + TsFileConstant.PATH_SEPARATOR + metadata.getMeasurementId());
+        if (IoTDB.metaManager.isPathExist(fullPath)) {
+          TSDataType dataType = IoTDB.metaManager.getSeriesType(fullPath);
+          if (dataType != metadata.getTSDataType()) {
+            throw new QueryProcessException(
+                fullPath.getFullPath()
+                    + " is "
+                    + metadata.getTSDataType().name()
+                    + " in the loading TsFile but is "
+                    + dataType.name()
+                    + " in IoTDB.");
+          }
+        }
+      }
+    }
+  }
+
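+  // Creates the storage groups (at the configured level) and any time series of the loaded
+  // TsFile that are not yet registered in the schema.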
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private void createSchemaAutomatically(
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      Map<Path, IMeasurementSchema> knownSchemas,
+      int sgLevel)
+      throws MetadataException {
+    if (chunkGroupMetadataList.isEmpty()) {
+      return;
+    }
+
+    Set<PartialPath> registeredSeries = new HashSet<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      String device = chunkGroupMetadata.getDevice();
+      Set<String> existSeriesSet = new HashSet<>();
+      PartialPath devicePath = new PartialPath(device);
+      PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(devicePath, sgLevel);
+      try {
+        IoTDB.metaManager.setStorageGroup(storageGroupPath);
+      } catch (StorageGroupAlreadySetException alreadySetException) {
+        if (!alreadySetException.getStorageGroupPath().equals(storageGroupPath.getFullPath())) {
+          throw alreadySetException;
+        }
+      }
+      for (PartialPath path :
+          IoTDB.metaManager.getMeasurementPaths(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD))) {
+        existSeriesSet.add(path.getMeasurement());
+        existSeriesSet.add(path.getMeasurementAlias());
+      }
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        PartialPath series =
+            new PartialPath(
+                chunkGroupMetadata.getDevice()
+                    + TsFileConstant.PATH_SEPARATOR
+                    + chunkMetadata.getMeasurementUid());
+        if (!registeredSeries.contains(series)) {
+          registeredSeries.add(series);
+          IMeasurementSchema schema =
+              knownSchemas.get(new Path(series.getDevice(), series.getMeasurement()));
+          if (schema == null) {
+            throw new MetadataException(
+                String.format(
+                    "Can not get the schema of measurement [%s]",
+                    chunkMetadata.getMeasurementUid()));
+          }
+          if (!existSeriesSet.contains(chunkMetadata.getMeasurementUid())) {
+            IoTDB.metaManager.createTimeseries(
+                series,
+                schema.getType(),
+                schema.getEncodingType(),
+                schema.getCompressor(),
+                Collections.emptyMap());
+          }
+        }
+      }
+    }
+  }
+
+  private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
+    try {
+      if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
+        throw new QueryProcessException(
+            String.format("File %s doesn't exist.", plan.getFile().getName()));
+      }
+    } catch (StorageEngineException | IllegalPathException e) {
+      throw new QueryProcessException(
+          String.format("Cannot remove file because %s", e.getMessage()));
+    }
+  }
+
+  private void operateUnloadFile(OperateFilePlan plan) throws QueryProcessException {
+    if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
+      throw new QueryProcessException(
+          String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
+    }
+    try {
+      if (!StorageEngine.getInstance().unloadTsfile(plan.getFile(), plan.getTargetDir())) {
+        throw new QueryProcessException(
+            String.format("File %s doesn't exist.", plan.getFile().getName()));
+      }
+    } catch (StorageEngineException | IllegalPathException e) {
+      throw new QueryProcessException(
+          String.format(
+              "Cannot unload file %s to target directory %s because %s",
+              plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
+    }
+  }
+
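+  // Applies the TTL to every storage group matched by the given path, both in the schema
+  // manager and in the storage engine.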
+  private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
+    try {
+      List<PartialPath> storageGroupPaths =
+          IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup());
+      for (PartialPath storagePath : storageGroupPaths) {
+        IoTDB.metaManager.setTTL(storagePath, plan.getDataTTL());
+        StorageEngine.getInstance().setTTL(storagePath, plan.getDataTTL());
+      }
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void update(PartialPath path, long startTime, long endTime, String value) {
+    throw new UnsupportedOperationException("update is not supported yet");
+  }
+
+  @Override
+  public void delete(
+      PartialPath path,
+      long startTime,
+      long endTime,
+      long planIndex,
+      TimePartitionFilter timePartitionFilter)
+      throws QueryProcessException {
+    try {
+      StorageEngine.getInstance().delete(path, startTime, endTime, planIndex, timePartitionFilter);
+    } catch (StorageEngineException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
+  private void checkFailedMeasurments(InsertPlan plan)
+      throws PathNotExistException, StorageEngineException {
+    // check whether all failures were caused by PathNotExistException
+    List<String> failedPaths = plan.getFailedMeasurements();
+    List<Exception> exceptions = plan.getFailedExceptions();
+    boolean isPathNotExistException = true;
+    for (Exception e : exceptions) {
+      Throwable curException = e;
+      while (curException.getCause() != null) {
+        curException = curException.getCause();
+      }
+      if (!(curException instanceof PathNotExistException)) {
+        isPathNotExistException = false;
+        break;
+      }
+    }
+    if (isPathNotExistException) {
+      throw new PathNotExistException(failedPaths);
+    } else {
+      throw new StorageEngineException(
+          INSERT_MEASUREMENTS_FAILED_MESSAGE
+              + plan.getFailedMeasurements()
+              + (!exceptions.isEmpty() ? (" caused by " + exceptions.get(0).getMessage()) : ""));
+    }
+  }
+
+  @Override
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws QueryProcessException {
+    if (insertRowsOfOneDevicePlan.getRowPlans().length == 0) {
+      return;
+    }
+    try {
+      // insert to storage engine
+      StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+      List<String> notExistedPaths = null;
+      List<String> failedMeasurements = null;
+
+      // If there are some exceptions, we assume they are caused by the same reason.
+      Exception exception = null;
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (plan.getFailedMeasurements() != null) {
+          if (notExistedPaths == null) {
+            notExistedPaths = new ArrayList<>();
+            failedMeasurements = new ArrayList<>();
+          }
+          // check whether all failures were caused by PathNotExistException
+          List<String> failedPaths = plan.getFailedMeasurements();
+          List<Exception> exceptions = plan.getFailedExceptions();
+          boolean isPathNotExistException = true;
+          for (Exception e : exceptions) {
+            exception = e;
+            Throwable curException = e;
+            while (curException.getCause() != null) {
+              curException = curException.getCause();
+            }
+            if (!(curException instanceof PathNotExistException)) {
+              isPathNotExistException = false;
+              break;
+            }
+          }
+          if (isPathNotExistException) {
+            notExistedPaths.addAll(failedPaths);
+          } else {
+            failedMeasurements.addAll(plan.getFailedMeasurements());
+          }
+        }
+      }
+      if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+        throw new PathNotExistException(notExistedPaths);
+      } else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
+        throw new StorageEngineException(
+            "failed to insert points "
+                + failedMeasurements
+                + (exception != null ? (" caused by " + exception.getMessage()) : ""));
+      }
+
+    } catch (StorageEngineException | MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
+  @Override
+  public void insert(InsertRowsPlan plan) throws QueryProcessException {
+    for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+      if (plan.getResults().containsKey(i) || plan.isExecuted(i)) {
+        continue;
+      }
+      try {
+        insert(plan.getInsertRowPlanList().get(i));
+      } catch (QueryProcessException e) {
+        plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!plan.getResults().isEmpty()) {
+      throw new BatchProcessException(plan.getFailingStatus());
+    }
+  }
+
+  @Override
+  public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
+    try {
+      insertRowPlan.setMeasurementMNodes(
+          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+      // When inserting data with an SQL statement, the data types will be null here.
+      // We need to predict the data types first.
+      if (insertRowPlan.getDataTypes()[0] == null) {
+        for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+          insertRowPlan.getDataTypes()[i] =
+              TypeInferenceUtils.getPredictedDataType(
+                  insertRowPlan.getValues()[i], insertRowPlan.isNeedInferType());
+        }
+      }
+
+      StorageEngine.getInstance().insert(insertRowPlan);
+
+      if (insertRowPlan.getFailedMeasurements() != null) {
+        checkFailedMeasurments(insertRowPlan);
+      }
+    } catch (StorageEngineException | MetadataException e) {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw new QueryProcessException(e);
+    } catch (Exception e) {
+      // update failed statistics
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws QueryProcessException {
+    if (insertMultiTabletPlan.isEnableMultiThreading()) {
+      insertTabletParallel(insertMultiTabletPlan);
+    } else {
+      insertTabletSerial(insertMultiTabletPlan);
+    }
+  }
+
+  private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
+    for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
+      if (insertMultiTabletPlan.getResults().containsKey(i)
+          || insertMultiTabletPlan.isExecuted(i)) {
+        continue;
+      }
+      try {
+        insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+      } catch (QueryProcessException e) {
+        insertMultiTabletPlan
+            .getResults()
+            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!insertMultiTabletPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+    }
+  }
+
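+  // Executes the remaining tablet plans concurrently on the shared insertion pool and records
+  // per-plan failures under their original indices.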
+  private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
+    updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+
+    List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+    List<Future<?>> futureList = new ArrayList<>();
+
+    Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+
+    List<InsertTabletPlan> runPlanList = new ArrayList<>();
+    Map<Integer, Integer> runIndexToRealIndex = new HashMap<>();
+    for (int i = 0; i < planList.size(); i++) {
+      if (!(results.containsKey(i) || insertMultiTabletPlan.isExecuted(i))) {
+        runPlanList.add(planList.get(i));
+        runIndexToRealIndex.put(runPlanList.size() - 1, i);
+      }
+    }
+    for (InsertTabletPlan plan : runPlanList) {
+      Future<?> f =
+          insertionPool.submit(
+              () -> {
+                insertTablet(plan);
+                return null;
+              });
+      futureList.add(f);
+    }
+    for (int i = 0; i < futureList.size(); i++) {
+      try {
+        futureList.get(i).get();
+      } catch (Exception e) {
+        if (e.getCause() instanceof QueryProcessException) {
+          QueryProcessException qe = (QueryProcessException) e.getCause();
+          results.put(
+              runIndexToRealIndex.get(i), RpcUtils.getStatus(qe.getErrorCode(), qe.getMessage()));
+        } else {
+          results.put(
+              runIndexToRealIndex.get(i),
+              RpcUtils.getStatus(INTERNAL_SERVER_ERROR, e.getMessage()));
+        }
+      }
+    }
+
+    if (!results.isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+    }
+  }
+
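+  // Resizes the insertion pool to min(number of involved storage groups, half of the available
+  // processors), creating the pool first if necessary.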
+  private void updateInsertTabletsPool(int sgSize) {
+    int updateCoreSize = Math.min(sgSize, Runtime.getRuntime().availableProcessors() / 2);
+    if (insertionPool == null || insertionPool.isTerminated()) {
+      insertionPool =
+          (ThreadPoolExecutor)
+              IoTDBThreadPoolFactory.newFixedThreadPool(
+                  updateCoreSize, ThreadName.INSERTION_SERVICE.getName());
+    } else if (insertionPool.getCorePoolSize() > updateCoreSize) {
+      insertionPool.setCorePoolSize(updateCoreSize);
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+    } else if (insertionPool.getCorePoolSize() < updateCoreSize) {
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+      insertionPool.setCorePoolSize(updateCoreSize);
+    }
+  }
+
+  @Override
+  public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+
+    if (insertTabletPlan.getRowCount() == 0) {
+      return;
+    }
+    try {
+      insertTabletPlan.setMeasurementMNodes(
+          new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
+
+      StorageEngine.getInstance().insertTablet(insertTabletPlan);
+
+      if (insertTabletPlan.getFailedMeasurements() != null) {
+        checkFailedMeasurments(insertTabletPlan);
+      }
+    } catch (StorageEngineException | MetadataException e) {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw new QueryProcessException(e);
+    } catch (Exception e) {
+      // update failed statistics
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw e;
+    }
+  }
+
+  private boolean operateAuthor(AuthorPlan author) throws QueryProcessException {
+    AuthorOperator.AuthorType authorType = author.getAuthorType();
+    String userName = author.getUserName();
+    String roleName = author.getRoleName();
+    String password = author.getPassword();
+    String newPassword = author.getNewPassword();
+    Set<Integer> permissions = author.getPermissions();
+    PartialPath nodeName = author.getNodeName();
+    try {
+      switch (authorType) {
+        case UPDATE_USER:
+          authorizer.updateUserPassword(userName, newPassword);
+          break;
+        case CREATE_USER:
+          authorizer.createUser(userName, password);
+          break;
+        case CREATE_ROLE:
+          authorizer.createRole(roleName);
+          break;
+        case DROP_USER:
+          authorizer.deleteUser(userName);
+          break;
+        case DROP_ROLE:
+          authorizer.deleteRole(roleName);
+          break;
+        case GRANT_ROLE:
+          for (int i : permissions) {
+            authorizer.grantPrivilegeToRole(roleName, nodeName.getFullPath(), i);
+          }
+          break;
+        case GRANT_USER:
+          for (int i : permissions) {
+            authorizer.grantPrivilegeToUser(userName, nodeName.getFullPath(), i);
+          }
+          break;
+        case GRANT_ROLE_TO_USER:
+          authorizer.grantRoleToUser(roleName, userName);
+          break;
+        case REVOKE_USER:
+          for (int i : permissions) {
+            authorizer.revokePrivilegeFromUser(userName, nodeName.getFullPath(), i);
+          }
+          break;
+        case REVOKE_ROLE:
+          for (int i : permissions) {
+            authorizer.revokePrivilegeFromRole(roleName, nodeName.getFullPath(), i);
+          }
+          break;
+        case REVOKE_ROLE_FROM_USER:
+          authorizer.revokeRoleFromUser(roleName, userName);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported operation " + authorType);
+      }
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage(), true);
+    }
+    return true;
+  }
+
+  private boolean operateWatermarkEmbedding(List<String> users, boolean useWatermark)
+      throws QueryProcessException {
+    try {
+      for (String user : users) {
+        authorizer.setUserUseWaterMark(user, useWatermark);
+      }
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+    return true;
+  }
+
+  private boolean createTimeSeries(CreateTimeSeriesPlan createTimeSeriesPlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.createTimeseries(createTimeSeriesPlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean createAlignedTimeSeries(CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+  private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
+      throws BatchProcessException {
+    int dataTypeIdx = 0;
+    for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+      if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+        continue;
+      }
+      PartialPath path = multiPlan.getPaths().get(i);
+      String measurement = path.getMeasurement();
+      CreateTimeSeriesPlan plan =
+          new CreateTimeSeriesPlan(
+              multiPlan.getPaths().get(i),
+              multiPlan.getDataTypes().get(i),
+              multiPlan.getEncodings().get(i),
+              multiPlan.getCompressors().get(i),
+              multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+              multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+              multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+              multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+      dataTypeIdx++;
+      try {
+        createTimeSeries(plan);
+      } catch (QueryProcessException e) {
+        multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!multiPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(multiPlan.getFailingStatus());
+    }
+    return true;
+  }
+
+  protected boolean deleteTimeSeries(DeleteTimeSeriesPlan deleteTimeSeriesPlan)
+      throws QueryProcessException {
+    AUDIT_LOGGER.info("delete timeseries {}", deleteTimeSeriesPlan.getPaths());
+    List<PartialPath> deletePathList = deleteTimeSeriesPlan.getPaths();
+    for (int i = 0; i < deletePathList.size(); i++) {
+      PartialPath path = deletePathList.get(i);
+      try {
+        StorageEngine.getInstance()
+            .deleteTimeseries(
+                path, deleteTimeSeriesPlan.getIndex(), deleteTimeSeriesPlan.getPartitionFilter());
+        String failed = IoTDB.metaManager.deleteTimeseries(path);
+        if (failed != null) {
+          deleteTimeSeriesPlan
+              .getResults()
+              .put(i, RpcUtils.getStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR, failed));
+        }
+      } catch (StorageEngineException | MetadataException e) {
+        deleteTimeSeriesPlan
+            .getResults()
+            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!deleteTimeSeriesPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(deleteTimeSeriesPlan.getFailingStatus());
+    }
+    return true;
+  }
+
+  private boolean alterTimeSeries(AlterTimeSeriesPlan alterTimeSeriesPlan)
+      throws QueryProcessException {
+    PartialPath path = alterTimeSeriesPlan.getPath();
+    Map<String, String> alterMap = alterTimeSeriesPlan.getAlterMap();
+    try {
+      switch (alterTimeSeriesPlan.getAlterType()) {
+        case RENAME:
+          String beforeName = alterMap.keySet().iterator().next();
+          String currentName = alterMap.get(beforeName);
+          IoTDB.metaManager.renameTagOrAttributeKey(beforeName, currentName, path);
+          break;
+        case SET:
+          IoTDB.metaManager.setTagsOrAttributesValue(alterMap, path);
+          break;
+        case DROP:
+          IoTDB.metaManager.dropTagsOrAttributes(alterMap.keySet(), path);
+          break;
+        case ADD_TAGS:
+          IoTDB.metaManager.addTags(alterMap, path);
+          break;
+        case ADD_ATTRIBUTES:
+          IoTDB.metaManager.addAttributes(alterMap, path);
+          break;
+        case UPSERT:
+          IoTDB.metaManager.upsertTagsAndAttributes(
+              alterTimeSeriesPlan.getAlias(),
+              alterTimeSeriesPlan.getTagsMap(),
+              alterTimeSeriesPlan.getAttributesMap(),
+              path);
+          break;
+      }
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    } catch (IOException e) {
+      throw new QueryProcessException(
+          String.format(
+              "Something went wrong while read/write the [%s]'s tag/attribute info.",
+              path.getFullPath()));
+    }
+    return true;
+  }
+
+  public boolean setStorageGroup(SetStorageGroupPlan setStorageGroupPlan)
+      throws QueryProcessException {
+    AUDIT_LOGGER.info("set storage group to {}", setStorageGroupPlan.getPaths());
+    PartialPath path = setStorageGroupPlan.getPath();
+    try {
+      IoTDB.metaManager.setStorageGroup(path);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  protected boolean deleteStorageGroups(DeleteStorageGroupPlan deleteStorageGroupPlan)
+      throws QueryProcessException {
+    AUDIT_LOGGER.info("delete storage group {}", deleteStorageGroupPlan.getPaths());
+    List<PartialPath> deletePathList = new ArrayList<>();
+    try {
+      for (PartialPath storageGroupPath : deleteStorageGroupPlan.getPaths()) {
+        List<PartialPath> allRelatedStorageGroupPath =
+            IoTDB.metaManager.getMatchedStorageGroups(storageGroupPath);
+        if (allRelatedStorageGroupPath.isEmpty()) {
+          throw new PathNotExistException(storageGroupPath.getFullPath(), true);
+        }
+        for (PartialPath path : allRelatedStorageGroupPath) {
+          StorageEngine.getInstance().deleteStorageGroup(path);
+          deletePathList.add(path);
+        }
+      }
+      IoTDB.metaManager.deleteStorageGroups(deletePathList);
+      operateClearCache();
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
+    AuthorType authorType = plan.getAuthorType();
+    String userName = plan.getUserName();
+    String roleName = plan.getRoleName();
+    PartialPath path = plan.getNodeName();
+
+    ListDataSet dataSet;
+
+    try {
+      switch (authorType) {
+        case LIST_ROLE:
+          dataSet = executeListRole(plan);
+          break;
+        case LIST_USER:
+          dataSet = executeListUser(plan);
+          break;
+        case LIST_ROLE_USERS:
+          dataSet = executeListRoleUsers(roleName);
+          break;
+        case LIST_USER_ROLES:
+          dataSet = executeListUserRoles(userName);
+          break;
+        case LIST_ROLE_PRIVILEGE:
+          dataSet = executeListRolePrivileges(roleName, path);
+          break;
+        case LIST_USER_PRIVILEGE:
+          dataSet = executeListUserPrivileges(userName, path);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported operation " + authorType);
+      }
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+    return dataSet;
+  }
+
+  private ListDataSet executeListRole(AuthorPlan plan) throws AuthException {
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+            Collections.singletonList(TSDataType.TEXT));
+
+    // check if current user is granted list_role privilege
+    boolean hasListRolePrivilege =
+        AuthorityChecker.check(
+            plan.getLoginUserName(),
+            Collections.emptyList(),
+            plan.getOperatorType(),
+            plan.getLoginUserName());
+    if (!hasListRolePrivilege) {
+      return dataSet;
+    }
+
+    List<String> roleList = authorizer.listAllRoles();
+    addToDataSet(roleList, dataSet);
+    return dataSet;
+  }
+
+  private void addToDataSet(List<String> strResults, ListDataSet dataSet) {
+    int index = 0;
+    for (String role : strResults) {
+      RowRecord record = new RowRecord(index++);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(role));
+      record.addField(field);
+      dataSet.putRecord(record);
+    }
+  }
+
+  private ListDataSet executeListUser(AuthorPlan plan) throws AuthException {
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+            Collections.singletonList(TSDataType.TEXT));
+
+    // check if current user is granted list_user privilege
+    boolean hasListUserPrivilege =
+        AuthorityChecker.check(
+            plan.getLoginUserName(),
+            Collections.singletonList((plan.getNodeName())),
+            plan.getOperatorType(),
+            plan.getLoginUserName());
+    if (!hasListUserPrivilege) {
+      return dataSet;
+    }
+
+    List<String> userList = authorizer.listAllUsers();
+    addToDataSet(userList, dataSet);
+    return dataSet;
+  }
+
+  private ListDataSet executeListRoleUsers(String roleName) throws AuthException {
+    Role role = authorizer.getRole(roleName);
+    if (role == null) {
+      throw new AuthException("No such role : " + roleName);
+    }
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    List<String> userList = authorizer.listAllUsers();
+    int index = 0;
+    for (String userN : userList) {
+      User userObj = authorizer.getUser(userN);
+      if (userObj != null && userObj.hasRole(roleName)) {
+        RowRecord record = new RowRecord(index++);
+        Field field = new Field(TSDataType.TEXT);
+        field.setBinaryV(new Binary(userN));
+        record.addField(field);
+        dataSet.putRecord(record);
+      }
+    }
+    return dataSet;
+  }
+
+  private ListDataSet executeListUserRoles(String userName) throws AuthException {
+    User user = authorizer.getUser(userName);
+    if (user != null) {
+      ListDataSet dataSet =
+          new ListDataSet(
+              Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+              Collections.singletonList(TSDataType.TEXT));
+      int index = 0;
+      for (String roleN : user.getRoleList()) {
+        RowRecord record = new RowRecord(index++);
+        Field field = new Field(TSDataType.TEXT);
+        field.setBinaryV(new Binary(roleN));
+        record.addField(field);
+        dataSet.putRecord(record);
+      }
+      return dataSet;
+    } else {
+      throw new AuthException("No such user : " + userName);
+    }
+  }
+
+  private ListDataSet executeListRolePrivileges(String roleName, PartialPath path)
+      throws AuthException {
+    Role role = authorizer.getRole(roleName);
+    if (role != null) {
+      List<PartialPath> headerList = new ArrayList<>();
+      List<TSDataType> typeList = new ArrayList<>();
+      headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+      typeList.add(TSDataType.TEXT);
+      ListDataSet dataSet = new ListDataSet(headerList, typeList);
+      int index = 0;
+      for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+        if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+          RowRecord record = new RowRecord(index++);
+          Field field = new Field(TSDataType.TEXT);
+          field.setBinaryV(new Binary(pathPrivilege.toString()));
+          record.addField(field);
+          dataSet.putRecord(record);
+        }
+      }
+      return dataSet;
+    } else {
+      throw new AuthException("No such role : " + roleName);
+    }
+  }
+
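+  // Lists the privileges granted directly to the user (empty role column) followed by the
+  // privileges inherited from each of the user's roles, optionally filtered by path.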
+  private ListDataSet executeListUserPrivileges(String userName, PartialPath path)
+      throws AuthException {
+    User user = authorizer.getUser(userName);
+    if (user == null) {
+      throw new AuthException("No such user : " + userName);
+    }
+    List<PartialPath> headerList = new ArrayList<>();
+    List<TSDataType> typeList = new ArrayList<>();
+    headerList.add(new PartialPath(COLUMN_ROLE, false));
+    headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    ListDataSet dataSet = new ListDataSet(headerList, typeList);
+    int index = 0;
+    for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+      if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+        RowRecord record = new RowRecord(index++);
+        Field roleF = new Field(TSDataType.TEXT);
+        roleF.setBinaryV(new Binary(""));
+        record.addField(roleF);
+        Field privilegeF = new Field(TSDataType.TEXT);
+        privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+        record.addField(privilegeF);
+        dataSet.putRecord(record);
+      }
+    }
+    for (String roleN : user.getRoleList()) {
+      Role role = authorizer.getRole(roleN);
+      if (role == null) {
+        continue;
+      }
+      for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+        if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+          RowRecord record = new RowRecord(index++);
+          Field roleF = new Field(TSDataType.TEXT);
+          roleF.setBinaryV(new Binary(roleN));
+          record.addField(roleF);
+          Field privilegeF = new Field(TSDataType.TEXT);
+          privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+          record.addField(privilegeF);
+          dataSet.putRecord(record);
+        }
+      }
+    }
+    return dataSet;
+  }
+
+  @SuppressWarnings("unused") // for the distributed version
+  protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
+    IoTDBDescriptor.getInstance().loadHotModifiedProps();
+  }
+
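+  // Collects the status of every merge task from the MergeManager and emits one row per task.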
+  private QueryDataSet processShowMergeStatus() {
+    List<PartialPath> headerList = new ArrayList<>();
+    List<TSDataType> typeList = new ArrayList<>();
+    headerList.add(new PartialPath(COLUMN_STORAGE_GROUP, false));
+    headerList.add(new PartialPath(COLUMN_TASK_NAME, false));
+    headerList.add(new PartialPath(COLUMN_CREATED_TIME, false));
+    headerList.add(new PartialPath(COLUMN_PROGRESS, false));
+    headerList.add(new PartialPath(COLUMN_CANCELLED, false));
+    headerList.add(new PartialPath(COLUMN_DONE, false));
+
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.BOOLEAN);
+    typeList.add(TSDataType.BOOLEAN);
+    ListDataSet dataSet = new ListDataSet(headerList, typeList);
+    Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus();
+    for (Map<String, List<TaskStatus>> statusMap : taskStatus) {
+      for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) {
+        for (TaskStatus status : stringListEntry.getValue()) {
+          dataSet.putRecord(toRowRecord(status, stringListEntry.getKey()));
+        }
+      }
+    }
+    return dataSet;
+  }
+
+  public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
+    RowRecord record = new RowRecord(0);
+    record.addField(new Binary(storageGroup), TSDataType.TEXT);
+    record.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
+    record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
+    record.addField(new Binary(status.getProgress()), TSDataType.TEXT);
+    record.addField(status.isCancelled(), TSDataType.BOOLEAN);
+    record.addField(status.isDone(), TSDataType.BOOLEAN);
+    return record;
+  }
+
+  private QueryDataSet processShowQueryProcesslist() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false)),
+            Arrays.asList(TSDataType.INT64, TSDataType.TEXT));
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    for (Entry<Long, QueryContext> context : queryTimeManager.getQueryContextMap().entrySet()) {
+      RowRecord record = new RowRecord(context.getValue().getStartTime());
+      record.addField(context.getKey(), TSDataType.INT64);
+      record.addField(new Binary(context.getValue().getStatement()), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  /**
+   * @param storageGroups the storage groups to check
+   * @return the list of storage groups that do not exist
+   */
+  List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) {
+    List<PartialPath> noExistSg = new ArrayList<>();
+    if (storageGroups == null) {
+      return noExistSg;
+    }
+    for (PartialPath storageGroup : storageGroups) {
+      if (!IoTDB.metaManager.isStorageGroup(storageGroup)) {
+        noExistSg.add(storageGroup);
+      }
+    }
+    return noExistSg;
+  }
+
+  private void settle(SettlePlan plan) throws StorageEngineException {
+    if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+      throw new StorageEngineException(
+          "Current system mode is read only, does not support file settle");
+    }
+    if (!SettleService.getINSTANCE().isRecoverFinish()) {
+      throw new StorageEngineException("Existing sg that is not ready, please try later.");
+    }
+    PartialPath sgPath = null;
+    try {
+      List<TsFileResource> seqResourcesToBeSettled = new ArrayList<>();
+      List<TsFileResource> unseqResourcesToBeSettled = new ArrayList<>();
+      List<String> tsFilePaths = new ArrayList<>();
+      if (plan.isSgPath()) {
+        sgPath = plan.getSgPath();
+      } else {
+        String tsFilePath = plan.getTsFilePath();
+        if (new File(tsFilePath).isDirectory()) {
+          throw new WriteProcessException("The file should not be a directory.");
+        } else if (!new File(tsFilePath).exists()) {
+          throw new WriteProcessException("The tsFile " + tsFilePath + " is not existed.");
+        }
+        sgPath = SettleService.getINSTANCE().getSGByFilePath(tsFilePath);
+        tsFilePaths.add(tsFilePath);
+      }
+      StorageEngine.getInstance()
+          .getResourcesToBeSettled(
+              sgPath, seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths);
+      SettleService.getINSTANCE().startSettling(seqResourcesToBeSettled, unseqResourcesToBeSettled);
+      StorageEngine.getInstance().setSettling(sgPath, false);
+    } catch (WriteProcessException e) {
+      if (sgPath != null) {
+        StorageEngine.getInstance().setSettling(sgPath, false);
+      }
+      throw new StorageEngineException(e.getMessage());
+    }
+  }
+}
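
The show-status helpers above (processShowMergeStatus, processShowQueryProcesslist, toRowRecord) all build their result sets the same way: a header of PartialPath column names, a matching list of TSDataTypes, and RowRecords appended field by field. A minimal standalone sketch of that pattern follows; it only uses calls that appear in this change, and the import locations for ListDataSet and Field are assumptions inferred from the packages referenced elsewhere in the diff, not verified against the build.

import java.util.Arrays;

import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.dataset.ListDataSet;          // assumed location
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;              // assumed location
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;

public class ShowDataSetSketch {

  /** Builds a one-column, one-row TEXT data set, mirroring the helpers above. */
  public static ListDataSet buildSingleRow(String taskName) {
    ListDataSet dataSet =
        new ListDataSet(
            Arrays.asList(new PartialPath("task name", false)),
            Arrays.asList(TSDataType.TEXT));
    RowRecord record = new RowRecord(0);          // the timestamp column is unused here
    Field taskField = new Field(TSDataType.TEXT);
    taskField.setBinaryV(new Binary(taskName));   // TEXT values are carried as Binary
    record.addField(taskField);
    dataSet.putRecord(record);
    return dataSet;
  }
}
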
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 6f8e823a4e..c9bcdf1cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -33,8 +33,6 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -44,6 +42,8 @@ import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.SessionTimeoutManager;
 import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 27bba9d092..11a2026e5c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -39,10 +39,10 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
index 9766bfc85b..860bdd0ab7 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
index 526226884f..e3e21b1cb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
index e70e5ee404..21284733fe 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
index 2a596895e9..60ba99f132 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
@@ -26,10 +26,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
index 8aeb162b48..a449f92ff0 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
index 329e4bf090..f7fd194709 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
index 5677ad27c8..4f8e894bb8 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index 062bbc3611..db9584f819 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -23,10 +23,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 75213a288a..bd0cb839cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
index d33d1c9230..ab955ed0f8 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -36,6 +36,8 @@ import java.nio.ByteBuffer;
 /** Unit tests of AggregateResult without desc aggregate result. */
 public class AggregateResultTest {
 
+  private static final double maxError = 0.0001d;
+
   @Test
   public void avgAggrResultTest() throws QueryProcessException, IOException {
     AggregateResult avgAggrResult1 =
@@ -291,4 +293,36 @@ public class AggregateResultTest {
     AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
     Assert.assertEquals(2d, (double) result.getResult(), 0.01);
   }
+
+  //  @Test
+  //  public void validityAggrResultTest() throws QueryProcessException, IOException {
+  //    AggregateResult validityAggrResult1 =
+  //        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE,
+  // true);
+  //    AggregateResult validityAggrResult2 =
+  //        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE,
+  // true);
+  //
+  //    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+  //    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+  //    statistics1.update(1L, 1d);
+  //    statistics1.update(2L, 1d);
+  //    statistics2.update(3L, 1d);
+  //    statistics2.update(4L, 3d);
+  //
+  //    validityAggrResult1.updateResultFromStatistics(statistics1);
+  //    validityAggrResult2.updateResultFromStatistics(statistics2);
+  //    //    System.out.println(validityAggrResult1.getResult());
+  //    //    System.out.println(validityAggrResult2.getResult());
+  //    validityAggrResult1.merge(validityAggrResult2);
+  //    //    System.out.println(validityAggrResult1.getResult());
+  //
+  //    //    Assert.assertEquals(0.25, (double) validityAggrResult1.getResult(), maxError);
+  //
+  //    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+  //    validityAggrResult1.serializeTo(outputStream);
+  //    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+  //    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
+  //    Assert.assertEquals(0.25, (double) result.getResult(), maxError);
+  //  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
index d038b3467f..56fb6ef09d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
@@ -19,172 +19,37 @@
 
 package org.apache.iotdb.db.query.aggregation;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 /** Unit tests of desc aggregate result. */
 public class DescAggregateResultTest {
 
-  @Test
-  public void maxTimeDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult maxTimeDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.FLOAT, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
-    statistics1.update(10L, 10.0f);
-    statistics2.update(1L, 1.0f);
-
-    maxTimeDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
-    maxTimeDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    maxTimeDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals(10L, (long) result.getResult());
-
-    maxTimeDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
-    batchData.putFloat(1, 1.0F);
-    batchData.putFloat(2, 2.0F);
-    batchData.putFloat(3, 3.0F);
-    batchData.putFloat(4, 4.0F);
-    batchData.putFloat(5, 5.0F);
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    maxTimeDescAggrResult.updateResultFromPageData(it);
-    Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
-    it.reset();
-    maxTimeDescAggrResult.updateResultFromPageData(it, 2, 5);
-    Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
-  }
-
-  @Test
-  public void minTimeDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult minTimeDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.FLOAT, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
-    statistics1.update(10L, 10.0f);
-    statistics2.update(1L, 1.0f);
-
-    minTimeDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals(10L, (long) minTimeDescAggrResult.getResult());
-    minTimeDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals(1L, (long) minTimeDescAggrResult.getResult());
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    minTimeDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals(1L, (long) result.getResult());
-
-    minTimeDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
-    batchData.putFloat(1, 1.0F);
-    batchData.putFloat(2, 2.0F);
-    batchData.putFloat(3, 3.0F);
-    batchData.putFloat(4, 4.0F);
-    batchData.putFloat(5, 5.0F);
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    minTimeDescAggrResult.updateResultFromPageData(it);
-    Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
-    it.reset();
-    minTimeDescAggrResult.updateResultFromPageData(it, 1, 3);
-    Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
-  }
-
-  @Test
-  public void firstValueDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult firstValueDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(
-            SQLConstant.FIRST_VALUE, TSDataType.BOOLEAN, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.BOOLEAN);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.BOOLEAN);
-    statistics1.update(10L, true);
-    statistics2.update(1L, false);
-
-    firstValueDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals(true, firstValueDescAggrResult.getResult());
-    firstValueDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals(false, firstValueDescAggrResult.getResult());
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    firstValueDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals(false, result.getResult());
-
-    firstValueDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.BOOLEAN, false, false);
-    batchData.putBoolean(1, true);
-    batchData.putBoolean(2, false);
-    batchData.putBoolean(3, false);
-    batchData.putBoolean(4, true);
-    batchData.putBoolean(5, false);
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    firstValueDescAggrResult.updateResultFromPageData(it);
-    Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
-    it.reset();
-    firstValueDescAggrResult.updateResultFromPageData(it, 1, 3);
-    Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
-  }
-
-  @Test
-  public void lastValueDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult lastValueDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE, TSDataType.TEXT, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.TEXT);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.TEXT);
-    statistics1.update(10L, new Binary("last"));
-    statistics2.update(1L, new Binary("first"));
-
-    lastValueDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
-    lastValueDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    lastValueDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals("last", String.valueOf(result.getResult()));
-
-    lastValueDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.TEXT, false, false);
-    batchData.putBinary(1L, new Binary("a"));
-    batchData.putBinary(2L, new Binary("b"));
-    batchData.putBinary(3L, new Binary("c"));
-    batchData.putBinary(4L, new Binary("d"));
-    batchData.putBinary(5L, new Binary("e"));
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    lastValueDescAggrResult.updateResultFromPageData(it);
-    Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
-    it.reset();
-    lastValueDescAggrResult.updateResultFromPageData(it, 3, 5);
-    Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
-  }
+  //  @Test
+  //  public void validityMergeTest() throws QueryProcessException, IOException {
+  //    AggregateResult ValidityAggrResult =
+  //        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE,
+  // true);
+  //
+  //    System.out.println(Runtime.getRuntime().freeMemory() / 1024 / 1024);
+  //
+  //    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+  //    BatchData batchData2 = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+  //
+  //    for (int i = 0; i < 3000; i++) {
+  //      batchData.putDouble(1623311071000L - 3000 + i, 0d);
+  //    }
+  //    batchData.putDouble(1623311071000L, 0d);
+  //    batchData.putDouble(1623312051000L, 2d);
+  //    batchData.putDouble(1623312053000L, 2d);
+  //    batchData.putDouble(1623312055000L, 2d);
+  //    batchData.putDouble(1623312057000L, 2d);
+  //    for (int i = 0; i < 7024; i++) {
+  //      batchData2.putDouble(1623312057000L + i, 0d);
+  //    }
+  //    batchData.resetBatchData();
+  //    batchData2.resetBatchData();
+  //    IBatchDataIterator it = batchData.getBatchDataIterator();
+  //    IBatchDataIterator it2 = batchData2.getBatchDataIterator();
+  //    ValidityAggrResult.updateResultFromPageData(it);
+  //    it.reset();
+  //    ValidityAggrResult.updateResultFromPageData(it2);
+  //  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
index 07fad6d9df..92171f832c 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
index 14873748bd..8909c62d82 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
index 8f9a15b1c1..6aa4073366 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
index 34d7edc939..23ed66971f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
index ba6578af53..141673fe59 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
index 03cd7c431c..1602a3f207 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
index 30f3468b8b..05f999139b 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
index 522518c5ea..5d7193fc8d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 864b4c0048..b49d9a5f1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.tools.TsFileRewriteTool;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;