Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/10/28 14:25:31 UTC

[iotdb] branch research/quality-validity updated: update validity aggregation (#7783)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch research/quality-validity
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/quality-validity by this push:
     new 52d35570b0 update validity aggregation (#7783)
52d35570b0 is described below

commit 52d35570b05321a32d61eee1dea30150c0218966
Author: iotdbValidity <99...@users.noreply.github.com>
AuthorDate: Fri Oct 28 22:25:17 2022 +0800

    update validity aggregation (#7783)
    
    update validity aggregation
---
 .../resources/conf/iotdb-engine.properties         |   17 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   28 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   56 +
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    |    2 +-
 .../trigger/sink/local/LocalIoTDBHandler.java      |    4 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |    6 +-
 .../qp/logical/crud/AggregationQueryOperator.java  |    3 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |    2 +-
 .../iotdb/db/qp/logical/crud/SelectComponent.java  |    1 +
 .../db/query/aggregation/AggregateResult.java      |    2 +
 .../db/query/aggregation/AggregationType.java      |   14 +-
 .../db/query/aggregation/impl/CountAggrResult.java |    9 +-
 .../query/aggregation/impl/ValidityAggrResult.java |  177 ++
 .../aggregation/impl/ValidityAllAggrResult.java    |  165 ++
 .../db/query/executor/AggregationExecutor.java     |  553 ++++-
 .../db/query/factory/AggregateResultFactory.java   |   12 +
 .../db/query/filter/executor/IPlanExecutor.java    |  134 ++
 .../db/query/filter/executor/PlanExecutor.java     | 2176 ++++++++++++++++++++
 .../db/service/basic/BasicServiceProvider.java     |    4 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |    2 +-
 .../idtable/IDTableResourceControlTest.java        |    2 +-
 .../db/metadata/idtable/IDTableRestartTest.java    |    2 +-
 .../db/metadata/idtable/InsertWithIDTableTest.java |    2 +-
 .../db/metadata/idtable/LastQueryWithIDTable.java  |    2 +-
 .../QueryAlignedTimeseriesWithIDTableTest.java     |    2 +-
 .../db/metadata/idtable/QueryWithIDTableTest.java  |    2 +-
 .../iotdb/db/qp/physical/InsertRowPlanTest.java    |    2 +-
 .../db/qp/physical/InsertTabletMultiPlanTest.java  |    2 +-
 .../iotdb/db/qp/physical/InsertTabletPlanTest.java |    2 +-
 .../db/query/aggregation/AggregateResultTest.java  |   32 +
 .../query/aggregation/DescAggregateResultTest.java |  173 +-
 .../dataset/EngineDataSetWithValueFilterTest.java  |    4 +-
 .../iotdb/db/query/dataset/ListDataSetTest.java    |    4 +-
 .../iotdb/db/query/dataset/SingleDataSetTest.java  |    4 +-
 .../query/dataset/UDTFAlignByTimeDataSetTest.java  |    4 +-
 .../dataset/groupby/GroupByFillDataSetTest.java    |    4 +-
 .../dataset/groupby/GroupByLevelDataSetTest.java   |    4 +-
 .../dataset/groupby/GroupByTimeDataSetTest.java    |    4 +-
 .../valuefilter/RawQueryWithValueFilterTest.java   |    4 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      |    4 +-
 40 files changed, 3421 insertions(+), 205 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 13becfd599..acf9325cd5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -236,7 +236,7 @@ timestamp_precision=ms
 
 # Whether to timed flush unsequence tsfiles' memtables.
 # Datatype: boolean
-# enable_timed_flush_unseq_memtable=true
+enable_timed_flush_unseq_memtable=false
 
 # If a memTable's created time is older than current time minus this, the memtable will be flushed to disk.
 # Only check unsequence tsfiles' memtables.
@@ -418,15 +418,15 @@ timestamp_precision=ms
 ####################
 # sequence space compaction: only compact the sequence files
 # Datatype: boolean
-# enable_seq_space_compaction=true
+enable_seq_space_compaction=false
 
 # unsequence space compaction: only compact the unsequence files
 # Datatype: boolean
-# enable_unseq_space_compaction=true
+enable_unseq_space_compaction=false
 
 # cross space compaction: compact the unsequence files into the overlapped sequence files
 # Datatype: boolean
-# enable_cross_space_compaction=true
+enable_cross_space_compaction=false
 
 # the strategy of inner space compaction task
 # Options: inplace_compaction
@@ -721,9 +721,12 @@ timestamp_precision=ms
 # Datatype: int
 # page_size_in_byte=65536
 
+# xMax = 10000000
+# xMin = -1000000
+
 # The maximum number of data points in a page, default 1024*1024
 # Datatype: int
-# max_number_of_points_in_page=1048576
+max_number_of_points_in_page=1024
 
 # Max size limitation of input string
 # Datatype: int
@@ -953,4 +956,6 @@ timestamp_precision=ms
 ### Group By Fill Configuration
 ####################
 # Datatype: float
-# group_by_fill_cache_size_in_mb=1.0
\ No newline at end of file
+# group_by_fill_cache_size_in_mb=1.0
+
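+# parameter c for the validity aggregation (read into IoTDBConfig.c; see AggregationExecutor)
+# Datatype: int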
+c = 100
\ No newline at end of file
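
The keys added above (c, plus the commented xMax/xMin hints) parameterize the validity constraints that the Java changes below read through IoTDBDescriptor and TSFileDescriptor. As a minimal standalone sketch, assuming only the key names and the IoTDBConfig defaults shown in this patch (the class name ValidityPropsSketch is invented for illustration), they can be loaded with java.util.Properties:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    // Illustrative reader for the validity-related keys; the defaults mirror
    // IoTDBConfig below (c=100, smin=-999, smax=999).
    public class ValidityPropsSketch {
      public static void main(String[] args) throws IOException {
        Properties p = new Properties();
        try (FileInputStream in = new FileInputStream("iotdb-engine.properties")) {
          p.load(in);
        }
        int c = Integer.parseInt(p.getProperty("c", "100"));
        double smin = Double.parseDouble(p.getProperty("smin", "-999"));
        double smax = Double.parseDouble(p.getProperty("smax", "999"));
        System.out.println("c=" + c + ", smin=" + smin + ", smax=" + smax);
      }
    }
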
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4842bb27e6..487beb299f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -802,10 +802,38 @@ public class IoTDBConfig {
    */
   private boolean enableIDTableLogFile = false;
 
+  private int c = 100;
+  private double smin = -999;
+  private double smax = 999;
+
   public IoTDBConfig() {
     // empty constructor
   }
 
+  public int getParamC() {
+    return c;
+  }
+
+  public void setParamC(int c) {
+    this.c = c;
+  }
+
+  public double getsmin() {
+    return smin;
+  }
+
+  public void setsmin(double smin) {
+    this.smin = smin;
+  }
+
+  public double getsmax() {
+    return smax;
+  }
+
+  public void setsmax(double smax) {
+    this.smax = smax;
+  }
+
   public float getUdfMemoryBudgetInMB() {
     return udfMemoryBudgetInMB;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 849fddbdb6..8216e48229 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -774,6 +774,13 @@ public class IoTDBDescriptor {
                   "insert_multi_tablet_enable_multithreading_column_threshold",
                   String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
 
+      conf.setParamC(
+          Integer.parseInt(properties.getProperty("c", Integer.toString(conf.getParamC()))));
+      conf.setsmin(
+          Double.parseDouble(properties.getProperty("smin", Double.toString(conf.getsmin()))));
+      conf.setsmax(
+          Double.parseDouble(properties.getProperty("smax", Double.toString(conf.getsmax()))));
+
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance()
           .getConfig()
@@ -974,6 +981,55 @@ public class IoTDBDescriptor {
                     "group_size_in_byte",
                     Integer.toString(
                         TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setXMax(
+            Double.parseDouble(
+                properties.getProperty(
+                    "xMax",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getXMax()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setXMin(
+            Double.parseDouble(
+                properties.getProperty(
+                    "xMin",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getXMin()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setsMax(
+            Double.parseDouble(
+                properties.getProperty(
+                    "sMax",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getsMax()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setSmin(
+            Double.parseDouble(
+                properties.getProperty(
+                    "sMin",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getSmin()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setMussRate(
+            Double.parseDouble(
+                properties.getProperty(
+                    "mussRate",
+                    Double.toString(TSFileDescriptor.getInstance().getConfig().getMussRate()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setUsePreSpeed(
+            Boolean.parseBoolean(
+                properties.getProperty(
+                    "usePreSpeed",
+                    Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreSpeed()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setUsePreRange(
+            Boolean.parseBoolean(
+                properties.getProperty(
+                    "usePreRange",
+                    Boolean.toString(TSFileDescriptor.getInstance().getConfig().isUsePreRange()))));
     TSFileDescriptor.getInstance()
         .getConfig()
         .setPageSizeInByte(
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284fe8..6216487e5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
index 3b224b20e5..b4f40e1f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/sink/local/LocalIoTDBHandler.java
@@ -27,10 +27,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index ce111ab37a..a7bb68583c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -70,6 +70,8 @@ public class SQLConstant {
   public static final String COUNT = "count";
   public static final String AVG = "avg";
   public static final String SUM = "sum";
+  public static final String VALIDITY = "validity";
+  public static final String VALIDITYALL = "validityall";
 
   public static final String ALL = "all";
 
@@ -85,7 +87,9 @@ public class SQLConstant {
               LAST_VALUE,
               COUNT,
               SUM,
-              AVG));
+              AVG,
+              VALIDITY,
+              VALIDITYALL));
 
   public static final int TOK_WHERE = 23;
   public static final int TOK_INSERT = 24;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
index 36719c71ec..560fb23a15 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
@@ -125,7 +125,8 @@ public class AggregationQueryOperator extends QueryOperator {
 
   protected AggregationPlan initAggregationPlan(QueryPlan queryPlan) throws QueryProcessException {
     AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
-    aggregationPlan.setAggregations(selectComponent.getAggregationFunctions());
+    aggregationPlan.setAggregations(selectComponent.getAggregationFunctions()); // function name
+    // TODO: aggregationPlan.setParams(selectComponent.getParams)
     if (isGroupByLevel()) {
       initGroupByLevel(aggregationPlan);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 4653ed6794..5f41e24902 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -60,7 +60,7 @@ public class QueryOperator extends Operator {
   protected WhereComponent whereComponent;
   protected SpecialClauseComponent specialClauseComponent;
 
-  protected Map<String, Object> props;
+  protected Map<String, Object> props; // parameters
   protected IndexType indexType;
 
   protected boolean enableTracing;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index f06d66158c..397f99f147 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -128,6 +128,7 @@ public final class SelectComponent {
             expression instanceof FunctionExpression
                 ? ((FunctionExpression) resultColumn.getExpression()).getFunctionName()
                 : null);
+        // TODO: resultColumn.getExpression().getFunctionAttributes()
       }
     }
     return aggregationFunctionsCache;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7daa1e8d60..0b9e27b61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -67,6 +67,7 @@ public abstract class AggregateResult {
    *
    * @param statistics chunkStatistics or pageStatistics
    */
+  // update the result from pre-aggregated statistics (chunk/page headers)
   public abstract void updateResultFromStatistics(Statistics statistics)
       throws QueryProcessException;
 
@@ -76,6 +77,7 @@ public abstract class AggregateResult {
    *
    * @param batchIterator the data in Page
    */
+  // update the result from the raw page data
   public abstract void updateResultFromPageData(IBatchDataIterator batchIterator)
       throws IOException, QueryProcessException;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index a3c651835c..ad6632da1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -35,7 +35,9 @@ public enum AggregationType {
   MIN_TIME,
   MAX_VALUE,
   MIN_VALUE,
-  EXTREME;
+  EXTREME,
+  VALIDITY,
+  VALIDITYALL;
 
   /**
    * give an integer to return a data type.
@@ -65,6 +67,10 @@ public enum AggregationType {
         return MIN_VALUE;
       case 9:
         return EXTREME;
+      case 10:
+        return VALIDITY;
+      case 11:
+        return VALIDITYALL;
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
     }
@@ -103,6 +109,12 @@ public enum AggregationType {
       case EXTREME:
         i = 9;
         break;
+      case VALIDITY:
+        i = 10;
+        break;
+      case VALIDITYALL:
+        i = 11;
+        break;
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index ec279729f9..84a25ab51f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -32,6 +33,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class CountAggrResult extends AggregateResult {
+  private Statistics statisticsInstance = new DoubleStatistics();
 
   public CountAggrResult() {
     super(TSDataType.INT64, AggregationType.COUNT);
@@ -41,12 +43,15 @@ public class CountAggrResult extends AggregateResult {
 
   @Override
   public Long getResult() {
-    return getLongValue();
+    if (statisticsInstance.getCount() > 0) {
+      setLongValue((long) statisticsInstance.getValidityErrors());
+    }
+    return hasCandidateResult() ? getLongValue() : null;
   }
 
   @Override
   public void updateResultFromStatistics(Statistics statistics) {
-    setLongValue(getLongValue() + statistics.getCount());
+    setLongValue(getLongValue() + statistics.getValidityErrors());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAggrResult.java
new file mode 100644
index 0000000000..a3cc39b039
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAggrResult.java
@@ -0,0 +1,177 @@
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ValidityAggrResult extends AggregateResult {
+  private TSDataType seriesDataType;
+  private double validity = 0.0;
+  private double sum = 0.0;
+  private long cnt = 0;
+  private Statistics statisticsInstance = new DoubleStatistics();
+
+  public ValidityAggrResult(TSDataType seriesDataType) {
+    super(TSDataType.DOUBLE, AggregationType.VALIDITY);
+    this.seriesDataType = seriesDataType;
+    reset();
+  }
+
+  @Override
+  public Double getResult() {
+    if (statisticsInstance.getCount() > 0) {
+      setDoubleValue(statisticsInstance.getValidity());
+    }
+    return hasCandidateResult() ? getDoubleValue() : null;
+  }
+
+  @Override
+  public void updateResultFromStatistics(Statistics statistics) {
+    double speedBetween = 0;
+    // skip empty statistics
+    if (statistics.getCount() == 0) {
+      return;
+    }
+    // update intermediate results from statistics;
+    // the caller must ensure the two statistics do not overlap
+    if (statistics instanceof DoubleStatistics) {
+      statisticsInstance.mergeStatistics(statistics);
+    } else {
+      throw new StatisticsClassException("Does not support: validity");
+    }
+    setDoubleValue(statisticsInstance.getValidity());
+  }
+
+  @Override
+  public void updateResultFromPageData(IBatchDataIterator batchIterator)
+      throws IOException, QueryProcessException {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  public void updateDPAndReverseDPAll() {
+    if (statisticsInstance.getTimeWindow().size() == 0) {
+      return;
+    }
+    statisticsInstance.optUpdateDPandReverseDP();
+  }
+
+  @Override
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException {
+    while (batchIterator.hasNext(minBound, maxBound)) {
+      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
+        break;
+      }
+      statisticsInstance.update(batchIterator.currentTime(), (double) batchIterator.currentValue());
+      batchIterator.next();
+    }
+  }
+
+  @Override
+  public void updateResultUsingTimestamps(
+      long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        statisticsInstance.update(timestamps[i], (double) values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    int i = 0;
+    while (valueIterator.hasNext() && i < length) {
+      statisticsInstance.update(timestamps[i], (double) valueIterator.next());
+      i++;
+    }
+  }
+
+  @Override
+  // TODO: revise this check
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public void merge(AggregateResult another) {
+    // TODO: merge needs to be updated
+    ValidityAggrResult anotherVar = (ValidityAggrResult) another;
+    // skip empty results
+    if (anotherVar.getStatisticsInstance().getCount() == 0) {
+      return;
+    }
+    validity +=
+        (anotherVar.getResult() * anotherVar.getStatisticsInstance().getCount()
+                + validity * statisticsInstance.getCount())
+            / (anotherVar.getStatisticsInstance().getCount() + statisticsInstance.getCount());
+    setDoubleValue(validity);
+  }
+
+  @Override
+  protected void deserializeSpecificFields(ByteBuffer buffer) {
+    this.seriesDataType = TSDataType.deserialize(buffer.get());
+    this.validity = buffer.getDouble();
+    this.sum = buffer.getDouble();
+    this.cnt = buffer.getLong();
+  }
+
+  @Override
+  protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(seriesDataType, outputStream);
+    ReadWriteIOUtils.write(validity, outputStream);
+    ReadWriteIOUtils.write(sum, outputStream);
+    ReadWriteIOUtils.write(cnt, outputStream);
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    validity = 0.0;
+    sum = 0.0;
+    cnt = 0L;
+    statisticsInstance = new DoubleStatistics();
+  }
+
+  public void setValidityErrors(int errors) {
+    this.statisticsInstance.setValidityErrors(errors);
+    this.statisticsInstance.setRepairSelfLast(true);
+    this.statisticsInstance.setRepairSelfFirst(true);
+  }
+
+  public double getValidity() {
+    return validity;
+  }
+
+  public double getSum() {
+    return sum;
+  }
+
+  public long getCnt() {
+    return cnt;
+  }
+
+  public Statistics getStatisticsInstance() {
+    return statisticsInstance;
+  }
+
+  public void setStatisticsInstance(Statistics statisticsInstance) {
+    this.statisticsInstance = statisticsInstance;
+  }
+
+  public boolean checkMergeable(Statistics statistics) {
+    return statisticsInstance.checkMergeable(statistics);
+  }
+}
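
As a rough, hypothetical usage sketch of the new ValidityAggrResult (not part of this patch; it assumes the research-branch DoubleStatistics behind update()/getValidity() behaves as it is used in AggregationExecutor below, and the class name ValidityAggrSketch is invented for illustration):

    import org.apache.iotdb.db.query.aggregation.impl.ValidityAggrResult;
    import org.apache.iotdb.db.utils.ValueIterator;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;

    public class ValidityAggrSketch {
      public static void main(String[] args) {
        ValidityAggrResult result = new ValidityAggrResult(TSDataType.DOUBLE);
        long[] timestamps = {1L, 2L, 3L, 4L, 5L};
        Object[] values = {1.0, 1.1, 100.0, 1.3, 1.4}; // 100.0 is a deliberate outlier
        result.updateResultUsingValues(timestamps, timestamps.length, new ValueIterator(values));
        // the executor calls this on merged windows before reading validity
        result.updateDPAndReverseDPAll();
        System.out.println("validity = " + result.getResult());
      }
    }
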
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAllAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAllAggrResult.java
new file mode 100644
index 0000000000..426ecd5d7a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ValidityAllAggrResult.java
@@ -0,0 +1,165 @@
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+// TODO: update
+public class ValidityAllAggrResult extends AggregateResult {
+  private TSDataType seriesDataType;
+  private double validity = 0.0;
+  private double sum = 0.0;
+  private long cnt = 0;
+  private Statistics statisticsInstance = new DoubleStatistics();
+
+  public ValidityAllAggrResult(TSDataType seriesDataType) {
+    super(TSDataType.DOUBLE, AggregationType.VALIDITYALL);
+    this.seriesDataType = seriesDataType;
+    reset();
+  }
+
+  @Override
+  public Double getResult() {
+    if (statisticsInstance.getCount() > 0) {
+      setDoubleValue(statisticsInstance.getValidity());
+    }
+    return hasCandidateResult() ? getDoubleValue() : null;
+  }
+
+  @Override
+  public void updateResultFromStatistics(Statistics statistics) {
+    double speedBetween = 0;
+    // skip empty statistics
+    if (statistics.getCount() == 0) {
+      return;
+    }
+    // update intermediate results from statistics;
+    // the caller must ensure the two statistics do not overlap
+    if (statistics instanceof DoubleStatistics) {
+      statisticsInstance.mergeStatistics(statistics);
+    } else {
+      throw new StatisticsClassException("Does not support: validityall");
+    }
+    setDoubleValue(statisticsInstance.getValidity());
+  }
+
+  @Override
+  public void updateResultFromPageData(IBatchDataIterator batchIterator)
+      throws IOException, QueryProcessException {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  public void updateDPAndReverseDP() {
+    statisticsInstance.optUpdateDPandReverseDP();
+  }
+
+  @Override
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException {
+    while (batchIterator.hasNext(minBound, maxBound)) {
+      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
+        break;
+      }
+      statisticsInstance.updateAll(
+          batchIterator.currentTime(), (double) batchIterator.currentValue());
+      batchIterator.next();
+    }
+  }
+
+  @Override
+  public void updateResultUsingTimestamps(
+      long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        statisticsInstance.updateAll(timestamps[i], (double) values[i]);
+      }
+    }
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    int i = 0;
+    while (valueIterator.hasNext() && i < length) {
+      statisticsInstance.updateAll(timestamps[i], (double) valueIterator.next());
+      i++;
+    }
+  }
+
+  @Override
+  // TODO: revise this check
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public void merge(AggregateResult another) {
+    // TODO: merge needs to be updated
+    ValidityAllAggrResult anotherVar = (ValidityAllAggrResult) another;
+    // skip empty results
+    if (anotherVar.getStatisticsInstance().getCount() == 0) {
+      return;
+    }
+    validity +=
+        (anotherVar.getResult() * anotherVar.getStatisticsInstance().getCount()
+                + validity * statisticsInstance.getCount())
+            / (anotherVar.getStatisticsInstance().getCount() + statisticsInstance.getCount());
+    setDoubleValue(validity);
+  }
+
+  @Override
+  protected void deserializeSpecificFields(ByteBuffer buffer) {
+    this.seriesDataType = TSDataType.deserialize(buffer.get());
+    this.validity = buffer.getDouble();
+    this.sum = buffer.getDouble();
+    this.cnt = buffer.getLong();
+  }
+
+  @Override
+  protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(seriesDataType, outputStream);
+    ReadWriteIOUtils.write(validity, outputStream);
+    ReadWriteIOUtils.write(sum, outputStream);
+    ReadWriteIOUtils.write(cnt, outputStream);
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    validity = 0.0;
+    sum = 0.0;
+    cnt = 0L;
+  }
+
+  public double getValidity() {
+    return validity;
+  }
+
+  public double getSum() {
+    return sum;
+  }
+
+  public long getCnt() {
+    return cnt;
+  }
+
+  public Statistics getStatisticsInstance() {
+    return statisticsInstance;
+  }
+
+  public boolean checkMergeable(Statistics statistics) {
+    return statisticsInstance.checkMergeable(statistics);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index fc33f2639a..6ae35f6674 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -34,6 +34,9 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.ValidityAggrResult;
+import org.apache.iotdb.db.query.aggregation.impl.ValidityAllAggrResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.SingleDataSet;
@@ -47,6 +50,8 @@ import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.ValueIterator;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -59,14 +64,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
 
@@ -78,6 +77,7 @@ public class AggregationExecutor {
   protected List<String> aggregations;
   protected IExpression expression;
   protected boolean ascending;
+  private final TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
   protected QueryContext context;
   protected AggregateResult[] aggregateResultList;
 
@@ -101,6 +101,7 @@ public class AggregationExecutor {
   /** execute aggregate function with only time filter or no filter. */
   public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
       throws StorageEngineException, IOException, QueryProcessException {
+    // TODO: store the params of aggregationPlan
 
     Filter timeFilter = null;
     if (expression != null) {
@@ -155,13 +156,27 @@ public class AggregationExecutor {
       throws IOException, QueryProcessException, StorageEngineException {
     List<AggregateResult> ascAggregateResultList = new ArrayList<>();
     List<AggregateResult> descAggregateResultList = new ArrayList<>();
+    ValidityAggrResult validityAggrResult = null;
+    ValidityAllAggrResult validityAggrALLResult = null;
     boolean[] isAsc = new boolean[aggregateResultList.length];
+    boolean[] isValidity = new boolean[aggregateResultList.length];
 
     TSDataType tsDataType = dataTypes.get(indexes.get(0));
     for (int i : indexes) {
       // construct AggregateResult
       AggregateResult aggregateResult =
           AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
+      // TODO: revise AggregateResultFactory, Type
+      // TODO: if (getAggregationType() == ...) {}
+      if (aggregateResult.getAggregationType() == AggregationType.VALIDITY) {
+        validityAggrResult = (ValidityAggrResult) aggregateResult;
+        isValidity[i] = true;
+        continue;
+      } else if (aggregateResult.getAggregationType() == AggregationType.VALIDITYALL) {
+        validityAggrALLResult = (ValidityAllAggrResult) aggregateResult;
+        isValidity[i] = true;
+        continue;
+      }
       if (aggregateResult.isAscending()) {
         ascAggregateResultList.add(aggregateResult);
         isAsc[i] = true;
@@ -178,15 +193,529 @@ public class AggregationExecutor {
         ascAggregateResultList,
         descAggregateResultList,
         null);
-
+    if (validityAggrResult != null) {
+      aggregateValidity(
+          seriesPath,
+          allMeasurementsInDevice,
+          context,
+          timeFilter,
+          tsDataType,
+          validityAggrResult,
+          null);
+    } else if (validityAggrALLResult != null) {
+      aggregateValidityALL(
+          seriesPath,
+          allMeasurementsInDevice,
+          context,
+          timeFilter,
+          tsDataType,
+          validityAggrALLResult,
+          null);
+    }
     int ascIndex = 0;
     int descIndex = 0;
     for (int i : indexes) {
-      aggregateResultList[i] =
-          isAsc[i]
-              ? ascAggregateResultList.get(ascIndex++)
-              : descAggregateResultList.get(descIndex++);
+      if (isValidity[i]) {
+        if (validityAggrResult != null) {
+          aggregateResultList[i] = validityAggrResult;
+        } else {
+          aggregateResultList[i] = validityAggrALLResult;
+        }
+      } else {
+        aggregateResultList[i] =
+            (isAsc[i]
+                ? ascAggregateResultList.get(ascIndex++)
+                : descAggregateResultList.get(descIndex++));
+      }
+    }
+  }
+
+  // query all (used for VALIDITYALL; relies on page-level statistics)
+  private void aggregateValidityALL(
+      PartialPath seriesPath,
+      Set<String> measurements,
+      QueryContext context,
+      Filter timeFilter,
+      TSDataType tsDataType,
+      ValidityAllAggrResult validityAllAggrResult,
+      TsFileFilter fileFilter)
+      throws QueryProcessException, StorageEngineException, IOException {
+    // construct series reader without value filter
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+    if (fileFilter != null) {
+      QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+    }
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    IAggregateReader seriesReader =
+        new SeriesAggregateReader(
+            seriesPath,
+            measurements,
+            tsDataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            null,
+            true);
+    int res = 0;
+    int len = 0;
+    while (seriesReader.hasNextFile()) {
+      while (seriesReader.hasNextChunk()) {
+        while (seriesReader.hasNextPage()) {
+          if (seriesReader.canUseCurrentPageStatistics()) {
+            Statistics pageStatistic = seriesReader.currentPageStatistics();
+            res += pageStatistic.getValidityErrors();
+            len += pageStatistic.getCount();
+          }
+        }
+      }
+    }
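+    // page-statistics-only estimate printed for inspection; validityAllAggrResult is not updated here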
+    System.out.println(1 - (double) res / len);
+  }
+
+  // validity
+  private void aggregateValidity(
+      PartialPath seriesPath,
+      Set<String> measurements,
+      QueryContext context,
+      Filter timeFilter,
+      TSDataType tsDataType,
+      ValidityAggrResult validityAggrResult,
+      TsFileFilter fileFilter)
+      throws QueryProcessException, StorageEngineException, IOException {
+    // construct series reader without value filter
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+    if (fileFilter != null) {
+      QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+    }
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    IAggregateReader seriesReader =
+        new SeriesAggregateReader(
+            seriesPath,
+            measurements,
+            tsDataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            null,
+            true);
+
+    // unseq or seq
+    int nowPageIndex = 0;
+    List<Statistics> statisticsList = new ArrayList<>();
+    List<Integer> indexUsed = new ArrayList<>();
+    List<Integer> unseqIndex = new ArrayList<>();
+    List<Integer> unseqStartIndex = new ArrayList<>();
+    int cnt = 0;
+
+    boolean unseqMerge = true;
+    while (seriesReader.hasNextFile()) {
+      while (seriesReader.hasNextChunk()) {
+        while (seriesReader.hasNextPage()) {
+          cnt += 1;
+          // seq
+          if (seriesReader.canUseCurrentPageStatistics()) {
+            unseqMerge = false;
+            if (unseqIndex.size() > 0) {
+              indexUsed.add(unseqIndex.get(0));
+              unseqStartIndex.add(unseqIndex.get(0));
+              unseqIndex.clear();
+            }
+            indexUsed.add(nowPageIndex);
+            Statistics pageStatistic = seriesReader.currentPageStatistics();
+            if (nowPageIndex == 0) {
+              //              if (!pageStatistic.sameConstraints(
+              //                  tsFileConfig.getsMax(),
+              //                  tsFileConfig.getSmin(),
+              //                  tsFileConfig.getXMax(),
+              //                  tsFileConfig.getXMin())) {
+              //                System.out.println("constraints inconsistent with TsFile");
+              //                return;
+              //              }
+              validityAggrResult.setStatisticsInstance(pageStatistic);
+              nowPageIndex++;
+              seriesReader.skipCurrentPage();
+              continue;
+            }
+            //                        validityAggrResult.updateDPAndReverseDPAll();
+            statisticsList.add(validityAggrResult.getStatisticsInstance());
+            validityAggrResult.setStatisticsInstance(pageStatistic);
+            seriesReader.skipCurrentPage();
+            nowPageIndex++;
+            continue;
+          } else {
+            System.out.println("unSeq");
+            if (!unseqMerge && nowPageIndex > 0) {
+              statisticsList.add(validityAggrResult.getStatisticsInstance());
+              validityAggrResult.reset();
+            }
+            unseqMerge = true;
+          }
+          IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
+          validityAggrResult.updateResultFromPageData(batchDataIterator);
+          unseqIndex.add(nowPageIndex);
+          nowPageIndex++;
+          batchDataIterator.reset();
+        }
+      }
+    }
+    if (unseqIndex.size() > 0) {
+      indexUsed.add(unseqIndex.get(0));
+      unseqStartIndex.add(unseqIndex.get(0));
+      unseqIndex.clear();
+      //      validityAggrResult.updateDPAndReverseDPAll();
+    }
+    statisticsList.add(validityAggrResult.getStatisticsInstance());
+
+    // can merge or not merge
+    boolean[] indexBoolean = new boolean[nowPageIndex];
+    for (int j = nowPageIndex - 1; j > 0; j--) {
+      indexBoolean[j] = indexUsed.contains(j);
+    }
+
+    System.out.println("page num:" + cnt);
+    System.out.println("indexUsed-length:" + indexUsed.size());
+    System.out.println("unseq-length:" + unseqStartIndex.size());
+    System.out.println("statisticList-length:" + statisticsList.size());
+
+    List<Statistics> splitStatisticsList = new ArrayList<>();
+
+    int p = 0, index = 0;
+    int c = IoTDBDescriptor.getInstance().getConfig().getParamC();
+    System.out.println("c=" + c);
+    List<Integer> newIndexList = new ArrayList<>();
+    Map<Integer, Integer> newIndexToIndexUsed = new HashMap<>();
+    while (true) {
+      // only re-split overlapping segments
+      if (p >= statisticsList.size() || p >= indexUsed.size()) break;
+      Statistics pageStatistic = statisticsList.get(p);
+      if (!unseqStartIndex.contains(indexUsed.get(p))) {
+        splitStatisticsList.add(pageStatistic);
+        newIndexToIndexUsed.put(index, indexUsed.get(p));
+        p++;
+        index++;
+        continue;
+      }
+      System.out.println("Splitting one overlapping page.");
+
+      List<Long> timeWindow = pageStatistic.getTimeWindow();
+      List<Double> valueWindow = pageStatistic.getValueWindow();
+      boolean[] ifValueViolation = new boolean[timeWindow.size()];
+      boolean[] ifSpeedViolation = new boolean[timeWindow.size()];
+
+      double speedAVG = pageStatistic.getSpeedAVG(), speedSTD = pageStatistic.getSpeedSTD();
+      double smax = speedAVG + 3 * speedSTD;
+      double smin = speedAVG - 3 * speedSTD;
+      double xmin = pageStatistic.getxMin(), xmax = pageStatistic.getxMax();
+      if (Math.abs(smax) > Math.abs(smin)) {
+        smin = -(speedAVG + 3 * speedSTD);
+      } else {
+        smax = -(speedAVG - 3 * speedSTD);
+      }
+
+      int vioCnt = 0;
+      List<Double> speeds = new ArrayList<>();
+      // determine value violation points
+      for (int j = 0; j < timeWindow.size(); j++) {
+        if (valueWindow.get(j) >= xmin && valueWindow.get(j) <= xmax) ifValueViolation[j] = false;
+        else ifValueViolation[j] = true;
+        if (j > 0)
+          speeds.add(
+              (valueWindow.get(j) - valueWindow.get(j - 1))
+                  / (timeWindow.get(j) - timeWindow.get(j - 1)));
+      }
+
+      // determine speed violation points
+      if (speeds.get(0) >= smin && speeds.get(0) <= smax) ifSpeedViolation[0] = false;
+      else ifSpeedViolation[0] = true;
+      for (int j = 1; j < timeWindow.size() - 1; j++) {
+        if (speeds.get(j - 1) >= smin
+            && speeds.get(j - 1) <= smax
+            && speeds.get(j) >= smin
+            && speeds.get(j) <= smax) ifSpeedViolation[j] = false;
+        else ifSpeedViolation[j] = true;
+      }
+      if (speeds.get(speeds.size() - 1) >= smin && speeds.get(speeds.size() - 1) <= smax)
+        ifSpeedViolation[timeWindow.size() - 1] = false;
+      else ifSpeedViolation[timeWindow.size() - 1] = true;
+      System.out.println("speed violation points:" + vioCnt);
+
+      // optimistic split
+      int s = 0, e = 0; // double pointers
+      int consecCnt = 0, violationCnt = 0;
+      List<Integer> violationIndexList = new ArrayList<>();
+
+      while (e < ifValueViolation.length) {
+        if (!ifValueViolation[e] && !ifSpeedViolation[e]) consecCnt += 1;
+        else {
+          consecCnt = 0;
+          violationIndexList.add(e);
+          violationCnt += 1;
+        }
+        if (consecCnt == c) {
+          ValueIterator valueIterator =
+              new ValueIterator(valueWindow.subList(s, e - c / 2 + 1).toArray());
+          long[] timestamps = new long[e - c / 2 - s + 1];
+          for (int k = 0; k < e - c / 2 - s + 1; k++) {
+            timestamps[k] = timeWindow.get(s + k);
+          }
+          validityAggrResult.reset();
+          validityAggrResult.updateResultUsingValues(timestamps, e - c / 2 - s + 1, valueIterator);
+
+          if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+          else validityAggrResult.updateDPAndReverseDPAll();
+          splitStatisticsList.add(validityAggrResult.getStatisticsInstance());
+          newIndexList.add(index);
+          index++;
+
+          s = e - c / 2 + 1;
+          consecCnt = 0;
+          violationCnt = 0;
+          violationIndexList.clear();
+        }
+        e++;
+      }
+      if (e == ifValueViolation.length && s < e) {
+        ValueIterator valueIterator = new ValueIterator(valueWindow.subList(s, e).toArray());
+        long[] timestamps = new long[e - s + 1];
+        for (int k = 0; k < e - s; k++) {
+          timestamps[k] = timeWindow.get(s + k);
+        }
+        validityAggrResult.reset();
+        validityAggrResult.updateResultUsingValues(timestamps, e - s, valueIterator);
+        if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+        else validityAggrResult.updateDPAndReverseDPAll();
+        splitStatisticsList.add(validityAggrResult.getStatisticsInstance());
+        newIndexList.add(index);
+        index++;
+      }
+      p++;
+    }
+    System.out.println("splitStatisticsList:" + splitStatisticsList.size());
+
+    int i = 0;
+    List<Statistics> finalStatisticsList = new ArrayList<>();
+    finalStatisticsList = statisticsList;
+    boolean[] canMerge = new boolean[splitStatisticsList.size()];
+    for (int k = 0; k < splitStatisticsList.size(); k++) {
+      canMerge[k] = true;
+    }
+    validityAggrResult.reset();
+    int pagesize = 0;
+    while (i < splitStatisticsList.size()) {
+      // TODO: check merge code
+      Statistics pageStatistic = splitStatisticsList.get(i);
+      if (validityAggrResult.checkMergeable(pageStatistic)) {
+        finalStatisticsList.add(pageStatistic);
+        validityAggrResult.setStatisticsInstance(pageStatistic);
+        pagesize++;
+        System.out.println("can merge");
+        i++;
+      } else {
+        System.out.println("can not Merge");
+        canMerge[i] = false;
+
+        for (int j = i - 1; j >= 0; j--) {
+          if (!canMerge[j] && j > 0) {
+            continue;
+          }
+          System.out.println("Merging from " + j + "-th sub-page to " + i + "-th sub-page");
+
+          validityAggrResult.reset();
+          if (finalStatisticsList.size() > 0) {
+            finalStatisticsList.remove(finalStatisticsList.size() - 1);
+          }
+          for (int pageIndex = j; pageIndex <= i; pageIndex++) {
+            if (newIndexList.contains(pageIndex)) {
+              Statistics curStatistics = splitStatisticsList.get(pageIndex);
+              validityAggrResult.updateResultFromStatistics(curStatistics);
+            } else {
+              int originPageIndex = newIndexToIndexUsed.get(pageIndex);
+              IAggregateReader seriesReaderTemp =
+                  new SeriesAggregateReader(
+                      seriesPath,
+                      measurements,
+                      tsDataType,
+                      context,
+                      queryDataSource,
+                      timeFilter,
+                      null,
+                      null,
+                      true);
+              int curIndex = 0;
+              while (seriesReaderTemp.hasNextFile()) {
+                while (seriesReaderTemp.hasNextChunk()) {
+                  while (seriesReaderTemp.hasNextPage()) {
+                    if (curIndex < originPageIndex) {
+                      seriesReaderTemp.nextPage();
+                      curIndex++;
+                    } else if (curIndex > originPageIndex) break;
+                    else {
+                      IBatchDataIterator batchDataIterator =
+                          seriesReaderTemp.nextPage().getBatchDataIterator();
+                      validityAggrResult.updateResultFromPageData(batchDataIterator);
+                      batchDataIterator.reset();
+                      curIndex++;
+                    }
+                  }
+                  if (curIndex > originPageIndex) break;
+                }
+                if (curIndex > originPageIndex) break;
+              }
+            }
+          }
+          if (validityAggrResult.getStatisticsInstance().getCount() > c) {
+            int errors =
+                optimisticSplit(validityAggrResult.getStatisticsInstance(), validityAggrResult, c);
+            validityAggrResult.reset();
+            validityAggrResult.setValidityErrors(errors);
+          } else {
+            validityAggrResult.updateDPAndReverseDPAll();
+          }
+          if (finalStatisticsList.size() == 0) {
+            break;
+          }
+          if (finalStatisticsList
+              .get(finalStatisticsList.size() - 1)
+              .checkMergeable(validityAggrResult.getStatisticsInstance())) {
+            break;
+          } else {
+            canMerge[j] = false;
+          }
+        }
+        finalStatisticsList.add(validityAggrResult.getStatisticsInstance());
+        i++;
+      }
+    }
+
+    validityAggrResult.reset();
+    i = 0;
+    while (i < finalStatisticsList.size()) {
+      //      System.out.println("merge process");
+      validityAggrResult.updateResultFromStatistics(finalStatisticsList.get(i));
+      i++;
+    }
+    //    System.out.println(validityAggrResult.getStatisticsInstance().getRepairSelfLast());
+    //    double smax =
+    //        validityAggrResult.getStatisticsInstance().getSpeedAVG()
+    //            + 3 * validityAggrResult.getStatisticsInstance().getSpeedSTD();
+    //    double smin =
+    //        validityAggrResult.getStatisticsInstance().getSpeedAVG()
+    //            - 3 * validityAggrResult.getStatisticsInstance().getSpeedSTD();
+    //    if (tsFileConfig.isUsePreSpeed()) {
+    //      System.out.println("smax:" + tsFileConfig.getsMax() + "smin:" + tsFileConfig.getSmin());
+    //    } else {
+    //      System.out.println("smax:" + smax + "smin:" + smin);
+    //    }
+    //    System.out.println(finalStatisticsList.size());
+  }
+
+  private int optimisticSplit(
+      Statistics pageStatistic, ValidityAggrResult validityAggrResult, int c) {
+    List<Long> timeWindow = pageStatistic.getTimeWindow();
+    List<Double> valueWindow = pageStatistic.getValueWindow();
+    boolean[] ifValueViolation = new boolean[timeWindow.size()];
+    boolean[] ifSpeedViolation = new boolean[timeWindow.size()];
+
+    double preSpeed,
+        succSpeed,
+        speedAVG = pageStatistic.getSpeedAVG(),
+        speedSTD = pageStatistic.getSpeedSTD();
+    double smax = speedAVG + 3 * speedSTD;
+    double smin = speedAVG - 3 * speedSTD;
+    double xmin = pageStatistic.getxMin(), xmax = pageStatistic.getxMax();
+    if (Math.abs(smax) > Math.abs(smin)) {
+      smin = -(speedAVG + 3 * speedSTD);
+    } else {
+      smax = -(speedAVG - 3 * speedSTD);
+    }
+
+    int vioCnt = 0;
+    List<Double> speeds = new ArrayList<>();
+    // determine value violation points
+    for (int j = 0; j < timeWindow.size(); j++) {
+      if (valueWindow.get(j) >= xmin && valueWindow.get(j) <= xmax) ifValueViolation[j] = false;
+      else ifValueViolation[j] = true;
+      if (j > 0)
+        speeds.add(
+            (valueWindow.get(j) - valueWindow.get(j - 1))
+                / (timeWindow.get(j) - timeWindow.get(j - 1)));
+    }
+
+    // determine speed violation points
+    if (speeds.get(0) >= smin && speeds.get(0) <= smax) ifSpeedViolation[0] = false;
+    else ifSpeedViolation[0] = true;
+    for (int j = 1; j < timeWindow.size() - 1; j++) {
+      if (speeds.get(j - 1) >= smin
+          && speeds.get(j - 1) <= smax
+          && speeds.get(j) >= smin
+          && speeds.get(j) <= smax) ifSpeedViolation[j] = false;
+      else ifSpeedViolation[j] = true;
+    }
+    if (speeds.get(speeds.size() - 1) >= smin && speeds.get(speeds.size() - 1) <= smax)
+      ifSpeedViolation[timeWindow.size() - 1] = false;
+    else ifSpeedViolation[timeWindow.size() - 1] = true;
+    System.out.println("speed violation points:" + vioCnt);
+
+    // optimistic split
+    int s = 0, e = 0; // double pointers
+    int consecCnt = 0, violationCnt = 0;
+    List<Integer> violationIndexList = new ArrayList<>();
+    List<Statistics> resStatisticsList = new ArrayList<>();
+
+    while (e < ifValueViolation.length) {
+      if (!ifValueViolation[e] && !ifSpeedViolation[e]) consecCnt += 1;
+      else {
+        consecCnt = 0;
+        violationIndexList.add(e);
+        violationCnt += 1;
+      }
+      if (consecCnt == c) {
+        ValueIterator valueIterator =
+            new ValueIterator(valueWindow.subList(s, e - c / 2 + 1).toArray());
+        long[] timestamps = new long[e - c / 2 - s + 1];
+        for (int k = 0; k < e - c / 2 - s + 1; k++) {
+          timestamps[k] = timeWindow.get(s + k);
+        }
+        validityAggrResult.reset();
+        validityAggrResult.updateResultUsingValues(timestamps, e - c / 2 - s + 1, valueIterator);
+
+        if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+        else validityAggrResult.updateDPAndReverseDPAll();
+        resStatisticsList.add(validityAggrResult.getStatisticsInstance());
+
+        s = e - c / 2 + 1;
+        consecCnt = 0;
+        violationCnt = 0;
+        violationIndexList.clear();
+      }
+      e++;
+    }
+    if (e == ifValueViolation.length && s < e) {
+      ValueIterator valueIterator = new ValueIterator(valueWindow.subList(s, e).toArray());
+      long[] timestamps = new long[e - s + 1];
+      for (int k = 0; k < e - s; k++) {
+        timestamps[k] = timeWindow.get(s + k);
+      }
+      validityAggrResult.reset();
+      validityAggrResult.updateResultUsingValues(timestamps, e - s, valueIterator);
+      if (violationCnt < 2) validityAggrResult.setValidityErrors(violationCnt);
+      else validityAggrResult.updateDPAndReverseDPAll();
+      resStatisticsList.add(validityAggrResult.getStatisticsInstance());
+    }
+    int cnt = 0, errors = 0;
+    for (int k = 0; k < resStatisticsList.size(); k++) {
+      cnt += resStatisticsList.get(k).getCount();
+      errors += resStatisticsList.get(k).getValidityErrors();
     }
+    return errors;
   }
 
   protected void aggregateOneAlignedSeries(
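
For reference, the speed bounds used in aggregateValidity() and optimisticSplit() above are avg ± 3·std of the page's speed statistics, then symmetrized around zero by whichever bound has the larger magnitude; a point is flagged when its value leaves [xmin, xmax] or an adjacent speed leaves [smin, smax]. A toy, self-contained check of that rule (plain doubles standing in for Statistics.getSpeedAVG()/getSpeedSTD(); not part of the patch, class name invented):

    public class SpeedBoundSketch {
      public static void main(String[] args) {
        double speedAVG = 0.5, speedSTD = 2.0;
        double smax = speedAVG + 3 * speedSTD; // 6.5
        double smin = speedAVG - 3 * speedSTD; // -5.5
        // symmetrize around zero by the larger magnitude, as in the code above
        if (Math.abs(smax) > Math.abs(smin)) {
          smin = -(speedAVG + 3 * speedSTD); // bounds become [-6.5, 6.5]
        } else {
          smax = -(speedAVG - 3 * speedSTD);
        }
        double speed = (100.0 - 1.1) / (3L - 2L); // jump to an outlier within one time unit
        System.out.println(speed >= smin && speed <= smax ? "ok" : "speed violation");
      }
    }
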
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 987cdcd451..360bb82f25 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -67,6 +67,10 @@ public class AggregateResultFactory {
         return !ascending
             ? new LastValueDescAggrResult(dataType)
             : new LastValueAggrResult(dataType);
+      case SQLConstant.VALIDITY:
+        return new ValidityAggrResult(dataType);
+      case SQLConstant.VALIDITYALL:
+        return new ValidityAllAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
     }
@@ -98,6 +102,10 @@ public class AggregateResultFactory {
         return new SumAggrResult(dataType);
       case SQLConstant.LAST_VALUE:
         return new LastValueDescAggrResult(dataType);
+      case SQLConstant.VALIDITY:
+        return new ValidityAggrResult(dataType);
+      case SQLConstant.VALIDITYALL:
+        return new ValidityAllAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
     }
@@ -130,6 +138,10 @@ public class AggregateResultFactory {
         return new MinValueAggrResult(dataType);
       case EXTREME:
         return new ExtremeAggrResult(dataType);
+      case VALIDITY:
+        return new ValidityAggrResult(dataType);
+      case VALIDITYALL:
+        return new ValidityAllAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
     }
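With these cases in place, VALIDITY and VALIDITYALL resolve through the same factory entry points as the built-in aggregations. A hedged usage sketch, assuming the factory's getAggrResultByName(name, dataType, ascending) entry point implied by the ascending flag at the top of this hunk (the data type and flag below are illustrative only, not part of this patch):

    // Illustrative call site: obtain the aggregate result for the new VALIDITY function.
    AggregateResult validity =
        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
    // VALIDITYALL goes through the same path and yields a ValidityAllAggrResult.
    AggregateResult validityAll =
        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITYALL, TSDataType.DOUBLE, true);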
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
new file mode 100644
index 0000000000..4ef688a7e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/IPlanExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public interface IPlanExecutor {
+
+  /**
+   * Process a query plan from the QP layer and construct a QueryDataSet.
+   *
+   * @param queryPlan QueryPlan
+   * @return QueryDataSet
+   */
+  QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+      throws IOException, StorageEngineException, QueryFilterOptimizationException,
+          QueryProcessException, MetadataException, SQLException, TException, InterruptedException;
+
+  /**
+   * Process a non-query physical plan, including insert/update/delete operations on data,
+   * metadata, and privileges.
+   *
+   * @param plan Physical Non-Query Plan
+   */
+  boolean processNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException;
+
+  /**
+   * Execute an update command.
+   *
+   * @param path the series path to update
+   * @param startTime start time in the update command
+   * @param endTime end time in the update command
+   * @param value the new value, as a string
+   */
+  void update(PartialPath path, long startTime, long endTime, String value)
+      throws QueryProcessException;
+
+  /**
+   * Execute a delete command.
+   *
+   * @param deletePlan physical delete plan
+   */
+  void delete(DeletePlan deletePlan) throws QueryProcessException;
+
+  /**
+   * Execute a delete command.
+   *
+   * @param path the series path to delete from
+   * @param startTime start time in the delete command
+   * @param endTime end time in the delete command
+   * @param planIndex index of the deletion plan
+   * @param partitionFilter specifies the involved time partitions; if null, all partitions are involved
+   */
+  void delete(
+      PartialPath path,
+      long startTime,
+      long endTime,
+      long planIndex,
+      TimePartitionFilter partitionFilter)
+      throws QueryProcessException;
+
+  /**
+   * Execute an insert command.
+   *
+   * @param insertRowPlan physical insert plan
+   */
+  void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
+
+  /**
+   * Execute an insert command.
+   *
+   * @param insertRowsPlan physical insert rows plan, which contains multiple InsertRowPlans
+   */
+  void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException;
+
+  /**
+   * Execute an insert command.
+   *
+   * @param insertRowsOfOneDevicePlan physical insert plan
+   */
+  void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+  /**
+   * Execute a batch insert plan.
+   *
+   * @throws BatchProcessException when some of the rows fail
+   */
+  void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
+
+  /**
+   * Execute a multi-tablet batch insert plan.
+   *
+   * @throws QueryProcessException when some of the rows fail
+   */
+  void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan) throws QueryProcessException;
+}
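The interface separates a query path, which builds a QueryDataSet, from a non-query path, which returns a success flag. A minimal dispatch sketch, assuming PhysicalPlan#isQuery() and a QueryContext built from a query id, with plan and queryId supplied by the caller and error handling omitted (none of this scaffolding is defined in this patch):

    // Hedged sketch of a caller dispatching a physical plan through IPlanExecutor.
    IPlanExecutor executor = new PlanExecutor();
    if (plan.isQuery()) {
      QueryDataSet dataSet = executor.processQuery(plan, new QueryContext(queryId));
      while (dataSet.hasNext()) {
        System.out.println(dataSet.next()); // consume rows
      }
    } else {
      boolean ok = executor.processNonQuery(plan); // insert / delete / DDL
    }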
diff --git a/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
new file mode 100644
index 0000000000..84f8b3c5b4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/filter/executor/PlanExecutor.java
@@ -0,0 +1,2176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.filter.executor;
+
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.auth.entity.PathPrivilege;
+import org.apache.iotdb.db.auth.entity.Role;
+import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.QueryIdNotExsitException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
+import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SettlePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
+import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
+import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
+import org.apache.iotdb.db.query.dataset.SingleDataSet;
+import org.apache.iotdb.db.query.executor.IQueryRouter;
+import org.apache.iotdb.db.query.executor.QueryRouter;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationInformation;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.SettleService;
+import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+public class PlanExecutor implements IPlanExecutor {
+
+  private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
+  private static final Logger AUDIT_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
+  // for data query
+  protected IQueryRouter queryRouter;
+  // for administration
+  private final IAuthorizer authorizer;
+
+  private ThreadPoolExecutor insertionPool;
+
+  private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
+
+  public PlanExecutor() throws QueryProcessException {
+    queryRouter = new QueryRouter();
+    try {
+      authorizer = BasicAuthorizer.getInstance();
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @Override
+  public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
+      throws IOException, StorageEngineException, QueryFilterOptimizationException,
+          QueryProcessException, MetadataException, InterruptedException {
+    if (queryPlan instanceof QueryPlan) {
+      return processDataQuery((QueryPlan) queryPlan, context);
+    } else if (queryPlan instanceof AuthorPlan) {
+      return processAuthorQuery((AuthorPlan) queryPlan);
+    } else if (queryPlan instanceof ShowPlan) {
+      return processShowQuery((ShowPlan) queryPlan, context);
+    } else {
+      throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
+    }
+  }
+
+  @Override
+  public boolean processNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+    switch (plan.getOperatorType()) {
+      case DELETE:
+        delete((DeletePlan) plan);
+        return true;
+      case INSERT:
+        insert((InsertRowPlan) plan);
+        return true;
+      case BATCH_INSERT_ONE_DEVICE:
+        insert((InsertRowsOfOneDevicePlan) plan);
+        return true;
+      case BATCH_INSERT_ROWS:
+        insert((InsertRowsPlan) plan);
+        return true;
+      case BATCH_INSERT:
+        insertTablet((InsertTabletPlan) plan);
+        return true;
+      case MULTI_BATCH_INSERT:
+        insertTablet((InsertMultiTabletPlan) plan);
+        return true;
+      case CREATE_ROLE:
+      case DELETE_ROLE:
+      case CREATE_USER:
+      case REVOKE_USER_ROLE:
+      case REVOKE_ROLE_PRIVILEGE:
+      case REVOKE_USER_PRIVILEGE:
+      case GRANT_ROLE_PRIVILEGE:
+      case GRANT_USER_PRIVILEGE:
+      case GRANT_USER_ROLE:
+      case MODIFY_PASSWORD:
+      case DELETE_USER:
+        AuthorPlan author = (AuthorPlan) plan;
+        return operateAuthor(author);
+      case GRANT_WATERMARK_EMBEDDING:
+        return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), true);
+      case REVOKE_WATERMARK_EMBEDDING:
+        return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), false);
+      case DELETE_TIMESERIES:
+        return deleteTimeSeries((DeleteTimeSeriesPlan) plan);
+      case CREATE_TIMESERIES:
+        return createTimeSeries((CreateTimeSeriesPlan) plan);
+      case CREATE_ALIGNED_TIMESERIES:
+        return createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+      case CREATE_MULTI_TIMESERIES:
+        return createMultiTimeSeries((CreateMultiTimeSeriesPlan) plan);
+      case ALTER_TIMESERIES:
+        return alterTimeSeries((AlterTimeSeriesPlan) plan);
+      case SET_STORAGE_GROUP:
+        return setStorageGroup((SetStorageGroupPlan) plan);
+      case DELETE_STORAGE_GROUP:
+        return deleteStorageGroups((DeleteStorageGroupPlan) plan);
+      case TTL:
+        operateTTL((SetTTLPlan) plan);
+        return true;
+      case LOAD_CONFIGURATION:
+        loadConfiguration((LoadConfigurationPlan) plan);
+        return true;
+      case LOAD_FILES:
+        operateLoadFiles((OperateFilePlan) plan);
+        return true;
+      case REMOVE_FILE:
+        operateRemoveFile((OperateFilePlan) plan);
+        return true;
+      case UNLOAD_FILE:
+        operateUnloadFile((OperateFilePlan) plan);
+        return true;
+      case FLUSH:
+        operateFlush((FlushPlan) plan);
+        return true;
+      case MERGE:
+      case FULL_MERGE:
+        operateMerge((MergePlan) plan);
+        return true;
+      case SET_SYSTEM_MODE:
+        operateSetSystemMode((SetSystemModePlan) plan);
+        return true;
+      case CLEAR_CACHE:
+        operateClearCache();
+        return true;
+      case DELETE_PARTITION:
+        DeletePartitionPlan p = (DeletePartitionPlan) plan;
+        TimePartitionFilter filter =
+            (storageGroupName, partitionId) ->
+                storageGroupName.equals(
+                        p.getStorageGroupName().getFullPath())
+                    && p.getPartitionId().contains(partitionId);
+        StorageEngine.getInstance()
+            .removePartitions(p.getStorageGroupName(), filter);
+        return true;
+      case CREATE_SCHEMA_SNAPSHOT:
+        operateCreateSnapshot();
+        return true;
+      case CREATE_FUNCTION:
+        return operateCreateFunction((CreateFunctionPlan) plan);
+      case DROP_FUNCTION:
+        return operateDropFunction((DropFunctionPlan) plan);
+      case CREATE_TRIGGER:
+        return operateCreateTrigger((CreateTriggerPlan) plan);
+      case DROP_TRIGGER:
+        return operateDropTrigger((DropTriggerPlan) plan);
+      case START_TRIGGER:
+        return operateStartTrigger((StartTriggerPlan) plan);
+      case STOP_TRIGGER:
+        return operateStopTrigger((StopTriggerPlan) plan);
+      case KILL:
+        try {
+          operateKillQuery((KillQueryPlan) plan);
+        } catch (QueryIdNotExsitException e) {
+          throw new QueryProcessException(e.getMessage());
+        }
+        return true;
+      case CREATE_TEMPLATE:
+        return createTemplate((CreateTemplatePlan) plan);
+      case APPEND_TEMPLATE:
+        return appendTemplate((AppendTemplatePlan) plan);
+      case PRUNE_TEMPLATE:
+        return pruneTemplate((PruneTemplatePlan) plan);
+      case SET_TEMPLATE:
+        return setTemplate((SetTemplatePlan) plan);
+      case ACTIVATE_TEMPLATE:
+        return activateTemplate((ActivateTemplatePlan) plan);
+      case UNSET_TEMPLATE:
+        return unsetTemplate((UnsetTemplatePlan) plan);
+      case CREATE_CONTINUOUS_QUERY:
+        return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan);
+      case DROP_CONTINUOUS_QUERY:
+        return operateDropContinuousQuery((DropContinuousQueryPlan) plan);
+      case SETTLE:
+        settle((SettlePlan) plan);
+        return true;
+      default:
+        throw new UnsupportedOperationException(
+            String.format("operation %s is not supported", plan.getOperatorType()));
+    }
+  }
+
+  private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.createSchemaTemplate(createTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean appendTemplate(AppendTemplatePlan plan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.appendSchemaTemplate(plan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean pruneTemplate(PruneTemplatePlan plan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.pruneSchemaTemplate(plan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean setTemplate(SetTemplatePlan setTemplatePlan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.setSchemaTemplate(setTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean activateTemplate(ActivateTemplatePlan activateTemplatePlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.setUsingSchemaTemplate(activateTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean unsetTemplate(UnsetTemplatePlan unsetTemplatePlan) throws QueryProcessException {
+    try {
+      IoTDB.metaManager.unsetSchemaTemplate(unsetTemplatePlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
+    UDFRegistrationService.getInstance().register(plan.getUdfName(), plan.getClassName(), true);
+    return true;
+  }
+
+  private boolean operateDropFunction(DropFunctionPlan plan) throws UDFRegistrationException {
+    UDFRegistrationService.getInstance().deregister(plan.getUdfName());
+    return true;
+  }
+
+  private boolean operateCreateTrigger(CreateTriggerPlan plan)
+      throws TriggerManagementException, TriggerExecutionException {
+    TriggerRegistrationService.getInstance().register(plan);
+    return true;
+  }
+
+  private boolean operateDropTrigger(DropTriggerPlan plan) throws TriggerManagementException {
+    TriggerRegistrationService.getInstance().deregister(plan);
+    return true;
+  }
+
+  private boolean operateStartTrigger(StartTriggerPlan plan)
+      throws TriggerManagementException, TriggerExecutionException {
+    TriggerRegistrationService.getInstance().activate(plan);
+    return true;
+  }
+
+  private boolean operateStopTrigger(StopTriggerPlan plan) throws TriggerManagementException {
+    TriggerRegistrationService.getInstance().inactivate(plan);
+    return true;
+  }
+
+  private void operateMerge(MergePlan plan) throws StorageEngineException {
+    if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
+      StorageEngine.getInstance().mergeAll(true);
+    } else {
+      StorageEngine.getInstance().mergeAll(false);
+    }
+  }
+
+  private void operateClearCache() {
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+  }
+
+  private void operateCreateSnapshot() {
+    IoTDB.metaManager.createMTreeSnapshot();
+  }
+
+  private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    long killQueryId = killQueryPlan.getQueryId();
+    if (killQueryId != -1) {
+      if (queryTimeManager.getQueryContextMap().get(killQueryId) != null) {
+        queryTimeManager.killQuery(killQueryId);
+      } else {
+        throw new QueryIdNotExsitException(
+            String.format(
+                "Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
+      }
+    } else {
+      // if queryId is not specified, kill all running queries
+      if (!queryTimeManager.getQueryContextMap().isEmpty()) {
+        synchronized (queryTimeManager.getQueryContextMap()) {
+          List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryContextMap().keySet());
+          for (Long queryId : queryIdList) {
+            queryTimeManager.killQuery(queryId);
+          }
+        }
+      }
+    }
+  }
+
+  private void operateSetSystemMode(SetSystemModePlan plan) {
+    IoTDBDescriptor.getInstance().getConfig().setReadOnly(plan.isReadOnly());
+  }
+
+  private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
+    if (plan.getPaths().isEmpty()) {
+      StorageEngine.getInstance().syncCloseAllProcessor();
+    } else {
+      flushSpecifiedStorageGroups(plan);
+    }
+
+    if (!plan.getPaths().isEmpty()) {
+      List<PartialPath> noExistSg = checkStorageGroupExist(plan.getPaths());
+      if (!noExistSg.isEmpty()) {
+        StringBuilder sb = new StringBuilder();
+        noExistSg.forEach(storageGroup -> sb.append(storageGroup.getFullPath()).append(","));
+        throw new StorageGroupNotSetException(sb.subSequence(0, sb.length() - 1).toString(), true);
+      }
+    }
+  }
+
+  private boolean operateCreateContinuousQuery(CreateContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    return ContinuousQueryService.getInstance().register(plan, true);
+  }
+
+  private boolean operateDropContinuousQuery(DropContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    return ContinuousQueryService.getInstance().deregister(plan);
+  }
+
+  public static void flushSpecifiedStorageGroups(FlushPlan plan)
+      throws StorageGroupNotSetException {
+    Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupMap =
+        plan.getStorageGroupPartitionIds();
+    for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupMap.entrySet()) {
+      PartialPath storageGroupName = entry.getKey();
+      // normal flush
+      if (entry.getValue() == null) {
+        if (plan.isSeq() == null) {
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, true, plan.isSync());
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, false, plan.isSync());
+        } else {
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, plan.isSeq(), plan.isSync());
+        }
+      }
+      // partition specified flush, for snapshot flush plan
+      else {
+        List<Pair<Long, Boolean>> partitionIdSequencePairs = entry.getValue();
+        for (Pair<Long, Boolean> pair : partitionIdSequencePairs) {
+          StorageEngine.getInstance()
+              .closeStorageGroupProcessor(storageGroupName, pair.left, pair.right, true);
+        }
+      }
+    }
+  }
+
+  protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
+      throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
+          IOException, InterruptedException {
+    QueryDataSet queryDataSet;
+    if (queryPlan instanceof AlignByDevicePlan) {
+      queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
+    } else {
+      if (queryPlan.getPaths() == null || queryPlan.getPaths().isEmpty()) {
+        // no time series are selected, return EmptyDataSet
+        return new EmptyDataSet();
+      } else if (queryPlan instanceof UDAFPlan) {
+        UDAFPlan udafPlan = (UDAFPlan) queryPlan;
+        queryDataSet = queryRouter.udafQuery(udafPlan, context);
+      } else if (queryPlan instanceof UDTFPlan) {
+        UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
+        queryDataSet = queryRouter.udtfQuery(udtfPlan, context);
+      } else if (queryPlan instanceof GroupByTimeFillPlan) {
+        GroupByTimeFillPlan groupByFillPlan = (GroupByTimeFillPlan) queryPlan;
+        queryDataSet = queryRouter.groupByFill(groupByFillPlan, context);
+      } else if (queryPlan instanceof GroupByTimePlan) {
+        GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
+        queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
+      } else if (queryPlan instanceof QueryIndexPlan) {
+        throw new QueryProcessException("Query index hasn't been supported yet");
+      } else if (queryPlan instanceof AggregationPlan) {
+        AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
+        queryDataSet = queryRouter.aggregate(aggregationPlan, context);
+      } else if (queryPlan instanceof FillQueryPlan) {
+        FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+        queryDataSet = queryRouter.fill(fillQueryPlan, context);
+      } else if (queryPlan instanceof LastQueryPlan) {
+        queryDataSet = queryRouter.lastQuery((LastQueryPlan) queryPlan, context);
+      } else {
+        queryDataSet = queryRouter.rawDataQuery((RawDataQueryPlan) queryPlan, context);
+      }
+    }
+    queryDataSet.setRowLimit(queryPlan.getRowLimit());
+    queryDataSet.setRowOffset(queryPlan.getRowOffset());
+    queryDataSet.setWithoutAllNull(queryPlan.isWithoutAllNull());
+    queryDataSet.setWithoutAnyNull(queryPlan.isWithoutAnyNull());
+    return queryDataSet;
+  }
+
+  protected AlignByDeviceDataSet getAlignByDeviceDataSet(
+      AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
+    return new AlignByDeviceDataSet(plan, context, router);
+  }
+
+  protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
+      throws QueryProcessException, MetadataException {
+    switch (showPlan.getShowContentType()) {
+      case TTL:
+        return processShowTTLQuery((ShowTTLPlan) showPlan);
+      case FLUSH_TASK_INFO:
+        return processShowFlushTaskInfo();
+      case VERSION:
+        return processShowVersion();
+      case TIMESERIES:
+        return processShowTimeseries((ShowTimeSeriesPlan) showPlan, context);
+      case STORAGE_GROUP:
+        return processShowStorageGroup((ShowStorageGroupPlan) showPlan);
+      case LOCK_INFO:
+        return processShowLockInfo((ShowLockInfoPlan) showPlan);
+      case DEVICES:
+        return processShowDevices((ShowDevicesPlan) showPlan);
+      case CHILD_PATH:
+        return processShowChildPaths((ShowChildPathsPlan) showPlan);
+      case CHILD_NODE:
+        return processShowChildNodes((ShowChildNodesPlan) showPlan);
+      case COUNT_TIMESERIES:
+        return processCountTimeSeries((CountPlan) showPlan);
+      case COUNT_NODE_TIMESERIES:
+        return processCountNodeTimeSeries((CountPlan) showPlan);
+      case COUNT_DEVICES:
+        return processCountDevices((CountPlan) showPlan);
+      case COUNT_STORAGE_GROUP:
+        return processCountStorageGroup((CountPlan) showPlan);
+      case COUNT_NODES:
+        return processCountNodes((CountPlan) showPlan);
+      case MERGE_STATUS:
+        return processShowMergeStatus();
+      case QUERY_PROCESSLIST:
+        return processShowQueryProcesslist();
+      case FUNCTIONS:
+        return processShowFunctions((ShowFunctionsPlan) showPlan);
+      case TRIGGERS:
+        return processShowTriggers();
+      case CONTINUOUS_QUERY:
+        return processShowContinuousQueries();
+      default:
+        throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
+    }
+  }
+
+  private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
+    int num = getNodesNumInGivenLevel(countPlan.getPath(), countPlan.getLevel());
+    return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet processCountNodeTimeSeries(CountPlan countPlan) throws MetadataException {
+    // get the nodes to group by first
+    List<PartialPath> nodes = getNodesList(countPlan.getPath(), countPlan.getLevel());
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_COLUMN, false), new PartialPath(COLUMN_COUNT, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+    for (PartialPath columnPath : nodes) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(columnPath.getFullPath()));
+      Field field1 = new Field(TSDataType.INT32);
+      // get the count of every group
+      field1.setIntV(getPathsNum(columnPath));
+      record.addField(field);
+      record.addField(field1);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  private QueryDataSet processCountDevices(CountPlan countPlan) throws MetadataException {
+    int num = getDevicesNum(countPlan.getPath());
+    return createSingleDataSet(COLUMN_DEVICES, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet processCountStorageGroup(CountPlan countPlan) throws MetadataException {
+    int num = getStorageGroupNum(countPlan.getPath());
+    return createSingleDataSet(COLUMN_STORAGE_GROUP, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet createSingleDataSet(String columnName, TSDataType columnType, Object val) {
+    SingleDataSet singleDataSet =
+        new SingleDataSet(
+            Collections.singletonList(new PartialPath(columnName, false)),
+            Collections.singletonList(columnType));
+    Field field = new Field(columnType);
+    switch (columnType) {
+      case TEXT:
+        field.setBinaryV(((Binary) val));
+        break;
+      case FLOAT:
+        field.setFloatV(((float) val));
+        break;
+      case INT32:
+        field.setIntV(((int) val));
+        break;
+      case INT64:
+        field.setLongV(((long) val));
+        break;
+      case DOUBLE:
+        field.setDoubleV(((double) val));
+        break;
+      case BOOLEAN:
+        field.setBoolV(((boolean) val));
+        break;
+      default:
+        throw new UnSupportedDataTypeException("Unsupported data type: " + columnType);
+    }
+    RowRecord record = new RowRecord(0);
+    record.addField(field);
+    singleDataSet.setRecord(record);
+    return singleDataSet;
+  }
+
+  protected int getDevicesNum(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getDevicesNum(path);
+  }
+
+  private int getStorageGroupNum(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getStorageGroupNum(path);
+  }
+
+  protected int getPathsNum(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getAllTimeseriesCount(path);
+  }
+
+  protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
+    return IoTDB.metaManager.getNodesCountInGivenLevel(path, level);
+  }
+
+  protected List<MeasurementPath> getPathsName(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getMeasurementPaths(path);
+  }
+
+  protected List<PartialPath> getNodesList(PartialPath schemaPattern, int level)
+      throws MetadataException {
+    return IoTDB.metaManager.getNodesListInGivenLevel(schemaPattern, level);
+  }
+
+  private QueryDataSet processCountTimeSeries(CountPlan countPlan) throws MetadataException {
+    int num = getPathsNum(countPlan.getPath());
+    return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
+  }
+
+  private QueryDataSet processShowDevices(ShowDevicesPlan showDevicesPlan)
+      throws MetadataException {
+    return new ShowDevicesDataSet(showDevicesPlan);
+  }
+
+  private QueryDataSet processShowChildPaths(ShowChildPathsPlan showChildPathsPlan)
+      throws MetadataException {
+    Set<String> childPathsList = getPathNextChildren(showChildPathsPlan.getPath());
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_CHILD_PATHS, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    for (String s : childPathsList) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(s));
+      record.addField(field);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getChildNodePathInNextLevel(path);
+  }
+
+  private QueryDataSet processShowChildNodes(ShowChildNodesPlan showChildNodesPlan)
+      throws MetadataException {
+    // getNodeNextChildren
+    Set<String> childNodesList = getNodeNextChildren(showChildNodesPlan.getPath());
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_CHILD_NODES, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    for (String s : childNodesList) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(s));
+      record.addField(field);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getChildNodeNameInNextLevel(path);
+  }
+
+  protected List<PartialPath> getStorageGroupNames(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getMatchedStorageGroups(path);
+  }
+
+  private QueryDataSet processShowStorageGroup(ShowStorageGroupPlan showStorageGroupPlan)
+      throws MetadataException {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_STORAGE_GROUP, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    List<PartialPath> storageGroupList = getStorageGroupNames(showStorageGroupPlan.getPath());
+    addToDataSet(storageGroupList, listDataSet);
+    return listDataSet;
+  }
+
+  private void addToDataSet(Collection<PartialPath> paths, ListDataSet dataSet) {
+    for (PartialPath s : paths) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(s.getFullPath()));
+      record.addField(field);
+      dataSet.putRecord(record);
+    }
+  }
+
+  private QueryDataSet processShowLockInfo(ShowLockInfoPlan showLockInfoPlan)
+      throws MetadataException {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_STORAGE_GROUP, false),
+                new PartialPath(COLUMN_LOCK_INFO, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+    try {
+      List<PartialPath> storageGroupList = getStorageGroupNames(showLockInfoPlan.getPath());
+      List<String> lockHolderList = StorageEngine.getInstance().getLockInfo(storageGroupList);
+      addLockInfoToDataSet(storageGroupList, lockHolderList, listDataSet);
+    } catch (StorageEngineException e) {
+      throw new MetadataException(e);
+    }
+    return listDataSet;
+  }
+
+  private void addLockInfoToDataSet(
+      List<PartialPath> paths, List<String> lockHolderList, ListDataSet dataSet) {
+    for (int i = 0; i < paths.size(); i++) {
+      RowRecord record = new RowRecord(0);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(paths.get(i).getFullPath()));
+      record.addField(field);
+      field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(lockHolderList.get(i)));
+      record.addField(field);
+      dataSet.putRecord(record);
+    }
+  }
+
+  private QueryDataSet processShowTimeseries(
+      ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) throws MetadataException {
+    return new ShowTimeseriesDataSet(showTimeSeriesPlan, context);
+  }
+
+  protected List<IStorageGroupMNode> getAllStorageGroupNodes() {
+    return IoTDB.metaManager.getAllStorageGroupNodes();
+  }
+
+  private QueryDataSet processShowTTLQuery(ShowTTLPlan showTTLPlan) {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_STORAGE_GROUP, false), new PartialPath(COLUMN_TTL, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.INT64));
+    Set<PartialPath> selectedSgs = new HashSet<>(showTTLPlan.getStorageGroups());
+
+    List<IStorageGroupMNode> storageGroups = getAllStorageGroupNodes();
+    int timestamp = 0;
+    for (IStorageGroupMNode mNode : storageGroups) {
+      PartialPath sgName = mNode.getPartialPath();
+      if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
+        continue;
+      }
+      RowRecord rowRecord = new RowRecord(timestamp++);
+      Field sg = new Field(TSDataType.TEXT);
+      Field ttl;
+      sg.setBinaryV(new Binary(sgName.getFullPath()));
+      if (mNode.getDataTTL() != Long.MAX_VALUE) {
+        ttl = new Field(TSDataType.INT64);
+        ttl.setLongV(mNode.getDataTTL());
+      } else {
+        ttl = null;
+      }
+      rowRecord.addField(sg);
+      rowRecord.addField(ttl);
+      listDataSet.putRecord(rowRecord);
+    }
+
+    return listDataSet;
+  }
+
+  private QueryDataSet processShowVersion() {
+    SingleDataSet singleDataSet =
+        new SingleDataSet(
+            Collections.singletonList(new PartialPath(IoTDBConstant.COLUMN_VERSION, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    Field field = new Field(TSDataType.TEXT);
+    field.setBinaryV(new Binary(IoTDBConstant.VERSION));
+    RowRecord rowRecord = new RowRecord(0);
+    rowRecord.addField(field);
+    singleDataSet.setRecord(rowRecord);
+    return singleDataSet;
+  }
+
+  private QueryDataSet processShowFlushTaskInfo() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_ITEM, false), new PartialPath(COLUMN_VALUE, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
+
+    int timestamp = 0;
+    addRowRecordForShowQuery(
+        listDataSet,
+        timestamp++,
+        "total number of flush tasks",
+        Integer.toString(FlushTaskPoolManager.getInstance().getTotalTasks()));
+    addRowRecordForShowQuery(
+        listDataSet,
+        timestamp++,
+        "number of working flush tasks",
+        Integer.toString(FlushTaskPoolManager.getInstance().getWorkingTasksNumber()));
+    addRowRecordForShowQuery(
+        listDataSet,
+        timestamp,
+        "number of waiting flush tasks",
+        Integer.toString(FlushTaskPoolManager.getInstance().getWaitingTasksNumber()));
+    return listDataSet;
+  }
+
+  private QueryDataSet processShowFunctions(ShowFunctionsPlan showPlan)
+      throws QueryProcessException {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_FUNCTION_NAME, false),
+                new PartialPath(COLUMN_FUNCTION_TYPE, false),
+                new PartialPath(COLUMN_FUNCTION_CLASS, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+
+    appendUDFs(listDataSet, showPlan);
+    appendNativeFunctions(listDataSet, showPlan);
+
+    listDataSet.sort(
+        (r1, r2) ->
+            String.CASE_INSENSITIVE_ORDER.compare(
+                r1.getFields().get(0).getStringValue(), r2.getFields().get(0).getStringValue()));
+    return listDataSet;
+  }
+
+  @SuppressWarnings("squid:S3776")
+  private void appendUDFs(ListDataSet listDataSet, ShowFunctionsPlan showPlan)
+      throws QueryProcessException {
+    for (UDFRegistrationInformation info :
+        UDFRegistrationService.getInstance().getRegistrationInformation()) {
+      RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+      rowRecord.addField(Binary.valueOf(info.getFunctionName()), TSDataType.TEXT);
+      String functionType = "";
+      try {
+        if (info.isBuiltin()) {
+          if (info.isUDTF()) {
+            functionType = FUNCTION_TYPE_BUILTIN_UDTF;
+          } else if (info.isUDAF()) {
+            functionType = FUNCTION_TYPE_BUILTIN_UDAF;
+          }
+        } else {
+          if (info.isUDTF()) {
+            functionType = FUNCTION_TYPE_EXTERNAL_UDTF;
+          } else if (info.isUDAF()) {
+            functionType = FUNCTION_TYPE_EXTERNAL_UDAF;
+          }
+        }
+      } catch (InstantiationException
+          | InvocationTargetException
+          | NoSuchMethodException
+          | IllegalAccessException e) {
+        throw new QueryProcessException(e.toString());
+      }
+      rowRecord.addField(Binary.valueOf(functionType), TSDataType.TEXT);
+      rowRecord.addField(Binary.valueOf(info.getClassName()), TSDataType.TEXT);
+      listDataSet.putRecord(rowRecord);
+    }
+  }
+
+  private QueryDataSet processShowContinuousQueries() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
+            Arrays.asList(
+                TSDataType.TEXT,
+                TSDataType.INT64,
+                TSDataType.INT64,
+                TSDataType.TEXT,
+                TSDataType.TEXT));
+
+    List<ShowContinuousQueriesResult> continuousQueriesList =
+        ContinuousQueryService.getInstance().getShowContinuousQueriesResultList();
+
+    for (ShowContinuousQueriesResult result : continuousQueriesList) {
+      RowRecord record = new RowRecord(0);
+      record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
+      record.addField(result.getEveryInterval(), TSDataType.INT64);
+      record.addField(result.getForInterval(), TSDataType.INT64);
+      record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
+      record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
+
+    return listDataSet;
+  }
+
+  private void appendNativeFunctions(ListDataSet listDataSet, ShowFunctionsPlan showPlan) {
+    final Binary functionType = Binary.valueOf(FUNCTION_TYPE_NATIVE);
+    final Binary className = Binary.valueOf("");
+    for (String functionName : SQLConstant.getNativeFunctionNames()) {
+      RowRecord rowRecord = new RowRecord(0); // ignore timestamp
+      rowRecord.addField(Binary.valueOf(functionName.toUpperCase()), TSDataType.TEXT);
+      rowRecord.addField(functionType, TSDataType.TEXT);
+      rowRecord.addField(className, TSDataType.TEXT);
+      listDataSet.putRecord(rowRecord);
+    }
+  }
+
+  private QueryDataSet processShowTriggers() {
+    return TriggerRegistrationService.getInstance().show();
+  }
+
+  private void addRowRecordForShowQuery(
+      ListDataSet listDataSet, int timestamp, String item, String value) {
+    RowRecord rowRecord = new RowRecord(timestamp);
+    Field itemField = new Field(TSDataType.TEXT);
+    itemField.setBinaryV(new Binary(item));
+    Field valueField = new Field(TSDataType.TEXT);
+    valueField.setBinaryV(new Binary(value));
+    rowRecord.addField(itemField);
+    rowRecord.addField(valueField);
+    listDataSet.putRecord(rowRecord);
+  }
+
+  @Override
+  public void delete(DeletePlan deletePlan) throws QueryProcessException {
+    AUDIT_LOGGER.info(
+        "delete data from {} in [{},{}]",
+        deletePlan.getPaths(),
+        deletePlan.getDeleteStartTime(),
+        deletePlan.getDeleteEndTime());
+    for (PartialPath path : deletePlan.getPaths()) {
+      delete(
+          path,
+          deletePlan.getDeleteStartTime(),
+          deletePlan.getDeleteEndTime(),
+          deletePlan.getIndex(),
+          deletePlan.getPartitionFilter());
+    }
+  }
+
+  private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+    File file = plan.getFile();
+    if (!file.exists()) {
+      throw new QueryProcessException(
+          String.format("File path %s doesn't exists.", file.getPath()));
+    }
+    if (file.isDirectory()) {
+      loadDir(file, plan);
+    } else {
+      loadFile(file, plan);
+    }
+  }
+
+  private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+    File[] files = curFile.listFiles();
+    long[] establishTime = new long[files.length];
+    List<Integer> tsfiles = new ArrayList<>();
+
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      if (!file.isDirectory()) {
+        String fileName = file.getName();
+        if (fileName.endsWith(TSFILE_SUFFIX)) {
+          establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
+          tsfiles.add(i);
+        }
+      }
+    }
+    Collections.sort(
+        tsfiles,
+        (o1, o2) -> {
+          if (establishTime[o1] == establishTime[o2]) {
+            return 0;
+          }
+          return establishTime[o1] < establishTime[o2] ? -1 : 1;
+        });
+    for (Integer i : tsfiles) {
+      loadFile(files[i], plan);
+    }
+
+    for (File file : files) {
+      if (file.isDirectory()) {
+        loadDir(file, plan);
+      }
+    }
+  }
+
+  private void loadFile(File file, OperateFilePlan plan) throws QueryProcessException {
+    if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+      return;
+    }
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    try {
+      // check file
+      RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+      if (restorableTsFileIOWriter.hasCrashed()) {
+        restorableTsFileIOWriter.close();
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
+      }
+      Map<Path, IMeasurementSchema> schemaMap = new HashMap<>();
+
+      List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+        reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
+        if (plan.getVerifyMetadata()) {
+          loadNewTsFileVerifyMetadata(reader);
+        }
+      } catch (IOException e) {
+        logger.warn("can not get timeseries metadata from {}.", file.getAbsoluteFile());
+        throw new QueryProcessException(e.getMessage());
+      }
+
+      FileLoaderUtils.checkTsFileResource(tsFileResource);
+      if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file's version is old which needs to be upgraded.",
+                file.getAbsolutePath()));
+      }
+
+      // create schemas if they don't exist
+      if (plan.isAutoCreateSchema()) {
+        createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
+      }
+
+      List<TsFileResource> splitResources = new ArrayList<>();
+      if (tsFileResource.isSpanMultiTimePartitions()) {
+        logger.info(
+            "try to split the tsFile={} du to it spans multi partitions",
+            tsFileResource.getTsFile().getPath());
+        TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+        tsFileResource.writeLock();
+        tsFileResource.removeModFile();
+        tsFileResource.writeUnlock();
+        logger.info(
+            "after split, the old tsFile was split to {} new tsFiles", splitResources.size());
+      }
+
+      if (splitResources.isEmpty()) {
+        splitResources.add(tsFileResource);
+      }
+
+      for (TsFileResource resource : splitResources) {
+        StorageEngine.getInstance().loadNewTsFile(resource);
+      }
+    } catch (Exception e) {
+      logger.error("fail to load file {}", file.getName(), e);
+      throw new QueryProcessException(
+          String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+    }
+  }
+
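+  /**
+   * Check that, for every time series in the TsFile to be loaded that already exists in IoTDB,
+   * the data type recorded in the TsFile matches the type registered in IoTDB.
+   */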
+  private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
+      throws MetadataException, QueryProcessException, IOException {
+    Map<String, List<TimeseriesMetadata>> metadataSet =
+        tsFileSequenceReader.getAllTimeseriesMetadata();
+    for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
+      String deviceId = entry.getKey();
+      PartialPath devicePath = new PartialPath(deviceId);
+      if (!IoTDB.metaManager.isPathExist(devicePath)) {
+        continue;
+      }
+      for (TimeseriesMetadata metadata : entry.getValue()) {
+        PartialPath fullPath =
+            new PartialPath(deviceId + TsFileConstant.PATH_SEPARATOR + metadata.getMeasurementId());
+        if (IoTDB.metaManager.isPathExist(fullPath)) {
+          TSDataType dataType = IoTDB.metaManager.getSeriesType(fullPath);
+          if (dataType != metadata.getTSDataType()) {
+            throw new QueryProcessException(
+                fullPath.getFullPath()
+                    + " is "
+                    + metadata.getTSDataType().name()
+                    + " in the loading TsFile but is "
+                    + dataType.name()
+                    + " in IoTDB.");
+          }
+        }
+      }
+    }
+  }
+
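+  /**
+   * Register the storage groups (derived from the device paths at the given level) and create
+   * the time series found in the chunk group metadata, skipping series that already exist.
+   */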
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private void createSchemaAutomatically(
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      Map<Path, IMeasurementSchema> knownSchemas,
+      int sgLevel)
+      throws MetadataException {
+    if (chunkGroupMetadataList.isEmpty()) {
+      return;
+    }
+
+    Set<PartialPath> registeredSeries = new HashSet<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      String device = chunkGroupMetadata.getDevice();
+      Set<String> existSeriesSet = new HashSet<>();
+      PartialPath devicePath = new PartialPath(device);
+      PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(devicePath, sgLevel);
+      try {
+        IoTDB.metaManager.setStorageGroup(storageGroupPath);
+      } catch (StorageGroupAlreadySetException alreadySetException) {
+        if (!alreadySetException.getStorageGroupPath().equals(storageGroupPath.getFullPath())) {
+          throw alreadySetException;
+        }
+      }
+      for (PartialPath path :
+          IoTDB.metaManager.getMeasurementPaths(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD))) {
+        existSeriesSet.add(path.getMeasurement());
+        existSeriesSet.add(path.getMeasurementAlias());
+      }
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        PartialPath series =
+            new PartialPath(
+                chunkGroupMetadata.getDevice()
+                    + TsFileConstant.PATH_SEPARATOR
+                    + chunkMetadata.getMeasurementUid());
+        if (!registeredSeries.contains(series)) {
+          registeredSeries.add(series);
+          IMeasurementSchema schema =
+              knownSchemas.get(new Path(series.getDevice(), series.getMeasurement()));
+          if (schema == null) {
+            throw new MetadataException(
+                String.format(
+                    "Can not get the schema of measurement [%s]",
+                    chunkMetadata.getMeasurementUid()));
+          }
+          if (!existSeriesSet.contains(chunkMetadata.getMeasurementUid())) {
+            IoTDB.metaManager.createTimeseries(
+                series,
+                schema.getType(),
+                schema.getEncodingType(),
+                schema.getCompressor(),
+                Collections.emptyMap());
+          }
+        }
+      }
+    }
+  }
+
+  private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
+    try {
+      if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
+        throw new QueryProcessException(
+            String.format("File %s doesn't exist.", plan.getFile().getName()));
+      }
+    } catch (StorageEngineException | IllegalPathException e) {
+      throw new QueryProcessException(
+          String.format("Cannot remove file because %s", e.getMessage()));
+    }
+  }
+
+  private void operateUnloadFile(OperateFilePlan plan) throws QueryProcessException {
+    if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
+      throw new QueryProcessException(
+          String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
+    }
+    try {
+      if (!StorageEngine.getInstance().unloadTsfile(plan.getFile(), plan.getTargetDir())) {
+        throw new QueryProcessException(
+            String.format("File %s doesn't exist.", plan.getFile().getName()));
+      }
+    } catch (StorageEngineException | IllegalPathException e) {
+      throw new QueryProcessException(
+          String.format(
+              "Cannot unload file %s to target directory %s because %s",
+              plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
+    }
+  }
+
+  private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
+    try {
+      List<PartialPath> storageGroupPaths =
+          IoTDB.metaManager.getMatchedStorageGroups(plan.getStorageGroup());
+      for (PartialPath storagePath : storageGroupPaths) {
+        IoTDB.metaManager.setTTL(storagePath, plan.getDataTTL());
+        StorageEngine.getInstance().setTTL(storagePath, plan.getDataTTL());
+      }
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void update(PartialPath path, long startTime, long endTime, String value) {
+    throw new UnsupportedOperationException("update is not supported now");
+  }
+
+  @Override
+  public void delete(
+      PartialPath path,
+      long startTime,
+      long endTime,
+      long planIndex,
+      TimePartitionFilter timePartitionFilter)
+      throws QueryProcessException {
+    try {
+      StorageEngine.getInstance().delete(path, startTime, endTime, planIndex, timePartitionFilter);
+    } catch (StorageEngineException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
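+  /**
+   * If every failed measurement of the plan failed because its path does not exist, rethrow a
+   * single PathNotExistException covering all failed paths; otherwise wrap the failures in a
+   * StorageEngineException.
+   */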
+  private void checkFailedMeasurments(InsertPlan plan)
+      throws PathNotExistException, StorageEngineException {
+    // walk each failure's cause chain to check whether all of them are PathNotExistExceptions
+    List<String> failedPaths = plan.getFailedMeasurements();
+    List<Exception> exceptions = plan.getFailedExceptions();
+    boolean isPathNotExistException = true;
+    for (Exception e : exceptions) {
+      Throwable curException = e;
+      while (curException.getCause() != null) {
+        curException = curException.getCause();
+      }
+      if (!(curException instanceof PathNotExistException)) {
+        isPathNotExistException = false;
+        break;
+      }
+    }
+    if (isPathNotExistException) {
+      throw new PathNotExistException(failedPaths);
+    } else {
+      throw new StorageEngineException(
+          INSERT_MEASUREMENTS_FAILED_MESSAGE
+              + plan.getFailedMeasurements()
+              + (!exceptions.isEmpty() ? (" caused by " + exceptions.get(0).getMessage()) : ""));
+    }
+  }
+
+  @Override
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws QueryProcessException {
+    if (insertRowsOfOneDevicePlan.getRowPlans().length == 0) {
+      return;
+    }
+    try {
+      // insert to storage engine
+      StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+      List<String> notExistedPaths = null;
+      List<String> failedMeasurements = null;
+
+      // If there are several exceptions, we assume they are caused by the same reason.
+      Exception exception = null;
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (plan.getFailedMeasurements() != null) {
+          if (notExistedPaths == null) {
+            notExistedPaths = new ArrayList<>();
+            failedMeasurements = new ArrayList<>();
+          }
+          // check whether every failure of this row was caused by a PathNotExistException
+          List<String> failedPaths = plan.getFailedMeasurements();
+          List<Exception> exceptions = plan.getFailedExceptions();
+          boolean isPathNotExistException = true;
+          for (Exception e : exceptions) {
+            exception = e;
+            Throwable curException = e;
+            while (curException.getCause() != null) {
+              curException = curException.getCause();
+            }
+            if (!(curException instanceof PathNotExistException)) {
+              isPathNotExistException = false;
+              break;
+            }
+          }
+          if (isPathNotExistException) {
+            notExistedPaths.addAll(failedPaths);
+          } else {
+            failedMeasurements.addAll(plan.getFailedMeasurements());
+          }
+        }
+      }
+      if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+        throw new PathNotExistException(notExistedPaths);
+      } else if (failedMeasurements != null && !failedMeasurements.isEmpty()) {
+        throw new StorageEngineException(
+            "failed to insert points "
+                + failedMeasurements
+                + (exception != null ? (" caused by " + exception.getMessage()) : ""));
+      }
+
+    } catch (StorageEngineException | MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
+  @Override
+  public void insert(InsertRowsPlan plan) throws QueryProcessException {
+    for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+      if (plan.getResults().containsKey(i) || plan.isExecuted(i)) {
+        continue;
+      }
+      try {
+        insert(plan.getInsertRowPlanList().get(i));
+      } catch (QueryProcessException e) {
+        plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!plan.getResults().isEmpty()) {
+      throw new BatchProcessException(plan.getFailingStatus());
+    }
+  }
+
+  @Override
+  public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
+    try {
+      insertRowPlan.setMeasurementMNodes(
+          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+      // When data is inserted with an SQL statement, the data types are still null here,
+      // so we need to infer the data types first.
+      if (insertRowPlan.getDataTypes()[0] == null) {
+        for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+          insertRowPlan.getDataTypes()[i] =
+              TypeInferenceUtils.getPredictedDataType(
+                  insertRowPlan.getValues()[i], insertRowPlan.isNeedInferType());
+        }
+      }
+
+      StorageEngine.getInstance().insert(insertRowPlan);
+
+      if (insertRowPlan.getFailedMeasurements() != null) {
+        checkFailedMeasurments(insertRowPlan);
+      }
+    } catch (StorageEngineException | MetadataException e) {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw new QueryProcessException(e);
+    } catch (Exception e) {
+      // update failed statistics
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws QueryProcessException {
+    if (insertMultiTabletPlan.isEnableMultiThreading()) {
+      insertTabletParallel(insertMultiTabletPlan);
+    } else {
+      insertTabletSerial(insertMultiTabletPlan);
+    }
+  }
+
+  private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
+    for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
+      if (insertMultiTabletPlan.getResults().containsKey(i)
+          || insertMultiTabletPlan.isExecuted(i)) {
+        continue;
+      }
+      try {
+        insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+      } catch (QueryProcessException e) {
+        insertMultiTabletPlan
+            .getResults()
+            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!insertMultiTabletPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+    }
+  }
+
+  private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
+    updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+
+    List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+    List<Future<?>> futureList = new ArrayList<>();
+
+    Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+
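+    // collect the plans that have not been executed yet and remember the mapping from their
+    // position in runPlanList back to the original index, so failures are reported correctly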
+    List<InsertTabletPlan> runPlanList = new ArrayList<>();
+    Map<Integer, Integer> runIndexToRealIndex = new HashMap<>();
+    for (int i = 0; i < planList.size(); i++) {
+      if (!(results.containsKey(i) || insertMultiTabletPlan.isExecuted(i))) {
+        runPlanList.add(planList.get(i));
+        runIndexToRealIndex.put(runPlanList.size() - 1, i);
+      }
+    }
+    for (InsertTabletPlan plan : runPlanList) {
+      Future<?> f =
+          insertionPool.submit(
+              () -> {
+                insertTablet(plan);
+                return null;
+              });
+      futureList.add(f);
+    }
+    for (int i = 0; i < futureList.size(); i++) {
+      try {
+        futureList.get(i).get();
+      } catch (Exception e) {
+        if (e.getCause() instanceof QueryProcessException) {
+          QueryProcessException qe = (QueryProcessException) e.getCause();
+          results.put(
+              runIndexToRealIndex.get(i), RpcUtils.getStatus(qe.getErrorCode(), qe.getMessage()));
+        } else {
+          results.put(
+              runIndexToRealIndex.get(i),
+              RpcUtils.getStatus(INTERNAL_SERVER_ERROR, e.getMessage()));
+        }
+      }
+    }
+
+    if (!results.isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+    }
+  }
+
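+  /**
+   * Resize the thread pool used for parallel tablet insertion according to the number of
+   * storage groups involved in the batch.
+   */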
+  private void updateInsertTabletsPool(int sgSize) {
+    // keep at least one worker thread even on machines that report few available processors
+    int updateCoreSize =
+        Math.max(1, Math.min(sgSize, Runtime.getRuntime().availableProcessors() / 2));
+    if (insertionPool == null || insertionPool.isTerminated()) {
+      insertionPool =
+          (ThreadPoolExecutor)
+              IoTDBThreadPoolFactory.newFixedThreadPool(
+                  updateCoreSize, ThreadName.INSERTION_SERVICE.getName());
+    } else if (insertionPool.getCorePoolSize() > updateCoreSize) {
+      insertionPool.setCorePoolSize(updateCoreSize);
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+    } else if (insertionPool.getCorePoolSize() < updateCoreSize) {
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+      insertionPool.setCorePoolSize(updateCoreSize);
+    }
+  }
+
+  @Override
+  public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+
+    if (insertTabletPlan.getRowCount() == 0) {
+      return;
+    }
+    try {
+      insertTabletPlan.setMeasurementMNodes(
+          new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
+
+      StorageEngine.getInstance().insertTablet(insertTabletPlan);
+
+      if (insertTabletPlan.getFailedMeasurements() != null) {
+        checkFailedMeasurments(insertTabletPlan);
+      }
+    } catch (StorageEngineException | MetadataException e) {
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw new QueryProcessException(e);
+    } catch (Exception e) {
+      // update failed statistics
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+        StatMonitor.getInstance().updateFailedStatValue();
+      }
+      throw e;
+    }
+  }
+
+  private boolean operateAuthor(AuthorPlan author) throws QueryProcessException {
+    AuthorOperator.AuthorType authorType = author.getAuthorType();
+    String userName = author.getUserName();
+    String roleName = author.getRoleName();
+    String password = author.getPassword();
+    String newPassword = author.getNewPassword();
+    Set<Integer> permissions = author.getPermissions();
+    PartialPath nodeName = author.getNodeName();
+    try {
+      switch (authorType) {
+        case UPDATE_USER:
+          authorizer.updateUserPassword(userName, newPassword);
+          break;
+        case CREATE_USER:
+          authorizer.createUser(userName, password);
+          break;
+        case CREATE_ROLE:
+          authorizer.createRole(roleName);
+          break;
+        case DROP_USER:
+          authorizer.deleteUser(userName);
+          break;
+        case DROP_ROLE:
+          authorizer.deleteRole(roleName);
+          break;
+        case GRANT_ROLE:
+          for (int i : permissions) {
+            authorizer.grantPrivilegeToRole(roleName, nodeName.getFullPath(), i);
+          }
+          break;
+        case GRANT_USER:
+          for (int i : permissions) {
+            authorizer.grantPrivilegeToUser(userName, nodeName.getFullPath(), i);
+          }
+          break;
+        case GRANT_ROLE_TO_USER:
+          authorizer.grantRoleToUser(roleName, userName);
+          break;
+        case REVOKE_USER:
+          for (int i : permissions) {
+            authorizer.revokePrivilegeFromUser(userName, nodeName.getFullPath(), i);
+          }
+          break;
+        case REVOKE_ROLE:
+          for (int i : permissions) {
+            authorizer.revokePrivilegeFromRole(roleName, nodeName.getFullPath(), i);
+          }
+          break;
+        case REVOKE_ROLE_FROM_USER:
+          authorizer.revokeRoleFromUser(roleName, userName);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported operation " + authorType);
+      }
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage(), true);
+    }
+    return true;
+  }
+
+  private boolean operateWatermarkEmbedding(List<String> users, boolean useWatermark)
+      throws QueryProcessException {
+    try {
+      for (String user : users) {
+        authorizer.setUserUseWaterMark(user, useWatermark);
+      }
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+    return true;
+  }
+
+  private boolean createTimeSeries(CreateTimeSeriesPlan createTimeSeriesPlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.createTimeseries(createTimeSeriesPlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  private boolean createAlignedTimeSeries(CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan)
+      throws QueryProcessException {
+    try {
+      IoTDB.metaManager.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+  private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
+      throws BatchProcessException {
+    int dataTypeIdx = 0;
+    for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+      if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+        continue;
+      }
+      PartialPath path = multiPlan.getPaths().get(i);
+      String measurement = path.getMeasurement();
+      CreateTimeSeriesPlan plan =
+          new CreateTimeSeriesPlan(
+              multiPlan.getPaths().get(i),
+              multiPlan.getDataTypes().get(i),
+              multiPlan.getEncodings().get(i),
+              multiPlan.getCompressors().get(i),
+              multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+              multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+              multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+              multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+      dataTypeIdx++;
+      try {
+        createTimeSeries(plan);
+      } catch (QueryProcessException e) {
+        multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!multiPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(multiPlan.getFailingStatus());
+    }
+    return true;
+  }
+
+  protected boolean deleteTimeSeries(DeleteTimeSeriesPlan deleteTimeSeriesPlan)
+      throws QueryProcessException {
+    AUDIT_LOGGER.info("delete timeseries {}", deleteTimeSeriesPlan.getPaths());
+    List<PartialPath> deletePathList = deleteTimeSeriesPlan.getPaths();
+    for (int i = 0; i < deletePathList.size(); i++) {
+      PartialPath path = deletePathList.get(i);
+      try {
+        StorageEngine.getInstance()
+            .deleteTimeseries(
+                path, deleteTimeSeriesPlan.getIndex(), deleteTimeSeriesPlan.getPartitionFilter());
+        String failed = IoTDB.metaManager.deleteTimeseries(path);
+        if (failed != null) {
+          deleteTimeSeriesPlan
+              .getResults()
+              .put(i, RpcUtils.getStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR, failed));
+        }
+      } catch (StorageEngineException | MetadataException e) {
+        deleteTimeSeriesPlan
+            .getResults()
+            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!deleteTimeSeriesPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(deleteTimeSeriesPlan.getFailingStatus());
+    }
+    return true;
+  }
+
+  private boolean alterTimeSeries(AlterTimeSeriesPlan alterTimeSeriesPlan)
+      throws QueryProcessException {
+    PartialPath path = alterTimeSeriesPlan.getPath();
+    Map<String, String> alterMap = alterTimeSeriesPlan.getAlterMap();
+    try {
+      switch (alterTimeSeriesPlan.getAlterType()) {
+        case RENAME:
+          String beforeName = alterMap.keySet().iterator().next();
+          String currentName = alterMap.get(beforeName);
+          IoTDB.metaManager.renameTagOrAttributeKey(beforeName, currentName, path);
+          break;
+        case SET:
+          IoTDB.metaManager.setTagsOrAttributesValue(alterMap, path);
+          break;
+        case DROP:
+          IoTDB.metaManager.dropTagsOrAttributes(alterMap.keySet(), path);
+          break;
+        case ADD_TAGS:
+          IoTDB.metaManager.addTags(alterMap, path);
+          break;
+        case ADD_ATTRIBUTES:
+          IoTDB.metaManager.addAttributes(alterMap, path);
+          break;
+        case UPSERT:
+          IoTDB.metaManager.upsertTagsAndAttributes(
+              alterTimeSeriesPlan.getAlias(),
+              alterTimeSeriesPlan.getTagsMap(),
+              alterTimeSeriesPlan.getAttributesMap(),
+              path);
+          break;
+      }
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    } catch (IOException e) {
+      throw new QueryProcessException(
+          String.format(
+              "Something went wrong while read/write the [%s]'s tag/attribute info.",
+              path.getFullPath()));
+    }
+    return true;
+  }
+
+  public boolean setStorageGroup(SetStorageGroupPlan setStorageGroupPlan)
+      throws QueryProcessException {
+    AUDIT_LOGGER.info("set storage group to {}", setStorageGroupPlan.getPaths());
+    PartialPath path = setStorageGroupPlan.getPath();
+    try {
+      IoTDB.metaManager.setStorageGroup(path);
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  protected boolean deleteStorageGroups(DeleteStorageGroupPlan deleteStorageGroupPlan)
+      throws QueryProcessException {
+    AUDIT_LOGGER.info("delete storage group {}", deleteStorageGroupPlan.getPaths());
+    List<PartialPath> deletePathList = new ArrayList<>();
+    try {
+      for (PartialPath storageGroupPath : deleteStorageGroupPlan.getPaths()) {
+        List<PartialPath> allRelatedStorageGroupPath =
+            IoTDB.metaManager.getMatchedStorageGroups(storageGroupPath);
+        if (allRelatedStorageGroupPath.isEmpty()) {
+          throw new PathNotExistException(storageGroupPath.getFullPath(), true);
+        }
+        for (PartialPath path : allRelatedStorageGroupPath) {
+          StorageEngine.getInstance().deleteStorageGroup(path);
+          deletePathList.add(path);
+        }
+      }
+      IoTDB.metaManager.deleteStorageGroups(deletePathList);
+      operateClearCache();
+    } catch (MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+    return true;
+  }
+
+  protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
+    AuthorType authorType = plan.getAuthorType();
+    String userName = plan.getUserName();
+    String roleName = plan.getRoleName();
+    PartialPath path = plan.getNodeName();
+
+    ListDataSet dataSet;
+
+    try {
+      switch (authorType) {
+        case LIST_ROLE:
+          dataSet = executeListRole(plan);
+          break;
+        case LIST_USER:
+          dataSet = executeListUser(plan);
+          break;
+        case LIST_ROLE_USERS:
+          dataSet = executeListRoleUsers(roleName);
+          break;
+        case LIST_USER_ROLES:
+          dataSet = executeListUserRoles(userName);
+          break;
+        case LIST_ROLE_PRIVILEGE:
+          dataSet = executeListRolePrivileges(roleName, path);
+          break;
+        case LIST_USER_PRIVILEGE:
+          dataSet = executeListUserPrivileges(userName, path);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported operation " + authorType);
+      }
+    } catch (AuthException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+    return dataSet;
+  }
+
+  private ListDataSet executeListRole(AuthorPlan plan) throws AuthException {
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+            Collections.singletonList(TSDataType.TEXT));
+
+    // check if current user is granted list_role privilege
+    boolean hasListRolePrivilege =
+        AuthorityChecker.check(
+            plan.getLoginUserName(),
+            Collections.emptyList(),
+            plan.getOperatorType(),
+            plan.getLoginUserName());
+    if (!hasListRolePrivilege) {
+      return dataSet;
+    }
+
+    List<String> roleList = authorizer.listAllRoles();
+    addToDataSet(roleList, dataSet);
+    return dataSet;
+  }
+
+  private void addToDataSet(List<String> strResults, ListDataSet dataSet) {
+    int index = 0;
+    for (String role : strResults) {
+      RowRecord record = new RowRecord(index++);
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(role));
+      record.addField(field);
+      dataSet.putRecord(record);
+    }
+  }
+
+  private ListDataSet executeListUser(AuthorPlan plan) throws AuthException {
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+            Collections.singletonList(TSDataType.TEXT));
+
+    // check if current user is granted list_user privilege
+    boolean hasListUserPrivilege =
+        AuthorityChecker.check(
+            plan.getLoginUserName(),
+            Collections.singletonList((plan.getNodeName())),
+            plan.getOperatorType(),
+            plan.getLoginUserName());
+    if (!hasListUserPrivilege) {
+      return dataSet;
+    }
+
+    List<String> userList = authorizer.listAllUsers();
+    addToDataSet(userList, dataSet);
+    return dataSet;
+  }
+
+  private ListDataSet executeListRoleUsers(String roleName) throws AuthException {
+    Role role = authorizer.getRole(roleName);
+    if (role == null) {
+      throw new AuthException("No such role : " + roleName);
+    }
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_USER, false)),
+            Collections.singletonList(TSDataType.TEXT));
+    List<String> userList = authorizer.listAllUsers();
+    int index = 0;
+    for (String userN : userList) {
+      User userObj = authorizer.getUser(userN);
+      if (userObj != null && userObj.hasRole(roleName)) {
+        RowRecord record = new RowRecord(index++);
+        Field field = new Field(TSDataType.TEXT);
+        field.setBinaryV(new Binary(userN));
+        record.addField(field);
+        dataSet.putRecord(record);
+      }
+    }
+    return dataSet;
+  }
+
+  private ListDataSet executeListUserRoles(String userName) throws AuthException {
+    User user = authorizer.getUser(userName);
+    if (user != null) {
+      ListDataSet dataSet =
+          new ListDataSet(
+              Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
+              Collections.singletonList(TSDataType.TEXT));
+      int index = 0;
+      for (String roleN : user.getRoleList()) {
+        RowRecord record = new RowRecord(index++);
+        Field field = new Field(TSDataType.TEXT);
+        field.setBinaryV(new Binary(roleN));
+        record.addField(field);
+        dataSet.putRecord(record);
+      }
+      return dataSet;
+    } else {
+      throw new AuthException("No such user : " + userName);
+    }
+  }
+
+  private ListDataSet executeListRolePrivileges(String roleName, PartialPath path)
+      throws AuthException {
+    Role role = authorizer.getRole(roleName);
+    if (role != null) {
+      List<PartialPath> headerList = new ArrayList<>();
+      List<TSDataType> typeList = new ArrayList<>();
+      headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+      typeList.add(TSDataType.TEXT);
+      ListDataSet dataSet = new ListDataSet(headerList, typeList);
+      int index = 0;
+      for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+        if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+          RowRecord record = new RowRecord(index++);
+          Field field = new Field(TSDataType.TEXT);
+          field.setBinaryV(new Binary(pathPrivilege.toString()));
+          record.addField(field);
+          dataSet.putRecord(record);
+        }
+      }
+      return dataSet;
+    } else {
+      throw new AuthException("No such role : " + roleName);
+    }
+  }
+
+  private ListDataSet executeListUserPrivileges(String userName, PartialPath path)
+      throws AuthException {
+    User user = authorizer.getUser(userName);
+    if (user == null) {
+      throw new AuthException("No such user : " + userName);
+    }
+    List<PartialPath> headerList = new ArrayList<>();
+    List<TSDataType> typeList = new ArrayList<>();
+    headerList.add(new PartialPath(COLUMN_ROLE, false));
+    headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    ListDataSet dataSet = new ListDataSet(headerList, typeList);
+    int index = 0;
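+    // list the privileges granted to the user directly (with an empty role column) first,
+    // then the privileges inherited from each of the user's roles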
+    for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+      if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+        RowRecord record = new RowRecord(index++);
+        Field roleF = new Field(TSDataType.TEXT);
+        roleF.setBinaryV(new Binary(""));
+        record.addField(roleF);
+        Field privilegeF = new Field(TSDataType.TEXT);
+        privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+        record.addField(privilegeF);
+        dataSet.putRecord(record);
+      }
+    }
+    for (String roleN : user.getRoleList()) {
+      Role role = authorizer.getRole(roleN);
+      if (role == null) {
+        continue;
+      }
+      for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+        if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
+          RowRecord record = new RowRecord(index++);
+          Field roleF = new Field(TSDataType.TEXT);
+          roleF.setBinaryV(new Binary(roleN));
+          record.addField(roleF);
+          Field privilegeF = new Field(TSDataType.TEXT);
+          privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
+          record.addField(privilegeF);
+          dataSet.putRecord(record);
+        }
+      }
+    }
+    return dataSet;
+  }
+
+  @SuppressWarnings("unused") // for the distributed version
+  protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
+    IoTDBDescriptor.getInstance().loadHotModifiedProps();
+  }
+
+  private QueryDataSet processShowMergeStatus() {
+    List<PartialPath> headerList = new ArrayList<>();
+    List<TSDataType> typeList = new ArrayList<>();
+    headerList.add(new PartialPath(COLUMN_STORAGE_GROUP, false));
+    headerList.add(new PartialPath(COLUMN_TASK_NAME, false));
+    headerList.add(new PartialPath(COLUMN_CREATED_TIME, false));
+    headerList.add(new PartialPath(COLUMN_PROGRESS, false));
+    headerList.add(new PartialPath(COLUMN_CANCELLED, false));
+    headerList.add(new PartialPath(COLUMN_DONE, false));
+
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.TEXT);
+    typeList.add(TSDataType.BOOLEAN);
+    typeList.add(TSDataType.BOOLEAN);
+    ListDataSet dataSet = new ListDataSet(headerList, typeList);
+    Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus();
+    for (Map<String, List<TaskStatus>> statusMap : taskStatus) {
+      for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) {
+        for (TaskStatus status : stringListEntry.getValue()) {
+          dataSet.putRecord(toRowRecord(status, stringListEntry.getKey()));
+        }
+      }
+    }
+    return dataSet;
+  }
+
+  public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
+    RowRecord record = new RowRecord(0);
+    record.addField(new Binary(storageGroup), TSDataType.TEXT);
+    record.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
+    record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
+    record.addField(new Binary(status.getProgress()), TSDataType.TEXT);
+    record.addField(status.isCancelled(), TSDataType.BOOLEAN);
+    record.addField(status.isDone(), TSDataType.BOOLEAN);
+    return record;
+  }
+
+  private QueryDataSet processShowQueryProcesslist() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false)),
+            Arrays.asList(TSDataType.INT64, TSDataType.TEXT));
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    for (Entry<Long, QueryContext> context : queryTimeManager.getQueryContextMap().entrySet()) {
+      RowRecord record = new RowRecord(context.getValue().getStartTime());
+      record.addField(context.getKey(), TSDataType.INT64);
+      record.addField(new Binary(context.getValue().getStatement()), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
+  /**
+   * @param storageGroups the storage groups to check
+   * @return the storage groups that do not exist
+   */
+  List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) {
+    List<PartialPath> noExistSg = new ArrayList<>();
+    if (storageGroups == null) {
+      return noExistSg;
+    }
+    for (PartialPath storageGroup : storageGroups) {
+      if (!IoTDB.metaManager.isStorageGroup(storageGroup)) {
+        noExistSg.add(storageGroup);
+      }
+    }
+    return noExistSg;
+  }
+
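+  /**
+   * Settle either a whole storage group or a single TsFile: collect the sequence and unsequence
+   * TsFile resources to be settled and hand them over to the SettleService.
+   */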
+  private void settle(SettlePlan plan) throws StorageEngineException {
+    if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+      throw new StorageEngineException(
+          "Current system mode is read only, does not support file settle");
+    }
+    if (!SettleService.getINSTANCE().isRecoverFinish()) {
+      throw new StorageEngineException("Existing sg that is not ready, please try later.");
+    }
+    PartialPath sgPath = null;
+    try {
+      List<TsFileResource> seqResourcesToBeSettled = new ArrayList<>();
+      List<TsFileResource> unseqResourcesToBeSettled = new ArrayList<>();
+      List<String> tsFilePaths = new ArrayList<>();
+      if (plan.isSgPath()) {
+        sgPath = plan.getSgPath();
+      } else {
+        String tsFilePath = plan.getTsFilePath();
+        if (new File(tsFilePath).isDirectory()) {
+          throw new WriteProcessException("The file should not be a directory.");
+        } else if (!new File(tsFilePath).exists()) {
+          throw new WriteProcessException("The tsFile " + tsFilePath + " is not existed.");
+        }
+        sgPath = SettleService.getINSTANCE().getSGByFilePath(tsFilePath);
+        tsFilePaths.add(tsFilePath);
+      }
+      StorageEngine.getInstance()
+          .getResourcesToBeSettled(
+              sgPath, seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths);
+      SettleService.getINSTANCE().startSettling(seqResourcesToBeSettled, unseqResourcesToBeSettled);
+      StorageEngine.getInstance().setSettling(sgPath, false);
+    } catch (WriteProcessException e) {
+      if (sgPath != null) {
+        StorageEngine.getInstance().setSettling(sgPath, false);
+      }
+      throw new StorageEngineException(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 6f8e823a4e..c9bcdf1cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -33,8 +33,6 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -44,6 +42,8 @@ import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.SessionTimeoutManager;
 import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 27bba9d092..11a2026e5c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -39,10 +39,10 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
index 9766bfc85b..860bdd0ab7 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableResourceControlTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
index 526226884f..e3e21b1cb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRestartTest.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
index e70e5ee404..21284733fe 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
index 2a596895e9..60ba99f132 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/LastQueryWithIDTable.java
@@ -26,10 +26,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
index 8aeb162b48..a449f92ff0 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryAlignedTimeseriesWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
index 329e4bf090..f7fd194709 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
@@ -28,10 +28,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
index 5677ad27c8..4f8e894bb8 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index 062bbc3611..db9584f819 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -23,10 +23,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 75213a288a..bd0cb839cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
index d33d1c9230..3090f5b070 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -36,6 +36,8 @@ import java.nio.ByteBuffer;
 /** Unit tests of AggregateResult without desc aggregate result. */
 public class AggregateResultTest {
 
+  private static final double maxError = 0.0001d;
+
   @Test
   public void avgAggrResultTest() throws QueryProcessException, IOException {
     AggregateResult avgAggrResult1 =
@@ -291,4 +293,34 @@ public class AggregateResultTest {
     AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
     Assert.assertEquals(2d, (double) result.getResult(), 0.01);
   }
+
+  @Test
+  public void validityAggrResultTest() throws QueryProcessException, IOException {
+    AggregateResult validityAggrResult1 =
+        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
+    AggregateResult validityAggrResult2 =
+        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1L, 1d);
+    statistics1.update(2L, 1d);
+    statistics2.update(3L, 1d);
+    statistics2.update(4L, 3d);
+
+    validityAggrResult1.updateResultFromStatistics(statistics1);
+    validityAggrResult2.updateResultFromStatistics(statistics2);
+    //    System.out.println(validityAggrResult1.getResult());
+    //    System.out.println(validityAggrResult2.getResult());
+    validityAggrResult1.merge(validityAggrResult2);
+    //    System.out.println(validityAggrResult1.getResult());
+
+    //    Assert.assertEquals(0.25, (double) validityAggrResult1.getResult(), maxError);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    validityAggrResult1.serializeTo(outputStream);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
+    Assert.assertEquals(0.25, (double) result.getResult(), maxError);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
index d038b3467f..35fab752ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
@@ -21,170 +21,47 @@ package org.apache.iotdb.db.query.aggregation;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.aggregation.impl.ValidityAggrResult;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.utils.Binary;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /** Unit tests of desc aggregate result. */
 public class DescAggregateResultTest {
 
   @Test
-  public void maxTimeDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult maxTimeDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(SQLConstant.MAX_TIME, TSDataType.FLOAT, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
-    statistics1.update(10L, 10.0f);
-    statistics2.update(1L, 1.0f);
-
-    maxTimeDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
-    maxTimeDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals(10L, (long) maxTimeDescAggrResult.getResult());
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    maxTimeDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals(10L, (long) result.getResult());
-
-    maxTimeDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
-    batchData.putFloat(1, 1.0F);
-    batchData.putFloat(2, 2.0F);
-    batchData.putFloat(3, 3.0F);
-    batchData.putFloat(4, 4.0F);
-    batchData.putFloat(5, 5.0F);
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    maxTimeDescAggrResult.updateResultFromPageData(it);
-    Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
-    it.reset();
-    maxTimeDescAggrResult.updateResultFromPageData(it, 2, 5);
-    Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
-  }
-
-  @Test
-  public void minTimeDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult minTimeDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(SQLConstant.MIN_TIME, TSDataType.FLOAT, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.FLOAT);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.FLOAT);
-    statistics1.update(10L, 10.0f);
-    statistics2.update(1L, 1.0f);
-
-    minTimeDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals(10L, (long) minTimeDescAggrResult.getResult());
-    minTimeDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals(1L, (long) minTimeDescAggrResult.getResult());
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    minTimeDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals(1L, (long) result.getResult());
-
-    minTimeDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.FLOAT, false, false);
-    batchData.putFloat(1, 1.0F);
-    batchData.putFloat(2, 2.0F);
-    batchData.putFloat(3, 3.0F);
-    batchData.putFloat(4, 4.0F);
-    batchData.putFloat(5, 5.0F);
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    minTimeDescAggrResult.updateResultFromPageData(it);
-    Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
-    it.reset();
-    minTimeDescAggrResult.updateResultFromPageData(it, 1, 3);
-    Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
-  }
-
-  @Test
-  public void firstValueDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult firstValueDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(
-            SQLConstant.FIRST_VALUE, TSDataType.BOOLEAN, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.BOOLEAN);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.BOOLEAN);
-    statistics1.update(10L, true);
-    statistics2.update(1L, false);
-
-    firstValueDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals(true, firstValueDescAggrResult.getResult());
-    firstValueDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals(false, firstValueDescAggrResult.getResult());
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    firstValueDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals(false, result.getResult());
-
-    firstValueDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.BOOLEAN, false, false);
-    batchData.putBoolean(1, true);
-    batchData.putBoolean(2, false);
-    batchData.putBoolean(3, false);
-    batchData.putBoolean(4, true);
-    batchData.putBoolean(5, false);
-    batchData.resetBatchData();
-    IBatchDataIterator it = batchData.getBatchDataIterator();
-    firstValueDescAggrResult.updateResultFromPageData(it);
-    Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
-    it.reset();
-    firstValueDescAggrResult.updateResultFromPageData(it, 1, 3);
-    Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
-  }
-
-  @Test
-  public void lastValueDescAggrResultTest() throws QueryProcessException, IOException {
-    AggregateResult lastValueDescAggrResult =
-        AggregateResultFactory.getAggrResultByName(SQLConstant.LAST_VALUE, TSDataType.TEXT, false);
-
-    Statistics statistics1 = Statistics.getStatsByType(TSDataType.TEXT);
-    Statistics statistics2 = Statistics.getStatsByType(TSDataType.TEXT);
-    statistics1.update(10L, new Binary("last"));
-    statistics2.update(1L, new Binary("first"));
-
-    lastValueDescAggrResult.updateResultFromStatistics(statistics1);
-    Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
-    lastValueDescAggrResult.updateResultFromStatistics(statistics2);
-    Assert.assertEquals("last", String.valueOf(lastValueDescAggrResult.getResult()));
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    lastValueDescAggrResult.serializeTo(outputStream);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
-    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
-    Assert.assertEquals("last", String.valueOf(result.getResult()));
-
-    lastValueDescAggrResult.reset();
-    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.TEXT, false, false);
-    batchData.putBinary(1L, new Binary("a"));
-    batchData.putBinary(2L, new Binary("b"));
-    batchData.putBinary(3L, new Binary("c"));
-    batchData.putBinary(4L, new Binary("d"));
-    batchData.putBinary(5L, new Binary("e"));
+  public void validityMergeTest() throws QueryProcessException, IOException {
+    AggregateResult validityAggrResult =
+        AggregateResultFactory.getAggrResultByName(SQLConstant.VALIDITY, TSDataType.DOUBLE, true);
+
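+    // batchData: 3,001 densely spaced points plus four points at 2-second intervals;
+    // batchData2: 7,024 densely spaced points.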
+    BatchData batchData = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+    BatchData batchData2 = BatchDataFactory.createBatchData(TSDataType.DOUBLE, true, true);
+
+    for (int i = 0; i < 3000; i++) {
+      batchData.putDouble(1623311071000L - 3000 + i, 0d);
+    }
+    batchData.putDouble(1623311071000L, 0d);
+    batchData.putDouble(1623312051000L, 2d);
+    batchData.putDouble(1623312053000L, 2d);
+    batchData.putDouble(1623312055000L, 2d);
+    batchData.putDouble(1623312057000L, 2d);
+    for (int i = 0; i < 7024; i++) {
+      batchData2.putDouble(1623312057000L + i, 0d);
+    }
     batchData.resetBatchData();
+    batchData2.resetBatchData();
     IBatchDataIterator it = batchData.getBatchDataIterator();
-    lastValueDescAggrResult.updateResultFromPageData(it);
-    Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
+    IBatchDataIterator it2 = batchData2.getBatchDataIterator();
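+    // Accumulate the validity result over both pages of data.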
+    validityAggrResult.updateResultFromPageData(it);
     it.reset();
-    lastValueDescAggrResult.updateResultFromPageData(it, 3, 5);
-    Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
+    validityAggrResult.updateResultFromPageData(it2);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
index 07fad6d9df..92171f832c 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithValueFilterTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
index 14873748bd..8909c62d82 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/ListDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
index 8f9a15b1c1..6aa4073366 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/SingleDataSetTest.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
index 34d7edc939..23ed66971f 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSetTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
index ba6578af53..141673fe59 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSetTest.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
index 03cd7c431c..1602a3f207 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
index 30f3468b8b..05f999139b 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
index 522518c5ea..5d7193fc8d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/valuefilter/RawQueryWithValueFilterTest.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 864b4c0048..b49d9a5f1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.IPlanExecutor;
+import org.apache.iotdb.db.query.filter.executor.PlanExecutor;
 import org.apache.iotdb.db.tools.TsFileRewriteTool;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;